1use async_trait::async_trait;
2use mnemara_core::{
3 ArchiveReceipt, ArchiveRequest, BatchUpsertRequest, CompactionReport, CompactionRequest,
4 DeleteReceipt, DeleteRequest, EngineConfig, Error, ExportRequest, ImportFailure, ImportMode,
5 ImportReport, ImportRequest, IntegrityCheckReport, IntegrityCheckRequest, LineageLink,
6 LineageRelationKind, MaintenanceStats, MemoryHistoricalState, MemoryQualityState, MemoryRecord,
7 MemoryScope, MemoryStore, MemoryTrustLevel, NamespaceStats, PortableRecord,
8 PortableStorePackage, RecallExplanation, RecallHistoricalMode, RecallHit, RecallPlanner,
9 RecallPlanningProfile, RecallPlanningTrace, RecallQuery, RecallResult, RecallScorer,
10 RecallTemporalOrder, RecallTraceCandidate, RecoverReceipt, RecoverRequest, RepairReport,
11 RepairRequest, Result, SemanticEmbedder, SnapshotManifest, StoreStatsReport, StoreStatsRequest,
12 SuppressReceipt, SuppressRequest, UpsertReceipt, UpsertRequest,
13};
14use serde::{Deserialize, Serialize};
15use sled::{Db, Tree};
16use std::collections::{BTreeMap, BTreeSet, HashMap};
17use std::fmt;
18use std::hash::{Hash, Hasher};
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::{SystemTime, UNIX_EPOCH};
22
// Sled tree holding JSON-encoded `StoredRecord` entries, keyed by record id.
const RECORDS_TREE: &str = "records";
// Sled tree mapping scoped idempotency keys to record ids.
const IDEMPOTENCY_TREE: &str = "idempotency";
25
/// Configuration used to open a sled-backed memory store.
#[derive(Clone)]
pub struct SledStoreConfig {
    // Directory where the sled database files live (created on open).
    pub data_dir: PathBuf,
    // Engine tuning (ingestion, retention, compaction, recall settings).
    pub engine_config: EngineConfig,
    // Optional shared semantic embedder used when building the recall planner.
    pub shared_embedder: Option<SharedEmbedderConfig>,
}
32
/// A semantic embedder handle shared across components, plus a note
/// identifying its provider (used in recall-planner provenance).
#[derive(Clone)]
pub struct SharedEmbedderConfig {
    // Embedder implementation behind a shared pointer (trait object).
    pub embedder: Arc<dyn SemanticEmbedder>,
    // Human-readable note describing where the embedder comes from.
    pub provider_note: String,
}
38
39impl SharedEmbedderConfig {
40 fn new(embedder: Arc<dyn SemanticEmbedder>, provider_note: impl Into<String>) -> Self {
41 Self {
42 embedder,
43 provider_note: provider_note.into(),
44 }
45 }
46}
47
48impl fmt::Debug for SharedEmbedderConfig {
49 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
50 formatter
51 .debug_struct("SharedEmbedderConfig")
52 .field("provider_note", &self.provider_note)
53 .finish()
54 }
55}
56
57impl fmt::Debug for SledStoreConfig {
58 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
59 formatter
60 .debug_struct("SledStoreConfig")
61 .field("data_dir", &self.data_dir)
62 .field("engine_config", &self.engine_config)
63 .field("shared_embedder", &self.shared_embedder)
64 .finish()
65 }
66}
67
68impl SledStoreConfig {
69 pub fn new(data_dir: impl AsRef<Path>) -> Self {
70 Self {
71 data_dir: data_dir.as_ref().to_path_buf(),
72 engine_config: EngineConfig::default(),
73 shared_embedder: None,
74 }
75 }
76
77 pub fn with_engine_config(mut self, engine_config: EngineConfig) -> Self {
78 self.engine_config = engine_config;
79 self
80 }
81
82 pub fn with_shared_embedder(
83 mut self,
84 embedder: Arc<dyn SemanticEmbedder>,
85 provider_note: impl Into<String>,
86 ) -> Self {
87 self.shared_embedder = Some(SharedEmbedderConfig::new(embedder, provider_note));
88 self
89 }
90
91 fn recall_planner(&self) -> RecallPlanner {
92 if let Some(shared_embedder) = &self.shared_embedder {
93 RecallPlanner::with_shared_embedder(
94 self.engine_config.recall_planning_profile,
95 self.engine_config.graph_expansion_max_hops,
96 self.engine_config.recall_scorer_kind,
97 self.engine_config.recall_scoring_profile,
98 self.engine_config.recall_policy_profile,
99 Arc::clone(&shared_embedder.embedder),
100 shared_embedder.provider_note.clone(),
101 )
102 } else {
103 RecallPlanner::from_engine_config(&self.engine_config)
104 }
105 }
106}
107
/// Persistent memory store backed by an embedded sled database.
#[derive(Debug)]
pub struct SledMemoryStore {
    // Underlying database handle; kept for size-on-disk queries.
    db: Db,
    // Tree of JSON-encoded `StoredRecord`s keyed by record id.
    records: Tree,
    // Tree mapping scoped idempotency keys to record ids.
    idempotency: Tree,
    // Opening configuration, consulted for policy checks at runtime.
    config: SledStoreConfig,
}
115
/// On-disk representation: the record plus the idempotency key (if any)
/// it was written under, so the reverse mapping can be maintained.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredRecord {
    // The memory record itself.
    record: MemoryRecord,
    // Idempotency key supplied at upsert time, if any.
    idempotency_key: Option<String>,
}
121
/// A decoded entry from the idempotency tree.
#[derive(Debug, Clone)]
struct IdempotencyMapping {
    // Composite key: scope fields + idempotency key joined with U+001F.
    scoped_key: String,
    // Id of the record the key maps to.
    record_id: String,
}
127
// Parts of a parsed scoped idempotency key, in order:
// (tenant_id, namespace, actor_id, conversation_id, session_id, idempotency_key).
// Empty conversation/session slots decode as `None`.
type ScopeKeyParts = (
    String,
    String,
    String,
    Option<String>,
    Option<String>,
    String,
);
136
/// Counters produced by the integrity scan in `build_integrity_summary`.
#[derive(Debug, Clone, Copy, Default)]
struct IntegritySummary {
    // Records in scope after tenant/namespace filtering.
    scanned_records: u64,
    // Idempotency keys examined (including unparseable ones).
    scanned_idempotency_keys: u64,
    // Keys that are unparseable, dangling, or inconsistent with their record.
    stale_idempotency_keys: u64,
    // Records declaring a key that has no valid mapping entry.
    missing_idempotency_keys: u64,
    // Surplus members of duplicate clusters among live records.
    duplicate_active_records: u64,
}
145
/// Absolute time window derived from the `after_record_id`/`before_record_id`
/// recall filters; bounds are exclusive on the record's temporal anchor.
#[derive(Debug, Clone, Copy, Default)]
struct RelativeTemporalBounds {
    // Anchors must be strictly after this time when set.
    after_unix_ms: Option<u64>,
    // Anchors must be strictly before this time when set.
    before_unix_ms: Option<u64>,
}
151
// Version stamp accepted on import; packages with any other version fail validation.
const PORTABLE_PACKAGE_VERSION: u32 = 1;
153
154impl SledMemoryStore {
155 pub fn open(config: SledStoreConfig) -> Result<Self> {
156 std::fs::create_dir_all(&config.data_dir).map_err(|err| {
157 Error::Backend(format!(
158 "failed to create sled store dir {}: {err}",
159 config.data_dir.display()
160 ))
161 })?;
162 let db = sled::open(&config.data_dir).map_err(|err| {
163 Error::Backend(format!(
164 "failed to open sled db {}: {err}",
165 config.data_dir.display()
166 ))
167 })?;
168 let records = db
169 .open_tree(RECORDS_TREE)
170 .map_err(|err| Error::Backend(format!("failed to open records tree: {err}")))?;
171 let idempotency = db
172 .open_tree(IDEMPOTENCY_TREE)
173 .map_err(|err| Error::Backend(format!("failed to open idempotency tree: {err}")))?;
174 Ok(Self {
175 db,
176 records,
177 idempotency,
178 config,
179 })
180 }
181
    /// Delegates to the record's own validation rules (structural checks
    /// defined in `mnemara_core`).
    fn validate_record(record: &MemoryRecord) -> Result<()> {
        record.validate()
    }
185
186 fn validate_upsert_request(&self, request: &UpsertRequest) -> Result<()> {
187 Self::validate_record(&request.record)?;
188 if request.idempotency_key.is_none()
189 && self
190 .config
191 .engine_config
192 .ingestion
193 .idempotent_writes_required
194 {
195 return Err(Error::InvalidRequest(
196 "idempotency_key is required by the current ingestion policy".to_string(),
197 ));
198 }
199 if self.config.engine_config.ingestion.require_source_labels
200 && request.record.scope.labels.is_empty()
201 {
202 return Err(Error::InvalidRequest(
203 "at least one source label is required by the current ingestion policy".to_string(),
204 ));
205 }
206 Ok(())
207 }
208
209 fn is_pinned(record: &MemoryRecord) -> bool {
210 record.scope.trust_level == MemoryTrustLevel::Pinned
211 }
212
213 fn retention_exempt(&self, record: &MemoryRecord) -> bool {
214 self.config.engine_config.retention.pinned_records_exempt && Self::is_pinned(record)
215 }
216
217 fn validate_delete_request(request: &DeleteRequest) -> Result<()> {
218 if request.tenant_id.trim().is_empty() {
219 return Err(Error::InvalidRequest(
220 "delete tenant_id is required".to_string(),
221 ));
222 }
223 if request.namespace.trim().is_empty() {
224 return Err(Error::InvalidRequest(
225 "delete namespace is required".to_string(),
226 ));
227 }
228 if request.record_id.trim().is_empty() {
229 return Err(Error::InvalidRequest(
230 "delete record_id is required".to_string(),
231 ));
232 }
233 if request.audit_reason.trim().is_empty() {
234 return Err(Error::InvalidRequest(
235 "delete audit_reason is required".to_string(),
236 ));
237 }
238 Ok(())
239 }
240
241 fn validate_archive_request(request: &ArchiveRequest) -> Result<()> {
242 Self::validate_lifecycle_request(
243 "archive",
244 &request.tenant_id,
245 &request.namespace,
246 &request.record_id,
247 &request.audit_reason,
248 )
249 }
250
251 fn validate_suppress_request(request: &SuppressRequest) -> Result<()> {
252 Self::validate_lifecycle_request(
253 "suppress",
254 &request.tenant_id,
255 &request.namespace,
256 &request.record_id,
257 &request.audit_reason,
258 )
259 }
260
261 fn validate_recover_request(request: &RecoverRequest) -> Result<()> {
262 Self::validate_lifecycle_request(
263 "recover",
264 &request.tenant_id,
265 &request.namespace,
266 &request.record_id,
267 &request.audit_reason,
268 )?;
269 match request.quality_state {
270 MemoryQualityState::Active | MemoryQualityState::Verified => {}
271 _ => {
272 return Err(Error::InvalidRequest(
273 "recover quality_state must be Active or Verified".to_string(),
274 ));
275 }
276 }
277 if matches!(
278 request.historical_state,
279 Some(MemoryHistoricalState::Superseded)
280 ) {
281 return Err(Error::InvalidRequest(
282 "recover historical_state cannot be Superseded".to_string(),
283 ));
284 }
285 Ok(())
286 }
287
288 fn validate_lifecycle_request(
289 action: &str,
290 tenant_id: &str,
291 namespace: &str,
292 record_id: &str,
293 audit_reason: &str,
294 ) -> Result<()> {
295 if tenant_id.trim().is_empty() {
296 return Err(Error::InvalidRequest(format!(
297 "{action} tenant_id is required"
298 )));
299 }
300 if namespace.trim().is_empty() {
301 return Err(Error::InvalidRequest(format!(
302 "{action} namespace is required"
303 )));
304 }
305 if record_id.trim().is_empty() {
306 return Err(Error::InvalidRequest(format!(
307 "{action} record_id is required"
308 )));
309 }
310 if audit_reason.trim().is_empty() {
311 return Err(Error::InvalidRequest(format!(
312 "{action} audit_reason is required"
313 )));
314 }
315 Ok(())
316 }
317
318 fn validate_record_scope(
319 stored: &StoredRecord,
320 tenant_id: &str,
321 namespace: &str,
322 ) -> Result<()> {
323 if stored.record.scope.tenant_id != tenant_id {
324 return Err(Error::InvalidRequest(format!(
325 "record {} does not belong to tenant {}",
326 stored.record.id, tenant_id
327 )));
328 }
329 if stored.record.scope.namespace != namespace {
330 return Err(Error::InvalidRequest(format!(
331 "record {} does not belong to namespace {}",
332 stored.record.id, namespace
333 )));
334 }
335 Ok(())
336 }
337
338 fn validate_import_request(
339 &self,
340 request: &ImportRequest,
341 ) -> (u64, bool, Vec<ImportFailure>, Vec<PortableRecord>) {
342 let mut validated_records = 0u64;
343 let mut failures = Vec::new();
344 let mut entries = Vec::with_capacity(request.package.records.len());
345
346 if request.package.package_version != PORTABLE_PACKAGE_VERSION {
347 failures.push(ImportFailure {
348 record_id: None,
349 reason: format!(
350 "unsupported portable package version {}; expected {}",
351 request.package.package_version, PORTABLE_PACKAGE_VERSION
352 ),
353 });
354 }
355
356 if request.package.manifest.record_count != request.package.records.len() as u64 {
357 failures.push(ImportFailure {
358 record_id: None,
359 reason: format!(
360 "portable package manifest record_count={} does not match payload records={}",
361 request.package.manifest.record_count,
362 request.package.records.len()
363 ),
364 });
365 }
366
367 for entry in &request.package.records {
368 match Self::validate_record(&entry.record) {
369 Ok(()) => {
370 validated_records += 1;
371 entries.push(entry.clone());
372 }
373 Err(error) => failures.push(ImportFailure {
374 record_id: Some(entry.record.id.clone()),
375 reason: error.to_string(),
376 }),
377 }
378 }
379
380 (validated_records, failures.is_empty(), failures, entries)
381 }
382
383 fn now_unix_ms() -> Result<u64> {
384 SystemTime::now()
385 .duration_since(UNIX_EPOCH)
386 .map_err(|err| Error::Backend(format!("system clock error: {err}")))
387 .map(|value| value.as_millis() as u64)
388 }
389
390 fn encode_record(stored: &StoredRecord) -> Result<Vec<u8>> {
391 serde_json::to_vec(stored)
392 .map_err(|err| Error::Backend(format!("failed to encode record: {err}")))
393 }
394
395 fn decode_record(value: &[u8]) -> Result<StoredRecord> {
396 serde_json::from_slice::<StoredRecord>(value)
397 .map_err(|err| Error::Backend(format!("failed to decode stored record: {err}")))
398 }
399
400 fn idempotency_scope_key(scope: &MemoryScope, key: &str) -> String {
401 format!(
402 "{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}",
403 scope.tenant_id,
404 scope.namespace,
405 scope.actor_id,
406 scope.conversation_id.as_deref().unwrap_or(""),
407 scope.session_id.as_deref().unwrap_or(""),
408 key
409 )
410 }
411
412 fn fetch_record(&self, record_id: &str) -> Result<Option<StoredRecord>> {
413 self.records
414 .get(record_id.as_bytes())
415 .map_err(|err| Error::Backend(format!("failed to read record: {err}")))?
416 .map(|value| Self::decode_record(&value))
417 .transpose()
418 }
419
420 fn remove_idempotency_mapping(&self, record: &StoredRecord) -> Result<()> {
421 if let Some(idempotency_key) = &record.idempotency_key {
422 let scoped_key = Self::idempotency_scope_key(&record.record.scope, idempotency_key);
423 self.idempotency
424 .remove(scoped_key.as_bytes())
425 .map_err(|err| {
426 Error::Backend(format!("failed to remove idempotency key: {err}"))
427 })?;
428 }
429 Ok(())
430 }
431
432 fn matches_scope(candidate: &MemoryScope, query: &MemoryScope) -> bool {
433 candidate.tenant_id == query.tenant_id
434 && candidate.namespace == query.namespace
435 && candidate.actor_id == query.actor_id
436 && (query.conversation_id.is_none()
437 || candidate.conversation_id == query.conversation_id)
438 && (query.session_id.is_none() || candidate.session_id == query.session_id)
439 }
440
    /// Applies every recall filter to a record; `true` only when the record
    /// survives all of them.
    ///
    /// Checks, in order: scope match, expiry, importance floor, source,
    /// absolute time window, relative (anchor-based) time window, trust
    /// levels, required labels, kinds, episode filters, lineage, boundary
    /// labels, recurrence, conflict filters, historical mode, and quality
    /// state.
    fn record_passes_filters(
        record: &MemoryRecord,
        query: &RecallQuery,
        relative_bounds: RelativeTemporalBounds,
    ) -> bool {
        if !Self::matches_scope(&record.scope, &query.scope) {
            return false;
        }

        // Expired records never surface. On a clock error `now` becomes
        // u64::MAX, so any record carrying an expiry is treated as expired
        // (fails closed).
        if let Some(expires_at_unix_ms) = record.expires_at_unix_ms
            && expires_at_unix_ms <= Self::now_unix_ms().unwrap_or(u64::MAX)
        {
            return false;
        }

        if let Some(min_score) = query.filters.min_importance_score
            && record.importance_score < min_score
        {
            return false;
        }

        if let Some(source) = &query.filters.source
            && &record.scope.source != source
        {
            return false;
        }

        // Absolute window is inclusive on both ends, keyed on updated_at.
        if let Some(from_unix_ms) = query.filters.from_unix_ms
            && record.updated_at_unix_ms < from_unix_ms
        {
            return false;
        }

        if let Some(to_unix_ms) = query.filters.to_unix_ms
            && record.updated_at_unix_ms > to_unix_ms
        {
            return false;
        }

        // Relative window is exclusive on both ends, keyed on the record's
        // temporal anchor (episode time when present, else updated_at).
        if let Some(after_unix_ms) = relative_bounds.after_unix_ms
            && Self::record_temporal_anchor(record) <= after_unix_ms
        {
            return false;
        }

        if let Some(before_unix_ms) = relative_bounds.before_unix_ms
            && Self::record_temporal_anchor(record) >= before_unix_ms
        {
            return false;
        }

        // Empty filter collections mean "no constraint" throughout.
        if !query.filters.trust_levels.is_empty()
            && !query
                .filters
                .trust_levels
                .contains(&record.scope.trust_level)
        {
            return false;
        }

        // Required labels: the record must carry every label in the filter.
        if !query.filters.required_labels.is_empty()
            && !query.filters.required_labels.iter().all(|label| {
                record
                    .scope
                    .labels
                    .iter()
                    .any(|candidate| candidate == label)
            })
        {
            return false;
        }

        if !query.filters.kinds.is_empty() && !query.filters.kinds.contains(&record.kind) {
            return false;
        }

        // Episode-based filters: a record without an episode fails any
        // episode-constrained check below.
        if let Some(episode_id) = &query.filters.episode_id
            && record.episode.as_ref().map(|episode| &episode.episode_id) != Some(episode_id)
        {
            return false;
        }

        if !query.filters.continuity_states.is_empty()
            && !record.episode.as_ref().is_some_and(|episode| {
                query
                    .filters
                    .continuity_states
                    .contains(&episode.continuity_state)
            })
        {
            return false;
        }

        if query.filters.unresolved_only
            && !record
                .episode
                .as_ref()
                .is_some_and(|episode| episode.continuity_state.is_unresolved())
        {
            return false;
        }

        // Lineage filter matches the record itself or any of its lineage links.
        if let Some(lineage_record_id) = &query.filters.lineage_record_id
            && record.id != *lineage_record_id
            && !record
                .lineage
                .iter()
                .any(|link| &link.record_id == lineage_record_id)
        {
            return false;
        }

        if !query.filters.boundary_labels.is_empty()
            && !record.episode.as_ref().is_some_and(|episode| {
                episode.boundary_label.as_ref().is_some_and(|label| {
                    query
                        .filters
                        .boundary_labels
                        .iter()
                        .any(|expected| expected == label)
                })
            })
        {
            return false;
        }

        if let Some(recurrence_key) = &query.filters.recurrence_key
            && record
                .episode
                .as_ref()
                .and_then(|episode| episode.recurrence_key.as_ref())
                != Some(recurrence_key)
        {
            return false;
        }

        // Conflict-based filters: a record without conflict metadata fails
        // any conflict-constrained check below.
        if !query.filters.conflict_states.is_empty()
            && !record
                .conflict
                .as_ref()
                .is_some_and(|conflict| query.filters.conflict_states.contains(&conflict.state))
        {
            return false;
        }

        if !query.filters.resolution_kinds.is_empty()
            && !record.conflict.as_ref().is_some_and(|conflict| {
                query
                    .filters
                    .resolution_kinds
                    .contains(&conflict.resolution)
            })
        {
            return false;
        }

        if query.filters.unresolved_conflicts_only
            && !record.conflict.as_ref().is_some_and(|conflict| {
                matches!(
                    conflict.state,
                    mnemara_core::ConflictReviewState::PotentialConflict
                        | mnemara_core::ConflictReviewState::UnderReview
                )
            })
        {
            return false;
        }

        match query.filters.historical_mode {
            RecallHistoricalMode::CurrentOnly => {
                if !matches!(record.historical_state, MemoryHistoricalState::Current) {
                    return false;
                }
            }
            RecallHistoricalMode::HistoricalOnly => {
                if matches!(record.historical_state, MemoryHistoricalState::Current) {
                    return false;
                }
            }
            RecallHistoricalMode::IncludeHistorical => {}
        }

        // Quality state: an explicit state filter overrides the default
        // visibility rule (default hides Deleted/Suppressed always, Archived
        // unless include_archived is set).
        if !query.filters.states.is_empty() {
            if !query.filters.states.contains(&record.quality_state) {
                return false;
            }
        } else {
            match record.quality_state {
                MemoryQualityState::Archived if !query.filters.include_archived => return false,
                MemoryQualityState::Deleted | MemoryQualityState::Suppressed => return false,
                _ => {}
            }
        }

        true
    }
637
638 fn relative_temporal_bounds(
639 records: &[StoredRecord],
640 query: &RecallQuery,
641 ) -> Result<RelativeTemporalBounds> {
642 let mut bounds = RelativeTemporalBounds::default();
643 if let Some(after_record_id) = &query.filters.after_record_id {
644 let Some(anchor) = records
645 .iter()
646 .find(|stored| {
647 stored.record.id == *after_record_id
648 && Self::matches_scope(&stored.record.scope, &query.scope)
649 })
650 .map(|stored| Self::record_temporal_anchor(&stored.record))
651 else {
652 return Err(Error::InvalidRequest(format!(
653 "after_record_id '{after_record_id}' was not found in recall scope"
654 )));
655 };
656 bounds.after_unix_ms = Some(anchor);
657 }
658 if let Some(before_record_id) = &query.filters.before_record_id {
659 let Some(anchor) = records
660 .iter()
661 .find(|stored| {
662 stored.record.id == *before_record_id
663 && Self::matches_scope(&stored.record.scope, &query.scope)
664 })
665 .map(|stored| Self::record_temporal_anchor(&stored.record))
666 else {
667 return Err(Error::InvalidRequest(format!(
668 "before_record_id '{before_record_id}' was not found in recall scope"
669 )));
670 };
671 bounds.before_unix_ms = Some(anchor);
672 }
673 if let (Some(after), Some(before)) = (bounds.after_unix_ms, bounds.before_unix_ms)
674 && after >= before
675 {
676 return Err(Error::InvalidRequest(
677 "after_record_id must refer to an earlier record than before_record_id".to_string(),
678 ));
679 }
680 Ok(bounds)
681 }
682
683 fn iterate_records(&self) -> Result<Vec<StoredRecord>> {
684 let mut records = Vec::new();
685 for item in self.records.iter() {
686 let (_, value) =
687 item.map_err(|err| Error::Backend(format!("sled iteration failed: {err}")))?;
688 records.push(Self::decode_record(&value)?);
689 }
690 Ok(records)
691 }
692
693 fn iterate_idempotency_mappings(&self) -> Result<Vec<IdempotencyMapping>> {
694 let mut mappings = Vec::new();
695 for item in self.idempotency.iter() {
696 let (key, value) =
697 item.map_err(|err| Error::Backend(format!("sled iteration failed: {err}")))?;
698 let scoped_key = String::from_utf8(key.to_vec()).map_err(|err| {
699 Error::Backend(format!("invalid idempotency key encoding: {err}"))
700 })?;
701 let record_id = String::from_utf8(value.to_vec()).map_err(|err| {
702 Error::Backend(format!("invalid idempotency value encoding: {err}"))
703 })?;
704 mappings.push(IdempotencyMapping {
705 scoped_key,
706 record_id,
707 });
708 }
709 Ok(mappings)
710 }
711
712 fn parse_scope_key(scoped_key: &str) -> Option<ScopeKeyParts> {
713 let parts = scoped_key.split('\u{1f}').collect::<Vec<_>>();
714 if parts.len() != 6 {
715 return None;
716 }
717 Some((
718 parts[0].to_string(),
719 parts[1].to_string(),
720 parts[2].to_string(),
721 (!parts[3].is_empty()).then(|| parts[3].to_string()),
722 (!parts[4].is_empty()).then(|| parts[4].to_string()),
723 parts[5].to_string(),
724 ))
725 }
726
727 fn scope_matches_filters(
728 tenant_id: &str,
729 namespace: &str,
730 tenant_filter: Option<&str>,
731 namespace_filter: Option<&str>,
732 ) -> bool {
733 tenant_filter.is_none_or(|expected| tenant_id == expected)
734 && namespace_filter.is_none_or(|expected| namespace == expected)
735 }
736
737 fn build_integrity_summary(
738 &self,
739 tenant_filter: Option<&str>,
740 namespace_filter: Option<&str>,
741 ) -> Result<IntegritySummary> {
742 let records = self.iterate_records()?;
743 let mappings = self.iterate_idempotency_mappings()?;
744 let now_unix_ms = Self::now_unix_ms()?;
745
746 let filtered_records = records
747 .iter()
748 .filter(|stored| {
749 Self::scope_matches_filters(
750 &stored.record.scope.tenant_id,
751 &stored.record.scope.namespace,
752 tenant_filter,
753 namespace_filter,
754 )
755 })
756 .collect::<Vec<_>>();
757
758 let mut mapping_lookup = HashMap::new();
759 let mut stale_idempotency_keys = 0u64;
760 let mut scanned_idempotency_keys = 0u64;
761
762 for mapping in &mappings {
763 let Some((tenant_id, namespace, _, _, _, idempotency_key)) =
764 Self::parse_scope_key(&mapping.scoped_key)
765 else {
766 stale_idempotency_keys += 1;
767 scanned_idempotency_keys += 1;
768 continue;
769 };
770 if !Self::scope_matches_filters(&tenant_id, &namespace, tenant_filter, namespace_filter)
771 {
772 continue;
773 }
774 scanned_idempotency_keys += 1;
775 let Some(stored) = self.fetch_record(&mapping.record_id)? else {
776 stale_idempotency_keys += 1;
777 continue;
778 };
779 if stored.record.scope.tenant_id != tenant_id
780 || stored.record.scope.namespace != namespace
781 || stored.idempotency_key.as_deref() != Some(idempotency_key.as_str())
782 || Self::idempotency_scope_key(&stored.record.scope, &idempotency_key)
783 != mapping.scoped_key
784 {
785 stale_idempotency_keys += 1;
786 continue;
787 }
788 mapping_lookup.insert(mapping.scoped_key.clone(), mapping.record_id.clone());
789 }
790
791 let mut duplicate_groups = HashMap::<String, usize>::new();
792 let mut missing_idempotency_keys = 0u64;
793 let mut duplicate_active_records = 0u64;
794
795 for stored in &filtered_records {
796 if let Some(idempotency_key) = &stored.idempotency_key {
797 let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
798 if mapping_lookup.get(&scoped_key) != Some(&stored.record.id) {
799 missing_idempotency_keys += 1;
800 }
801 }
802
803 if stored
804 .record
805 .expires_at_unix_ms
806 .is_some_and(|value| value <= now_unix_ms)
807 {
808 }
810
811 if !matches!(
812 stored.record.quality_state,
813 MemoryQualityState::Archived
814 | MemoryQualityState::Deleted
815 | MemoryQualityState::Suppressed
816 ) {
817 *duplicate_groups
818 .entry(Self::dedup_signature(&stored.record))
819 .or_default() += 1;
820 }
821 }
822
823 for group_size in duplicate_groups.into_values() {
824 if group_size > 1 {
825 duplicate_active_records += (group_size - 1) as u64;
826 }
827 }
828
829 Ok(IntegritySummary {
830 scanned_records: filtered_records.len() as u64,
831 scanned_idempotency_keys,
832 stale_idempotency_keys,
833 missing_idempotency_keys,
834 duplicate_active_records,
835 })
836 }
837
838 fn build_stats_report(&self, request: &StoreStatsRequest) -> Result<StoreStatsReport> {
839 let records = self.iterate_records()?;
840 let tenant_filter = request.tenant_id.as_deref();
841 let namespace_filter = request.namespace.as_deref();
842 let filtered_records = records
843 .iter()
844 .filter(|stored| {
845 Self::scope_matches_filters(
846 &stored.record.scope.tenant_id,
847 &stored.record.scope.namespace,
848 tenant_filter,
849 namespace_filter,
850 )
851 })
852 .collect::<Vec<_>>();
853 let now_unix_ms = Self::now_unix_ms()?;
854 let integrity = self.build_integrity_summary(tenant_filter, namespace_filter)?;
855 let mut namespace_map = BTreeMap::<(String, String), NamespaceStats>::new();
856 let mut duplicate_groups = HashMap::<String, usize>::new();
857 let mut tombstoned_records = 0u64;
858 let mut expired_records = 0u64;
859
860 for stored in &filtered_records {
861 let key = (
862 stored.record.scope.tenant_id.clone(),
863 stored.record.scope.namespace.clone(),
864 );
865 let entry = namespace_map
866 .entry(key.clone())
867 .or_insert_with(|| NamespaceStats {
868 tenant_id: key.0.clone(),
869 namespace: key.1.clone(),
870 active_records: 0,
871 archived_records: 0,
872 deleted_records: 0,
873 suppressed_records: 0,
874 pinned_records: 0,
875 });
876 match stored.record.quality_state {
877 MemoryQualityState::Archived => entry.archived_records += 1,
878 MemoryQualityState::Deleted => {
879 entry.deleted_records += 1;
880 tombstoned_records += 1;
881 }
882 MemoryQualityState::Suppressed => entry.suppressed_records += 1,
883 _ => entry.active_records += 1,
884 }
885 if stored.record.scope.trust_level == MemoryTrustLevel::Pinned {
886 entry.pinned_records += 1;
887 }
888 if stored
889 .record
890 .expires_at_unix_ms
891 .is_some_and(|value| value <= now_unix_ms)
892 {
893 expired_records += 1;
894 }
895 if !matches!(
896 stored.record.quality_state,
897 MemoryQualityState::Archived
898 | MemoryQualityState::Deleted
899 | MemoryQualityState::Suppressed
900 ) {
901 *duplicate_groups
902 .entry(Self::dedup_signature(&stored.record))
903 .or_default() += 1;
904 }
905 }
906
907 let mut duplicate_candidate_groups = 0u64;
908 let mut duplicate_candidate_records = 0u64;
909 let mut historical_records = 0u64;
910 let mut superseded_records = 0u64;
911 let mut lineage_links = 0u64;
912
913 for stored in &filtered_records {
914 if matches!(
915 stored.record.historical_state,
916 MemoryHistoricalState::Historical
917 ) {
918 historical_records += 1;
919 }
920 if matches!(
921 stored.record.historical_state,
922 MemoryHistoricalState::Superseded
923 ) {
924 superseded_records += 1;
925 }
926 lineage_links += stored.record.lineage.len() as u64;
927 }
928
929 for group_size in duplicate_groups.into_values() {
930 if group_size > 1 {
931 duplicate_candidate_groups += 1;
932 duplicate_candidate_records += (group_size - 1) as u64;
933 }
934 }
935
936 Ok(StoreStatsReport {
937 generated_at_unix_ms: now_unix_ms,
938 total_records: filtered_records.len() as u64,
939 storage_bytes: self
940 .db
941 .size_on_disk()
942 .map_err(|err| Error::Backend(format!("failed to determine db size: {err}")))?,
943 namespaces: namespace_map.into_values().collect(),
944 maintenance: MaintenanceStats {
945 duplicate_candidate_groups,
946 duplicate_candidate_records,
947 tombstoned_records,
948 expired_records,
949 stale_idempotency_keys: integrity.stale_idempotency_keys,
950 historical_records,
951 superseded_records,
952 lineage_links,
953 },
954 engine: self.config.engine_config.tuning_info(),
955 })
956 }
957
958 fn approximate_tokens(record: &MemoryRecord) -> usize {
959 let content_tokens = record.content.split_whitespace().count();
960 let summary_tokens = record
961 .summary
962 .as_deref()
963 .map(|summary| summary.split_whitespace().count())
964 .unwrap_or(0);
965 content_tokens + summary_tokens
966 }
967
968 fn record_temporal_anchor(record: &MemoryRecord) -> u64 {
969 record
970 .episode
971 .as_ref()
972 .and_then(|episode| episode.last_active_unix_ms.or(episode.started_at_unix_ms))
973 .unwrap_or(record.updated_at_unix_ms)
974 }
975
976 fn selected_channels_for_hit(hit: &RecallHit, empty_query: bool) -> Vec<String> {
977 let mut selected_channels = if empty_query {
978 vec!["temporal".to_string(), "policy".to_string()]
979 } else {
980 vec!["lexical".to_string(), "policy".to_string()]
981 };
982 if hit.breakdown.semantic > 0.0 {
983 selected_channels.push("semantic".to_string());
984 }
985 if hit.breakdown.metadata > 0.0 {
986 selected_channels.push("metadata".to_string());
987 }
988 if hit.breakdown.episodic > 0.0 {
989 selected_channels.push("episodic".to_string());
990 }
991 if hit.breakdown.salience > 0.0 {
992 selected_channels.push("salience".to_string());
993 }
994 if hit.breakdown.curation > 0.0 {
995 selected_channels.push("curation".to_string());
996 }
997 if hit.record.conflict.is_some() {
998 selected_channels.push("conflict".to_string());
999 }
1000 selected_channels.sort();
1001 selected_channels.dedup();
1002 selected_channels
1003 }
1004
1005 fn planning_profile_note(profile: RecallPlanningProfile) -> &'static str {
1006 match profile {
1007 RecallPlanningProfile::FastPath => "planning_profile=fast_path",
1008 RecallPlanningProfile::ContinuityAware => "planning_profile=continuity_aware",
1009 }
1010 }
1011
1012 fn dedup_signature(record: &MemoryRecord) -> String {
1013 format!(
1014 "{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}",
1015 record.scope.tenant_id,
1016 record.scope.namespace,
1017 record.scope.actor_id,
1018 record.kind as u8,
1019 record.content.trim().to_ascii_lowercase(),
1020 record
1021 .summary
1022 .clone()
1023 .unwrap_or_default()
1024 .trim()
1025 .to_ascii_lowercase()
1026 )
1027 }
1028
1029 fn summary_record_id(signature: &str) -> String {
1030 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1031 signature.hash(&mut hasher);
1032 format!("compacted-summary-{:016x}", hasher.finish())
1033 }
1034
    /// Builds the synthetic summary record that replaces a duplicate cluster
    /// during compaction. The first record in `group` is treated as the
    /// canonical representative; every member is linked back via
    /// `ConsolidatedFrom` lineage.
    ///
    /// Panics if `group` is empty (indexes `group[0]`) — callers are expected
    /// to pass non-empty clusters.
    fn compaction_summary_record(
        group: &[StoredRecord],
        signature: &str,
        now_unix_ms: u64,
    ) -> StoredRecord {
        let canonical = &group[0].record;
        // Prefer the canonical summary; fall back to its content when the
        // summary is missing or blank.
        let representative_summary = canonical
            .summary
            .clone()
            .filter(|value| !value.trim().is_empty())
            .unwrap_or_else(|| canonical.content.clone());
        let cluster_size = group.len();
        // The rollup inherits the highest importance seen in the cluster.
        let max_importance_score = group
            .iter()
            .map(|stored| stored.record.importance_score)
            .fold(canonical.importance_score, f32::max);

        // Provenance metadata so the rollup can be traced back.
        let mut metadata = BTreeMap::new();
        metadata.insert(
            "compaction_reason".to_string(),
            "duplicate_cluster_rollup".to_string(),
        );
        metadata.insert(
            "compaction_cluster_size".to_string(),
            cluster_size.to_string(),
        );
        metadata.insert("representative_record_id".to_string(), canonical.id.clone());

        // Tag the rollup with a "compacted" label (added once).
        let mut labels = canonical.scope.labels.clone();
        if !labels.iter().any(|label| label == "compacted") {
            labels.push("compacted".to_string());
        }

        StoredRecord {
            record: MemoryRecord {
                // Deterministic id derived from the cluster signature.
                id: Self::summary_record_id(signature),
                scope: MemoryScope {
                    tenant_id: canonical.scope.tenant_id.clone(),
                    namespace: canonical.scope.namespace.clone(),
                    actor_id: canonical.scope.actor_id.clone(),
                    conversation_id: canonical.scope.conversation_id.clone(),
                    session_id: canonical.scope.session_id.clone(),
                    source: canonical.scope.source.clone(),
                    labels,
                    trust_level: canonical.scope.trust_level,
                },
                kind: mnemara_core::MemoryRecordKind::Summary,
                content: format!(
                    "Compacted {} related records into a durable summary. Representative memory: {}",
                    cluster_size, representative_summary
                ),
                summary: Some(format!(
                    "{} related records: {}",
                    cluster_size, representative_summary
                )),
                source_id: None,
                metadata,
                // Verified clusters stay Verified; anything else becomes Active.
                quality_state: if matches!(canonical.quality_state, MemoryQualityState::Verified) {
                    MemoryQualityState::Verified
                } else {
                    MemoryQualityState::Active
                },
                created_at_unix_ms: now_unix_ms,
                updated_at_unix_ms: now_unix_ms,
                // The rollup never expires on its own.
                expires_at_unix_ms: None,
                importance_score: max_importance_score,
                artifact: canonical.artifact.clone(),
                episode: canonical.episode.clone(),
                historical_state: MemoryHistoricalState::Current,
                // Link every cluster member so provenance is queryable.
                lineage: group
                    .iter()
                    .map(|stored| LineageLink {
                        record_id: stored.record.id.clone(),
                        relation: LineageRelationKind::ConsolidatedFrom,
                        confidence: 1.0,
                    })
                    .collect(),
                conflict: canonical.conflict.clone(),
            },
            // Synthetic records are not tied to any idempotency key.
            idempotency_key: None,
        }
    }
1117
1118 fn cold_tiering_candidates(
1119 &self,
1120 tenant_id: &str,
1121 namespace: Option<&str>,
1122 now_unix_ms: u64,
1123 ) -> Result<Vec<StoredRecord>> {
1124 let cold_archive_after_days = self.config.engine_config.compaction.cold_archive_after_days;
1125 if cold_archive_after_days == 0 {
1126 return Ok(Vec::new());
1127 }
1128 let archive_threshold_ms =
1129 u64::from(cold_archive_after_days).saturating_mul(24 * 60 * 60 * 1_000);
1130 let max_importance = f32::from(
1131 self.config
1132 .engine_config
1133 .compaction
1134 .cold_archive_importance_threshold_per_mille,
1135 ) / 1000.0;
1136
1137 Ok(self
1138 .iterate_records()?
1139 .into_iter()
1140 .filter(|stored| stored.record.scope.tenant_id == tenant_id)
1141 .filter(|stored| namespace.is_none_or(|value| stored.record.scope.namespace == value))
1142 .filter(|stored| {
1143 matches!(
1144 stored.record.quality_state,
1145 MemoryQualityState::Draft
1146 | MemoryQualityState::Active
1147 | MemoryQualityState::Verified
1148 )
1149 })
1150 .filter(|stored| !Self::is_pinned(&stored.record))
1151 .filter(|stored| {
1152 now_unix_ms.saturating_sub(stored.record.updated_at_unix_ms) > archive_threshold_ms
1153 && stored.record.importance_score <= max_importance
1154 })
1155 .collect())
1156 }
1157
1158 fn persist_record(&self, stored: &StoredRecord) -> Result<()> {
1159 self.records
1160 .insert(stored.record.id.as_bytes(), Self::encode_record(stored)?)
1161 .map_err(|err| Error::Backend(format!("failed to write record: {err}")))?;
1162 Ok(())
1163 }
1164
1165 fn persist_imported_record(&self, stored: &StoredRecord) -> Result<()> {
1166 self.persist_record(stored)?;
1167 if let Some(idempotency_key) = &stored.idempotency_key {
1168 let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
1169 self.idempotency
1170 .insert(scoped_key.as_bytes(), stored.record.id.as_bytes())
1171 .map_err(|err| Error::Backend(format!("failed to write idempotency key: {err}")))?;
1172 }
1173 Ok(())
1174 }
1175
1176 fn retention_delete(&self, stored: StoredRecord) -> Result<()> {
1177 self.records
1178 .remove(stored.record.id.as_bytes())
1179 .map_err(|err| Error::Backend(format!("failed to delete expired record: {err}")))?;
1180 self.remove_idempotency_mapping(&stored)?;
1181 Ok(())
1182 }
1183
1184 fn clear_all_records(&self) -> Result<()> {
1185 for stored in self.iterate_records()? {
1186 self.records
1187 .remove(stored.record.id.as_bytes())
1188 .map_err(|err| Error::Backend(format!("failed to clear record: {err}")))?;
1189 self.remove_idempotency_mapping(&stored)?;
1190 }
1191 Ok(())
1192 }
1193
1194 fn retention_archive(&self, stored: &mut StoredRecord, now_unix_ms: u64) -> Result<bool> {
1195 if stored.record.quality_state == MemoryQualityState::Archived {
1196 return Ok(false);
1197 }
1198 stored.record.quality_state = MemoryQualityState::Archived;
1199 stored.record.historical_state = MemoryHistoricalState::Historical;
1200 stored.record.updated_at_unix_ms = now_unix_ms;
1201 self.persist_record(stored)?;
1202 Ok(true)
1203 }
1204
    /// Enforces retention policy for one tenant/namespace in three phases:
    /// (1) hard-delete records past their explicit deadline or TTL,
    /// (2) archive records older than the archive window,
    /// (3) archive the oldest/least-important overflow beyond
    ///     `max_records_per_namespace`.
    /// Returns `(archived_count, deleted_count)`. Records for which
    /// `retention_exempt` is true are never touched.
    fn apply_retention_for_namespace(
        &self,
        tenant_id: &str,
        namespace: &str,
    ) -> Result<(u64, u64)> {
        let now_unix_ms = Self::now_unix_ms()?;
        let retention = &self.config.engine_config.retention;
        // Zero-valued windows disable the corresponding policy below.
        let ttl_window_ms = u64::from(retention.ttl_days).saturating_mul(24 * 60 * 60 * 1_000);
        let archive_window_ms =
            u64::from(retention.archive_after_days).saturating_mul(24 * 60 * 60 * 1_000);

        let mut archived_records = 0u64;
        let mut deleted_records = 0u64;
        let mut namespace_records = self
            .iterate_records()?
            .into_iter()
            .filter(|stored| {
                stored.record.scope.tenant_id == tenant_id
                    && stored.record.scope.namespace == namespace
            })
            .collect::<Vec<_>>();

        for stored in &mut namespace_records {
            if self.retention_exempt(&stored.record) {
                continue;
            }

            // Expiry by explicit per-record deadline, or by namespace TTL
            // measured from creation time.
            let expired_by_explicit_deadline = stored
                .record
                .expires_at_unix_ms
                .is_some_and(|expires_at| expires_at <= now_unix_ms);
            let expired_by_ttl = ttl_window_ms > 0
                && now_unix_ms.saturating_sub(stored.record.created_at_unix_ms) > ttl_window_ms;

            if expired_by_explicit_deadline || expired_by_ttl {
                // Already-deleted tombstones are not counted again.
                if !matches!(stored.record.quality_state, MemoryQualityState::Deleted) {
                    self.retention_delete(stored.clone())?;
                    deleted_records += 1;
                }
                continue;
            }

            // Age-based archiving applies only to records still in an
            // active quality state.
            let should_archive_by_age = archive_window_ms > 0
                && now_unix_ms.saturating_sub(stored.record.created_at_unix_ms) > archive_window_ms
                && matches!(
                    stored.record.quality_state,
                    MemoryQualityState::Draft
                        | MemoryQualityState::Active
                        | MemoryQualityState::Verified
                );

            if should_archive_by_age && self.retention_archive(stored, now_unix_ms)? {
                archived_records += 1;
            }
        }

        if retention.max_records_per_namespace > 0 {
            // Re-scan: the delete/archive phase above may have changed the
            // set of active records in this namespace.
            let mut candidates = self
                .iterate_records()?
                .into_iter()
                .filter(|stored| {
                    stored.record.scope.tenant_id == tenant_id
                        && stored.record.scope.namespace == namespace
                        && !self.retention_exempt(&stored.record)
                        && matches!(
                            stored.record.quality_state,
                            MemoryQualityState::Draft
                                | MemoryQualityState::Active
                                | MemoryQualityState::Verified
                        )
                })
                .collect::<Vec<_>>();

            if candidates.len() > retention.max_records_per_namespace {
                // Oldest first, then least important, id as deterministic
                // tiebreaker — the head of this order gets archived.
                candidates.sort_by(|left, right| {
                    left.record
                        .updated_at_unix_ms
                        .cmp(&right.record.updated_at_unix_ms)
                        .then_with(|| {
                            left.record
                                .importance_score
                                .total_cmp(&right.record.importance_score)
                        })
                        .then_with(|| left.record.id.cmp(&right.record.id))
                });

                let archive_count = candidates.len() - retention.max_records_per_namespace;
                for stored in candidates.iter_mut().take(archive_count) {
                    if self.retention_archive(stored, now_unix_ms)? {
                        archived_records += 1;
                    }
                }
            }
        }

        Ok((archived_records, deleted_records))
    }
1302}
1303
1304#[async_trait]
1305impl MemoryStore for SledMemoryStore {
    /// Identifies this store implementation ("sled") for diagnostics and
    /// routing.
    fn backend_kind(&self) -> &'static str {
        "sled"
    }
1309
    /// Inserts or replaces a record, honoring scoped idempotency keys.
    ///
    /// If the request's idempotency key already maps to a *different* record
    /// id, the call fails with `Error::Conflict`. If it maps to the same,
    /// still-present record, the write is skipped and the receipt reports
    /// `deduplicated: true`. Stale mappings (key points at a missing record)
    /// are cleared and the write proceeds. Namespace retention is applied and
    /// the db is flushed after every successful write.
    async fn upsert(&self, request: UpsertRequest) -> Result<UpsertReceipt> {
        self.validate_upsert_request(&request)?;

        if let Some(idempotency_key) = &request.idempotency_key {
            let scoped_key = Self::idempotency_scope_key(&request.record.scope, idempotency_key);
            if let Some(existing_record_id) = self
                .idempotency
                .get(scoped_key.as_bytes())
                .map_err(|err| Error::Backend(format!("failed to read idempotency key: {err}")))?
            {
                let existing_record_id =
                    String::from_utf8(existing_record_id.to_vec()).map_err(|err| {
                        Error::Backend(format!(
                            "stored idempotency mapping was not valid utf-8: {err}"
                        ))
                    })?;
                // The same key may not be reused for a different record.
                if existing_record_id != request.record.id {
                    return Err(Error::Conflict(format!(
                        "idempotency key already belongs to record {}",
                        existing_record_id
                    )));
                }
                // Key and record agree and the record still exists: dedupe.
                if self.fetch_record(&existing_record_id)?.is_some() {
                    return Ok(UpsertReceipt {
                        record_id: existing_record_id,
                        deduplicated: true,
                        summary_refreshed: false,
                    });
                }
                // The mapped record is gone; drop the stale mapping and fall
                // through to a normal write.
                self.idempotency
                    .remove(scoped_key.as_bytes())
                    .map_err(|err| {
                        Error::Backend(format!("failed to clear stale idempotency key: {err}"))
                    })?;
            }
        }

        let key = request.record.id.clone();
        let tenant_id = request.record.scope.tenant_id.clone();
        let namespace = request.record.scope.namespace.clone();
        let existing = self.fetch_record(&key)?;
        // "Deduplicated" here means the record id already existed.
        let deduplicated = existing.is_some();
        let stored = StoredRecord {
            record: request.record,
            idempotency_key: request.idempotency_key,
        };
        // If the prior version carried a different idempotency key, remove
        // its old mapping so it cannot go stale.
        if let Some(existing) = existing
            && existing.idempotency_key != stored.idempotency_key
        {
            self.remove_idempotency_mapping(&existing)?;
        }
        self.persist_record(&stored)?;
        if let Some(idempotency_key) = &stored.idempotency_key {
            let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
            self.idempotency
                .insert(scoped_key.as_bytes(), key.as_bytes())
                .map_err(|err| Error::Backend(format!("failed to write idempotency key: {err}")))?;
        }
        // The new write may have pushed the namespace over retention limits.
        self.apply_retention_for_namespace(&tenant_id, &namespace)?;
        self.db
            .flush_async()
            .await
            .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
        Ok(UpsertReceipt {
            record_id: key,
            deduplicated,
            summary_refreshed: false,
        })
    }
1379
1380 async fn batch_upsert(&self, request: BatchUpsertRequest) -> Result<Vec<UpsertReceipt>> {
1381 if request.requests.len() > self.config.engine_config.max_batch_size {
1382 return Err(Error::InvalidRequest(format!(
1383 "batch size {} exceeds configured max_batch_size {}",
1384 request.requests.len(),
1385 self.config.engine_config.max_batch_size
1386 )));
1387 }
1388 for item in &request.requests {
1389 self.validate_upsert_request(item)?;
1390 }
1391 let mut receipts = Vec::with_capacity(request.requests.len());
1392 for item in request.requests {
1393 receipts.push(self.upsert(item).await?);
1394 }
1395 Ok(receipts)
1396 }
1397
    /// Runs a full recall: filter, plan/score, order, budget-trim, and
    /// optionally attach an explanation plus a per-candidate planning trace.
    ///
    /// An empty (whitespace-only) query text switches recall into a "recent
    /// scope scan": relevance ordering falls back to temporal anchors and the
    /// policy notes reflect that mode. Selection honors `max_items` and an
    /// optional `token_budget`; the first candidate is always admitted even if
    /// it alone exceeds the budget.
    async fn recall(&self, query: RecallQuery) -> Result<RecallResult> {
        let empty_query = query.query_text.trim().is_empty();
        let planner = self.config.recall_planner();
        let scorer = planner.scorer();
        let planning_profile = planner.effective_profile(&query);
        let stored_records = self.iterate_records()?;
        // before/after_record_id filters are resolved into absolute time
        // bounds against the full record set before filtering.
        let relative_bounds = Self::relative_temporal_bounds(&stored_records, &query)?;
        let records = stored_records
            .into_iter()
            .filter(|stored| Self::record_passes_filters(&stored.record, &query, relative_bounds))
            .map(|stored| stored.record)
            .collect::<Vec<_>>();
        let mut scored = planner.plan(&records, &query);
        // Order candidates; every arm tiebreaks on record id so the ordering
        // is fully deterministic.
        match query.filters.temporal_order {
            RecallTemporalOrder::Relevance if empty_query => {
                // No query text: newest first, then most important.
                scored.sort_by(|left, right| {
                    Self::record_temporal_anchor(&right.hit.record)
                        .cmp(&Self::record_temporal_anchor(&left.hit.record))
                        .then_with(|| {
                            right
                                .hit
                                .record
                                .importance_score
                                .total_cmp(&left.hit.record.importance_score)
                        })
                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
                });
            }
            RecallTemporalOrder::Relevance => {
                // Highest total score first.
                scored.sort_by(|left, right| {
                    right
                        .hit
                        .breakdown
                        .total
                        .total_cmp(&left.hit.breakdown.total)
                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
                });
            }
            RecallTemporalOrder::ChronologicalAsc => {
                // Oldest first; score breaks temporal ties.
                scored.sort_by(|left, right| {
                    Self::record_temporal_anchor(&left.hit.record)
                        .cmp(&Self::record_temporal_anchor(&right.hit.record))
                        .then_with(|| {
                            right
                                .hit
                                .breakdown
                                .total
                                .total_cmp(&left.hit.breakdown.total)
                        })
                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
                });
            }
            RecallTemporalOrder::ChronologicalDesc => {
                // Newest first; score breaks temporal ties.
                scored.sort_by(|left, right| {
                    Self::record_temporal_anchor(&right.hit.record)
                        .cmp(&Self::record_temporal_anchor(&left.hit.record))
                        .then_with(|| {
                            right
                                .hit
                                .breakdown
                                .total
                                .total_cmp(&left.hit.breakdown.total)
                        })
                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
                });
            }
        }

        let examined = scored.len();
        // Greedy selection in rank order under max_items + token budget.
        let mut selected_ids = Vec::with_capacity(query.max_items);
        let mut remaining_budget = query.token_budget.unwrap_or(usize::MAX);
        for candidate in &scored {
            if selected_ids.len() >= query.max_items {
                break;
            }
            let estimated_tokens = Self::approximate_tokens(&candidate.hit.record);
            // The very first hit is always kept, even over budget, so a
            // non-empty result is returned whenever anything matched.
            if selected_ids.is_empty() || estimated_tokens <= remaining_budget {
                remaining_budget = remaining_budget.saturating_sub(estimated_tokens);
                selected_ids.push(candidate.hit.record.id.clone());
            }
        }

        let trace_id = format!(
            "recall:{}:{}:{}",
            query.scope.tenant_id, query.scope.namespace, examined
        );
        let selected_set = selected_ids.iter().cloned().collect::<BTreeSet<_>>();
        // Build the returned hits, enriching each with a per-hit explanation
        // when requested.
        let selected = scored
            .iter()
            .filter(|candidate| selected_set.contains(&candidate.hit.record.id))
            .map(|candidate| {
                let mut enriched = candidate.hit.clone();
                if query.include_explanation {
                    let selected_channels =
                        Self::selected_channels_for_hit(&candidate.hit, empty_query);
                    enriched.explanation = Some(RecallExplanation {
                        selected_channels,
                        policy_notes: vec![if empty_query {
                            "recent_scope_scan".to_string()
                        } else {
                            "initial_sled_backend_scoring".to_string()
                        }],
                        trace_id: Some(trace_id.clone()),
                        planning_trace: None,
                        planning_profile: Some(planning_profile),
                        policy_profile: Some(scorer.policy_profile()),
                        scorer_kind: Some(scorer.scorer_kind()),
                        scoring_profile: Some(scorer.scoring_profile()),
                    });
                    if let Some(explanation) = enriched.explanation.as_mut() {
                        explanation
                            .policy_notes
                            .push(scorer.profile_note().to_string());
                        explanation
                            .policy_notes
                            .push(scorer.policy_profile_note().to_string());
                        explanation
                            .policy_notes
                            .push(Self::planning_profile_note(planning_profile).to_string());
                        if let Some(note) = scorer.embedding_note() {
                            explanation.policy_notes.push(note.to_string());
                        }
                        if query.filters.episode_id.is_some() {
                            explanation
                                .policy_notes
                                .push("episode_filter_applied".to_string());
                        }
                        if query.filters.unresolved_only {
                            explanation
                                .policy_notes
                                .push("unresolved_only_filter_applied".to_string());
                        }
                        if !candidate.matched_terms.is_empty() {
                            explanation.policy_notes.push(format!(
                                "matched_terms={}",
                                candidate.matched_terms.join(",")
                            ));
                        }
                    }
                }
                enriched
            })
            .collect::<Vec<_>>();

        // Result-level policy notes mirror the per-hit notes plus flags for
        // each filter family that was active on this query.
        let mut policy_notes = vec![if empty_query {
            "recent_scope_scan".to_string()
        } else {
            "initial_sled_backend_scoring".to_string()
        }];
        policy_notes.push(scorer.profile_note().to_string());
        policy_notes.push(scorer.policy_profile_note().to_string());
        policy_notes.push(Self::planning_profile_note(planning_profile).to_string());
        if let Some(note) = scorer.embedding_note() {
            policy_notes.push(note.to_string());
        }
        if query.token_budget.is_some() {
            policy_notes.push("token_budget_applied".to_string());
        }
        if query.filters.episode_id.is_some() {
            policy_notes.push("episode_filter_applied".to_string());
        }
        if query.filters.unresolved_only {
            policy_notes.push("unresolved_only_filter_applied".to_string());
        }
        if query.filters.before_record_id.is_some() || query.filters.after_record_id.is_some() {
            policy_notes.push("relative_temporal_filter_applied".to_string());
        }
        if !query.filters.boundary_labels.is_empty() || query.filters.recurrence_key.is_some() {
            policy_notes.push("episodic_boundary_filter_applied".to_string());
        }
        if !query.filters.conflict_states.is_empty()
            || !query.filters.resolution_kinds.is_empty()
            || query.filters.unresolved_conflicts_only
        {
            policy_notes.push("conflict_review_filter_applied".to_string());
        }
        // Channel summary: base channels by query mode, plus any channel that
        // contributed a positive score for at least one candidate.
        let mut selected_channels = if empty_query {
            vec!["temporal".to_string(), "policy".to_string()]
        } else {
            vec!["lexical".to_string(), "policy".to_string()]
        };
        for channel in [
            "semantic", "metadata", "episodic", "salience", "curation", "conflict",
        ] {
            let present = scored.iter().any(|candidate| match channel {
                "semantic" => candidate.hit.breakdown.semantic > 0.0,
                "metadata" => candidate.hit.breakdown.metadata > 0.0,
                "episodic" => candidate.hit.breakdown.episodic > 0.0,
                "salience" => candidate.hit.breakdown.salience > 0.0,
                "curation" => candidate.hit.breakdown.curation > 0.0,
                "conflict" => candidate.hit.record.conflict.is_some(),
                _ => false,
            });
            if present && !selected_channels.iter().any(|existing| existing == channel) {
                selected_channels.push(channel.to_string());
            }
        }

        Ok(RecallResult {
            hits: selected,
            total_candidates_examined: examined,
            explanation: query.include_explanation.then(|| RecallExplanation {
                selected_channels,
                policy_notes,
                trace_id: Some(trace_id.clone()),
                planning_profile: Some(planning_profile),
                policy_profile: Some(scorer.policy_profile()),
                planning_trace: Some(RecallPlanningTrace {
                    trace_id,
                    token_budget_applied: query.token_budget.is_some(),
                    candidates: scored
                        .into_iter()
                        .enumerate()
                        .map(|(index, candidate)| {
                            let record_id = candidate.hit.record.id.clone();
                            let selected = selected_set.contains(&record_id);
                            // 1-based rank within the selection order.
                            let selection_rank = selected_ids
                                .iter()
                                .position(|candidate_id| candidate_id == &record_id)
                                .map(|index| index as u32 + 1);
                            let candidate_channels =
                                Self::selected_channels_for_hit(&candidate.hit, empty_query);
                            // NOTE(review): rejection reasons are coarse —
                            // "token_budget_exhausted" is recorded whenever a
                            // budget exists, not only when the budget was the
                            // actual cause.
                            let mut filter_reasons = Vec::new();
                            if selected {
                                filter_reasons.push("retained".to_string());
                            } else {
                                if index >= query.max_items {
                                    filter_reasons.push("max_items_exhausted".to_string());
                                }
                                if query.token_budget.is_some() {
                                    filter_reasons.push("token_budget_exhausted".to_string());
                                }
                            }
                            RecallTraceCandidate {
                                record_id,
                                kind: candidate.hit.record.kind,
                                selected,
                                planner_stage: candidate.planner_stage,
                                candidate_sources: candidate.candidate_sources,
                                selection_rank,
                                matched_terms: candidate.matched_terms,
                                selected_channels: candidate_channels,
                                filter_reasons,
                                decision_reason: if selected {
                                    "selected_by_rank".to_string()
                                } else if query.token_budget.is_some() {
                                    "excluded_by_rank_or_budget".to_string()
                                } else {
                                    "excluded_by_rank".to_string()
                                },
                                breakdown: candidate.hit.breakdown,
                            }
                        })
                        .collect(),
                }),
                scorer_kind: Some(scorer.scorer_kind()),
                scoring_profile: Some(scorer.scoring_profile()),
            }),
        })
    }
1658
    /// Compacts a tenant's (optionally namespace-scoped) records.
    ///
    /// Records in an active quality state are clustered by dedup signature.
    /// Clusters at or above `summarize_after_record_count` get a durable
    /// summary record; within every cluster of 2+ records, all but the
    /// newest/most-important are archived as `Superseded` with lineage links.
    /// Finally, cold-tier candidates are archived. With `dry_run` set, all
    /// counters are computed but nothing is written.
    async fn compact(&self, request: CompactionRequest) -> Result<CompactionReport> {
        if request.tenant_id.trim().is_empty() {
            return Err(Error::InvalidRequest(
                "compaction tenant_id is required".to_string(),
            ));
        }

        // Cluster in-scope, active records by their dedup signature.
        let records = self.iterate_records()?;
        let mut groups: HashMap<String, Vec<StoredRecord>> = HashMap::new();
        for stored in records {
            if stored.record.scope.tenant_id != request.tenant_id {
                continue;
            }
            if let Some(namespace) = &request.namespace
                && stored.record.scope.namespace != *namespace
            {
                continue;
            }
            // Terminal-state records never participate in compaction.
            if matches!(
                stored.record.quality_state,
                MemoryQualityState::Archived
                    | MemoryQualityState::Deleted
                    | MemoryQualityState::Suppressed
            ) {
                continue;
            }
            groups
                .entry(Self::dedup_signature(&stored.record))
                .or_default()
                .push(stored);
        }

        let mut deduplicated_records = 0u64;
        let mut archived_records = 0u64;
        let mut summarized_clusters = 0u64;
        let mut superseded_records = 0u64;
        let mut lineage_links_created = 0u64;
        let now_unix_ms = Self::now_unix_ms()?;
        for group in groups.values_mut() {
            // Singleton clusters need no work.
            if group.len() < 2 {
                continue;
            }
            // Canonical record first: newest, then most important, id as
            // deterministic tiebreaker.
            group.sort_by(|left, right| {
                right
                    .record
                    .updated_at_unix_ms
                    .cmp(&left.record.updated_at_unix_ms)
                    .then_with(|| {
                        right
                            .record
                            .importance_score
                            .total_cmp(&left.record.importance_score)
                    })
                    .then_with(|| left.record.id.cmp(&right.record.id))
            });

            let signature = Self::dedup_signature(&group[0].record);
            // Large clusters get rolled up into a summary record whose id is
            // derived from the signature.
            if self
                .config
                .engine_config
                .compaction
                .summarize_after_record_count
                > 0
                && group.len()
                    >= self
                        .config
                        .engine_config
                        .compaction
                        .summarize_after_record_count
            {
                summarized_clusters += 1;
                lineage_links_created += group.len() as u64;
                if !request.dry_run {
                    let summary = Self::compaction_summary_record(group, &signature, now_unix_ms);
                    self.persist_record(&summary)?;
                }
            }

            // Everything after the canonical head is superseded + archived.
            for duplicate in group.iter().skip(1) {
                deduplicated_records += 1;
                archived_records += 1;
                superseded_records += 1;
                if request.dry_run {
                    continue;
                }
                let mut archived = duplicate.clone();
                archived.record.quality_state = MemoryQualityState::Archived;
                archived.record.historical_state = MemoryHistoricalState::Superseded;
                archived.record.lineage.push(LineageLink {
                    record_id: Self::summary_record_id(&signature),
                    relation: LineageRelationKind::SupersededBy,
                    confidence: 1.0,
                });
                lineage_links_created += 1;
                archived.record.updated_at_unix_ms = Self::now_unix_ms()?;
                self.records
                    .insert(
                        archived.record.id.as_bytes(),
                        Self::encode_record(&archived)?,
                    )
                    .map_err(|err| {
                        Error::Backend(format!("failed to archive duplicate record: {err}"))
                    })?;
            }
        }

        // Cold tiering: archive old, low-importance records as Historical.
        for candidate in self.cold_tiering_candidates(
            &request.tenant_id,
            request.namespace.as_deref(),
            now_unix_ms,
        )? {
            archived_records += 1;
            if request.dry_run {
                continue;
            }
            let mut archived = candidate;
            archived.record.quality_state = MemoryQualityState::Archived;
            archived.record.historical_state = MemoryHistoricalState::Historical;
            archived.record.updated_at_unix_ms = now_unix_ms;
            self.persist_record(&archived)?;
        }

        if !request.dry_run {
            self.db
                .flush_async()
                .await
                .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
        }

        Ok(CompactionReport {
            deduplicated_records,
            archived_records,
            summarized_clusters,
            pruned_graph_edges: 0,
            superseded_records,
            lineage_links_created,
            dry_run: request.dry_run,
        })
    }
1798
    /// Deletes a record, either hard (removed from the tree along with its
    /// idempotency mapping) or soft (a `Deleted` tombstone written in place).
    /// A missing record yields a receipt with both flags false, not an error;
    /// a tenant/namespace mismatch is rejected.
    async fn delete(&self, request: DeleteRequest) -> Result<DeleteReceipt> {
        Self::validate_delete_request(&request)?;

        // Deleting a nonexistent record is treated as a no-op, not an error.
        let Some(stored) = self.fetch_record(&request.record_id)? else {
            return Ok(DeleteReceipt {
                record_id: request.record_id,
                tombstoned: false,
                hard_deleted: false,
            });
        };

        // Scope checks guard against cross-tenant/namespace deletions.
        if stored.record.scope.tenant_id != request.tenant_id {
            return Err(Error::InvalidRequest(format!(
                "record {} does not belong to tenant {}",
                request.record_id, request.tenant_id
            )));
        }
        if stored.record.scope.namespace != request.namespace {
            return Err(Error::InvalidRequest(format!(
                "record {} does not belong to namespace {}",
                request.record_id, request.namespace
            )));
        }

        if request.hard_delete {
            self.records
                .remove(request.record_id.as_bytes())
                .map_err(|err| Error::Backend(format!("failed to delete record: {err}")))?;
            self.remove_idempotency_mapping(&stored)?;
        } else {
            // Soft delete: keep the record but mark it `Deleted`.
            let mut tombstone = stored;
            tombstone.record.quality_state = MemoryQualityState::Deleted;
            tombstone.record.updated_at_unix_ms = Self::now_unix_ms()?;
            self.records
                .insert(
                    tombstone.record.id.as_bytes(),
                    Self::encode_record(&tombstone)?,
                )
                .map_err(|err| Error::Backend(format!("failed to write tombstone: {err}")))?;
        }

        self.db
            .flush_async()
            .await
            .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
        Ok(DeleteReceipt {
            record_id: request.record_id,
            tombstoned: !request.hard_delete,
            hard_deleted: request.hard_delete,
        })
    }
1850
1851 async fn archive(&self, request: ArchiveRequest) -> Result<ArchiveReceipt> {
1852 Self::validate_archive_request(&request)?;
1853
1854 let Some(mut stored) = self.fetch_record(&request.record_id)? else {
1855 return Err(Error::InvalidRequest(format!(
1856 "record {} was not found",
1857 request.record_id
1858 )));
1859 };
1860 Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1861
1862 let previous_quality_state = stored.record.quality_state;
1863 let previous_historical_state = stored.record.historical_state;
1864 let changed = previous_quality_state != MemoryQualityState::Archived
1865 || previous_historical_state == MemoryHistoricalState::Current;
1866 let historical_state = match previous_historical_state {
1867 MemoryHistoricalState::Current => MemoryHistoricalState::Historical,
1868 other => other,
1869 };
1870
1871 if changed && !request.dry_run {
1872 stored.record.quality_state = MemoryQualityState::Archived;
1873 stored.record.historical_state = historical_state;
1874 stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1875 self.records
1876 .insert(stored.record.id.as_bytes(), Self::encode_record(&stored)?)
1877 .map_err(|err| Error::Backend(format!("failed to archive record: {err}")))?;
1878 self.db
1879 .flush_async()
1880 .await
1881 .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
1882 }
1883
1884 Ok(ArchiveReceipt {
1885 record_id: request.record_id,
1886 previous_quality_state,
1887 previous_historical_state,
1888 quality_state: MemoryQualityState::Archived,
1889 historical_state,
1890 changed,
1891 dry_run: request.dry_run,
1892 })
1893 }
1894
1895 async fn suppress(&self, request: SuppressRequest) -> Result<SuppressReceipt> {
1896 Self::validate_suppress_request(&request)?;
1897
1898 let Some(mut stored) = self.fetch_record(&request.record_id)? else {
1899 return Err(Error::InvalidRequest(format!(
1900 "record {} was not found",
1901 request.record_id
1902 )));
1903 };
1904 Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1905
1906 let previous_quality_state = stored.record.quality_state;
1907 let previous_historical_state = stored.record.historical_state;
1908 let changed = previous_quality_state != MemoryQualityState::Suppressed;
1909
1910 if changed && !request.dry_run {
1911 stored.record.quality_state = MemoryQualityState::Suppressed;
1912 stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1913 self.records
1914 .insert(stored.record.id.as_bytes(), Self::encode_record(&stored)?)
1915 .map_err(|err| Error::Backend(format!("failed to suppress record: {err}")))?;
1916 self.db
1917 .flush_async()
1918 .await
1919 .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
1920 }
1921
1922 Ok(SuppressReceipt {
1923 record_id: request.record_id,
1924 previous_quality_state,
1925 previous_historical_state,
1926 quality_state: MemoryQualityState::Suppressed,
1927 historical_state: previous_historical_state,
1928 changed,
1929 dry_run: request.dry_run,
1930 })
1931 }
1932
1933 async fn recover(&self, request: RecoverRequest) -> Result<RecoverReceipt> {
1934 Self::validate_recover_request(&request)?;
1935
1936 let Some(mut stored) = self.fetch_record(&request.record_id)? else {
1937 return Err(Error::InvalidRequest(format!(
1938 "record {} was not found",
1939 request.record_id
1940 )));
1941 };
1942 Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1943
1944 let previous_quality_state = stored.record.quality_state;
1945 let previous_historical_state = stored.record.historical_state;
1946 let historical_state = request
1947 .historical_state
1948 .unwrap_or(MemoryHistoricalState::Current);
1949 let changed = previous_quality_state != request.quality_state
1950 || previous_historical_state != historical_state;
1951
1952 if changed && !request.dry_run {
1953 stored.record.quality_state = request.quality_state;
1954 stored.record.historical_state = historical_state;
1955 stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1956 self.records
1957 .insert(stored.record.id.as_bytes(), Self::encode_record(&stored)?)
1958 .map_err(|err| Error::Backend(format!("failed to recover record: {err}")))?;
1959 self.db
1960 .flush_async()
1961 .await
1962 .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
1963 }
1964
1965 Ok(RecoverReceipt {
1966 record_id: request.record_id,
1967 previous_quality_state,
1968 previous_historical_state,
1969 quality_state: request.quality_state,
1970 historical_state,
1971 changed,
1972 dry_run: request.dry_run,
1973 })
1974 }
1975
1976 async fn snapshot(&self) -> Result<SnapshotManifest> {
1977 let records = self.iterate_records()?;
1978 let namespaces = records
1979 .iter()
1980 .map(|stored| stored.record.scope.namespace.clone())
1981 .collect::<BTreeSet<_>>()
1982 .into_iter()
1983 .collect::<Vec<_>>();
1984 let created_at_unix_ms = Self::now_unix_ms()?;
1985 let storage_bytes = self
1986 .db
1987 .size_on_disk()
1988 .map_err(|err| Error::Backend(format!("failed to determine db size: {err}")))?;
1989
1990 Ok(SnapshotManifest {
1991 snapshot_id: format!("snapshot-{created_at_unix_ms}"),
1992 created_at_unix_ms,
1993 namespaces,
1994 record_count: records.len() as u64,
1995 storage_bytes,
1996 engine: self.config.engine_config.tuning_info(),
1997 })
1998 }
1999
    /// Builds namespace statistics for the requested scope by delegating to
    /// the synchronous report builder.
    async fn stats(&self, request: StoreStatsRequest) -> Result<StoreStatsReport> {
        self.build_stats_report(&request)
    }
2003
2004 async fn integrity_check(
2005 &self,
2006 request: IntegrityCheckRequest,
2007 ) -> Result<IntegrityCheckReport> {
2008 let summary = self
2009 .build_integrity_summary(request.tenant_id.as_deref(), request.namespace.as_deref())?;
2010 Ok(IntegrityCheckReport {
2011 generated_at_unix_ms: Self::now_unix_ms()?,
2012 healthy: summary.stale_idempotency_keys == 0
2013 && summary.missing_idempotency_keys == 0
2014 && summary.duplicate_active_records == 0,
2015 scanned_records: summary.scanned_records,
2016 scanned_idempotency_keys: summary.scanned_idempotency_keys,
2017 stale_idempotency_keys: summary.stale_idempotency_keys,
2018 missing_idempotency_keys: summary.missing_idempotency_keys,
2019 duplicate_active_records: summary.duplicate_active_records,
2020 })
2021 }
2022
2023 async fn repair(&self, request: RepairRequest) -> Result<RepairReport> {
2024 if request.reason.trim().is_empty() {
2025 return Err(Error::InvalidRequest(
2026 "repair reason is required".to_string(),
2027 ));
2028 }
2029 if !request.remove_stale_idempotency_keys && !request.rebuild_missing_idempotency_keys {
2030 return Err(Error::InvalidRequest(
2031 "repair requires at least one enabled action".to_string(),
2032 ));
2033 }
2034
2035 let tenant_filter = request.tenant_id.as_deref();
2036 let namespace_filter = request.namespace.as_deref();
2037 let summary = self.build_integrity_summary(tenant_filter, namespace_filter)?;
2038 let records = self.iterate_records()?;
2039 let mappings = self.iterate_idempotency_mappings()?;
2040 let mut removed_stale_idempotency_keys = 0u64;
2041 let mut rebuilt_missing_idempotency_keys = 0u64;
2042
2043 if request.remove_stale_idempotency_keys {
2044 for mapping in &mappings {
2045 let Some((tenant_id, namespace, _, _, _, idempotency_key)) =
2046 Self::parse_scope_key(&mapping.scoped_key)
2047 else {
2048 continue;
2049 };
2050 if !Self::scope_matches_filters(
2051 &tenant_id,
2052 &namespace,
2053 tenant_filter,
2054 namespace_filter,
2055 ) {
2056 continue;
2057 }
2058 let stale = match self.fetch_record(&mapping.record_id)? {
2059 Some(stored) => {
2060 stored.record.scope.tenant_id != tenant_id
2061 || stored.record.scope.namespace != namespace
2062 || stored.idempotency_key.as_deref() != Some(idempotency_key.as_str())
2063 || Self::idempotency_scope_key(&stored.record.scope, &idempotency_key)
2064 != mapping.scoped_key
2065 }
2066 None => true,
2067 };
2068 if stale {
2069 removed_stale_idempotency_keys += 1;
2070 if !request.dry_run {
2071 self.idempotency
2072 .remove(mapping.scoped_key.as_bytes())
2073 .map_err(|err| {
2074 Error::Backend(format!(
2075 "failed to remove stale idempotency key: {err}"
2076 ))
2077 })?;
2078 }
2079 }
2080 }
2081 }
2082
2083 if request.rebuild_missing_idempotency_keys {
2084 let existing = self.iterate_idempotency_mappings()?;
2085 let existing_lookup = existing
2086 .into_iter()
2087 .map(|mapping| (mapping.scoped_key, mapping.record_id))
2088 .collect::<HashMap<_, _>>();
2089
2090 for stored in &records {
2091 if !Self::scope_matches_filters(
2092 &stored.record.scope.tenant_id,
2093 &stored.record.scope.namespace,
2094 tenant_filter,
2095 namespace_filter,
2096 ) {
2097 continue;
2098 }
2099 let Some(idempotency_key) = &stored.idempotency_key else {
2100 continue;
2101 };
2102 let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
2103 if existing_lookup.get(&scoped_key) == Some(&stored.record.id) {
2104 continue;
2105 }
2106 rebuilt_missing_idempotency_keys += 1;
2107 if !request.dry_run {
2108 self.idempotency
2109 .insert(scoped_key.as_bytes(), stored.record.id.as_bytes())
2110 .map_err(|err| {
2111 Error::Backend(format!("failed to rebuild idempotency key: {err}"))
2112 })?;
2113 }
2114 }
2115 }
2116
2117 if !request.dry_run {
2118 self.db
2119 .flush_async()
2120 .await
2121 .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
2122 }
2123
2124 let stale_after = if request.remove_stale_idempotency_keys {
2125 0
2126 } else {
2127 summary.stale_idempotency_keys
2128 };
2129 let missing_after = if request.rebuild_missing_idempotency_keys {
2130 0
2131 } else {
2132 summary.missing_idempotency_keys
2133 };
2134
2135 Ok(RepairReport {
2136 dry_run: request.dry_run,
2137 scanned_records: summary.scanned_records,
2138 scanned_idempotency_keys: summary.scanned_idempotency_keys,
2139 removed_stale_idempotency_keys,
2140 rebuilt_missing_idempotency_keys,
2141 healthy_after: stale_after == 0
2142 && missing_after == 0
2143 && summary.duplicate_active_records == 0,
2144 })
2145 }
2146
2147 async fn export(&self, request: ExportRequest) -> Result<PortableStorePackage> {
2148 let exported_at_unix_ms = Self::now_unix_ms()?;
2149 let mut namespaces = BTreeSet::new();
2150 let mut records = Vec::new();
2151 for stored in self.iterate_records()? {
2152 if request
2153 .tenant_id
2154 .as_deref()
2155 .is_some_and(|tenant_id| stored.record.scope.tenant_id != tenant_id)
2156 {
2157 continue;
2158 }
2159 if request
2160 .namespace
2161 .as_deref()
2162 .is_some_and(|namespace| stored.record.scope.namespace != namespace)
2163 {
2164 continue;
2165 }
2166 if !request.include_archived
2167 && stored.record.quality_state == MemoryQualityState::Archived
2168 {
2169 continue;
2170 }
2171 namespaces.insert(format!(
2172 "{}:{}",
2173 stored.record.scope.tenant_id, stored.record.scope.namespace
2174 ));
2175 records.push(PortableRecord {
2176 record: stored.record,
2177 idempotency_key: stored.idempotency_key,
2178 });
2179 }
2180
2181 let storage_bytes = records
2182 .iter()
2183 .map(|entry| {
2184 entry.record.content.len()
2185 + entry.record.summary.as_deref().map(str::len).unwrap_or(0)
2186 })
2187 .sum::<usize>() as u64;
2188
2189 Ok(PortableStorePackage {
2190 package_version: PORTABLE_PACKAGE_VERSION,
2191 exported_at_unix_ms,
2192 manifest: SnapshotManifest {
2193 snapshot_id: format!("portable-export-{exported_at_unix_ms}"),
2194 created_at_unix_ms: exported_at_unix_ms,
2195 namespaces: namespaces.into_iter().collect(),
2196 record_count: records.len() as u64,
2197 storage_bytes,
2198 engine: self.config.engine_config.tuning_info(),
2199 },
2200 records,
2201 })
2202 }
2203
2204 async fn import(&self, request: ImportRequest) -> Result<ImportReport> {
2205 let snapshot_id = request.package.manifest.snapshot_id.clone();
2206 let package_version = request.package.package_version;
2207 let (validated_records, compatible_package, failed_records, entries) =
2208 self.validate_import_request(&request);
2209 let apply_changes = compatible_package
2210 && failed_records.is_empty()
2211 && !request.dry_run
2212 && !matches!(request.mode, ImportMode::Validate);
2213 let mut imported_records = 0u64;
2214 let mut skipped_records = 0u64;
2215 if apply_changes && matches!(request.mode, ImportMode::Replace) {
2216 self.clear_all_records()?;
2217 }
2218
2219 for entry in entries {
2220 if matches!(request.mode, ImportMode::Merge)
2221 && self
2222 .records
2223 .contains_key(entry.record.id.as_bytes())
2224 .map_err(|err| {
2225 Error::Backend(format!("failed to check record presence: {err}"))
2226 })?
2227 {
2228 skipped_records += 1;
2229 continue;
2230 }
2231 if apply_changes {
2232 self.persist_imported_record(&StoredRecord {
2233 record: entry.record,
2234 idempotency_key: entry.idempotency_key,
2235 })?;
2236 }
2237 imported_records += 1;
2238 }
2239 if apply_changes {
2240 self.db
2241 .flush_async()
2242 .await
2243 .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
2244 }
2245
2246 Ok(ImportReport {
2247 mode: request.mode,
2248 dry_run: request.dry_run,
2249 applied: apply_changes,
2250 compatible_package,
2251 package_version,
2252 validated_records,
2253 imported_records,
2254 skipped_records,
2255 replaced_existing: matches!(request.mode, ImportMode::Replace),
2256 snapshot_id,
2257 failed_records,
2258 })
2259 }
2260}