Skip to main content

mnemara_store_sled/
lib.rs

1use async_trait::async_trait;
2use mnemara_core::{
3    ArchiveReceipt, ArchiveRequest, BatchUpsertRequest, CompactionReport, CompactionRequest,
4    DeleteReceipt, DeleteRequest, EngineConfig, Error, ExportRequest, ImportFailure, ImportMode,
5    ImportReport, ImportRequest, IntegrityCheckReport, IntegrityCheckRequest, LineageLink,
6    LineageRelationKind, MaintenanceStats, MemoryHistoricalState, MemoryQualityState, MemoryRecord,
7    MemoryScope, MemoryStore, MemoryTrustLevel, NamespaceStats, PortableRecord,
8    PortableStorePackage, RecallExplanation, RecallHistoricalMode, RecallHit, RecallPlanner,
9    RecallPlanningProfile, RecallPlanningTrace, RecallQuery, RecallResult, RecallScorer,
10    RecallTemporalOrder, RecallTraceCandidate, RecoverReceipt, RecoverRequest, RepairReport,
11    RepairRequest, Result, SemanticEmbedder, SnapshotManifest, StoreStatsReport, StoreStatsRequest,
12    SuppressReceipt, SuppressRequest, UpsertReceipt, UpsertRequest,
13};
14use serde::{Deserialize, Serialize};
15use sled::{Db, Tree};
16use std::collections::{BTreeMap, BTreeSet, HashMap};
17use std::fmt;
18use std::hash::{Hash, Hasher};
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::{SystemTime, UNIX_EPOCH};
22
23const RECORDS_TREE: &str = "records";
24const IDEMPOTENCY_TREE: &str = "idempotency";
25
26#[derive(Clone)]
27pub struct SledStoreConfig {
28    pub data_dir: PathBuf,
29    pub engine_config: EngineConfig,
30    pub shared_embedder: Option<SharedEmbedderConfig>,
31}
32
33#[derive(Clone)]
34pub struct SharedEmbedderConfig {
35    pub embedder: Arc<dyn SemanticEmbedder>,
36    pub provider_note: String,
37}
38
39impl SharedEmbedderConfig {
40    fn new(embedder: Arc<dyn SemanticEmbedder>, provider_note: impl Into<String>) -> Self {
41        Self {
42            embedder,
43            provider_note: provider_note.into(),
44        }
45    }
46}
47
48impl fmt::Debug for SharedEmbedderConfig {
49    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
50        formatter
51            .debug_struct("SharedEmbedderConfig")
52            .field("provider_note", &self.provider_note)
53            .finish()
54    }
55}
56
57impl fmt::Debug for SledStoreConfig {
58    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
59        formatter
60            .debug_struct("SledStoreConfig")
61            .field("data_dir", &self.data_dir)
62            .field("engine_config", &self.engine_config)
63            .field("shared_embedder", &self.shared_embedder)
64            .finish()
65    }
66}
67
68impl SledStoreConfig {
69    pub fn new(data_dir: impl AsRef<Path>) -> Self {
70        Self {
71            data_dir: data_dir.as_ref().to_path_buf(),
72            engine_config: EngineConfig::default(),
73            shared_embedder: None,
74        }
75    }
76
77    pub fn with_engine_config(mut self, engine_config: EngineConfig) -> Self {
78        self.engine_config = engine_config;
79        self
80    }
81
82    pub fn with_shared_embedder(
83        mut self,
84        embedder: Arc<dyn SemanticEmbedder>,
85        provider_note: impl Into<String>,
86    ) -> Self {
87        self.shared_embedder = Some(SharedEmbedderConfig::new(embedder, provider_note));
88        self
89    }
90
91    fn recall_planner(&self) -> RecallPlanner {
92        if let Some(shared_embedder) = &self.shared_embedder {
93            RecallPlanner::with_shared_embedder(
94                self.engine_config.recall_planning_profile,
95                self.engine_config.graph_expansion_max_hops,
96                self.engine_config.recall_scorer_kind,
97                self.engine_config.recall_scoring_profile,
98                self.engine_config.recall_policy_profile,
99                Arc::clone(&shared_embedder.embedder),
100                shared_embedder.provider_note.clone(),
101            )
102        } else {
103            RecallPlanner::from_engine_config(&self.engine_config)
104        }
105    }
106}
107
108#[derive(Debug)]
109pub struct SledMemoryStore {
110    db: Db,
111    records: Tree,
112    idempotency: Tree,
113    config: SledStoreConfig,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
117struct StoredRecord {
118    record: MemoryRecord,
119    idempotency_key: Option<String>,
120}
121
122#[derive(Debug, Clone)]
123struct IdempotencyMapping {
124    scoped_key: String,
125    record_id: String,
126}
127
128type ScopeKeyParts = (
129    String,
130    String,
131    String,
132    Option<String>,
133    Option<String>,
134    String,
135);
136
137#[derive(Debug, Clone, Copy, Default)]
138struct IntegritySummary {
139    scanned_records: u64,
140    scanned_idempotency_keys: u64,
141    stale_idempotency_keys: u64,
142    missing_idempotency_keys: u64,
143    duplicate_active_records: u64,
144}
145
146#[derive(Debug, Clone, Copy, Default)]
147struct RelativeTemporalBounds {
148    after_unix_ms: Option<u64>,
149    before_unix_ms: Option<u64>,
150}
151
152const PORTABLE_PACKAGE_VERSION: u32 = 1;
153
154impl SledMemoryStore {
155    pub fn open(config: SledStoreConfig) -> Result<Self> {
156        std::fs::create_dir_all(&config.data_dir).map_err(|err| {
157            Error::Backend(format!(
158                "failed to create sled store dir {}: {err}",
159                config.data_dir.display()
160            ))
161        })?;
162        let db = sled::open(&config.data_dir).map_err(|err| {
163            Error::Backend(format!(
164                "failed to open sled db {}: {err}",
165                config.data_dir.display()
166            ))
167        })?;
168        let records = db
169            .open_tree(RECORDS_TREE)
170            .map_err(|err| Error::Backend(format!("failed to open records tree: {err}")))?;
171        let idempotency = db
172            .open_tree(IDEMPOTENCY_TREE)
173            .map_err(|err| Error::Backend(format!("failed to open idempotency tree: {err}")))?;
174        Ok(Self {
175            db,
176            records,
177            idempotency,
178            config,
179        })
180    }
181
182    fn validate_record(record: &MemoryRecord) -> Result<()> {
183        record.validate()
184    }
185
186    fn validate_upsert_request(&self, request: &UpsertRequest) -> Result<()> {
187        Self::validate_record(&request.record)?;
188        if request.idempotency_key.is_none()
189            && self
190                .config
191                .engine_config
192                .ingestion
193                .idempotent_writes_required
194        {
195            return Err(Error::InvalidRequest(
196                "idempotency_key is required by the current ingestion policy".to_string(),
197            ));
198        }
199        if self.config.engine_config.ingestion.require_source_labels
200            && request.record.scope.labels.is_empty()
201        {
202            return Err(Error::InvalidRequest(
203                "at least one source label is required by the current ingestion policy".to_string(),
204            ));
205        }
206        Ok(())
207    }
208
209    fn is_pinned(record: &MemoryRecord) -> bool {
210        record.scope.trust_level == MemoryTrustLevel::Pinned
211    }
212
213    fn retention_exempt(&self, record: &MemoryRecord) -> bool {
214        self.config.engine_config.retention.pinned_records_exempt && Self::is_pinned(record)
215    }
216
217    fn validate_delete_request(request: &DeleteRequest) -> Result<()> {
218        if request.tenant_id.trim().is_empty() {
219            return Err(Error::InvalidRequest(
220                "delete tenant_id is required".to_string(),
221            ));
222        }
223        if request.namespace.trim().is_empty() {
224            return Err(Error::InvalidRequest(
225                "delete namespace is required".to_string(),
226            ));
227        }
228        if request.record_id.trim().is_empty() {
229            return Err(Error::InvalidRequest(
230                "delete record_id is required".to_string(),
231            ));
232        }
233        if request.audit_reason.trim().is_empty() {
234            return Err(Error::InvalidRequest(
235                "delete audit_reason is required".to_string(),
236            ));
237        }
238        Ok(())
239    }
240
241    fn validate_archive_request(request: &ArchiveRequest) -> Result<()> {
242        Self::validate_lifecycle_request(
243            "archive",
244            &request.tenant_id,
245            &request.namespace,
246            &request.record_id,
247            &request.audit_reason,
248        )
249    }
250
251    fn validate_suppress_request(request: &SuppressRequest) -> Result<()> {
252        Self::validate_lifecycle_request(
253            "suppress",
254            &request.tenant_id,
255            &request.namespace,
256            &request.record_id,
257            &request.audit_reason,
258        )
259    }
260
261    fn validate_recover_request(request: &RecoverRequest) -> Result<()> {
262        Self::validate_lifecycle_request(
263            "recover",
264            &request.tenant_id,
265            &request.namespace,
266            &request.record_id,
267            &request.audit_reason,
268        )?;
269        match request.quality_state {
270            MemoryQualityState::Active | MemoryQualityState::Verified => {}
271            _ => {
272                return Err(Error::InvalidRequest(
273                    "recover quality_state must be Active or Verified".to_string(),
274                ));
275            }
276        }
277        if matches!(
278            request.historical_state,
279            Some(MemoryHistoricalState::Superseded)
280        ) {
281            return Err(Error::InvalidRequest(
282                "recover historical_state cannot be Superseded".to_string(),
283            ));
284        }
285        Ok(())
286    }
287
288    fn validate_lifecycle_request(
289        action: &str,
290        tenant_id: &str,
291        namespace: &str,
292        record_id: &str,
293        audit_reason: &str,
294    ) -> Result<()> {
295        if tenant_id.trim().is_empty() {
296            return Err(Error::InvalidRequest(format!(
297                "{action} tenant_id is required"
298            )));
299        }
300        if namespace.trim().is_empty() {
301            return Err(Error::InvalidRequest(format!(
302                "{action} namespace is required"
303            )));
304        }
305        if record_id.trim().is_empty() {
306            return Err(Error::InvalidRequest(format!(
307                "{action} record_id is required"
308            )));
309        }
310        if audit_reason.trim().is_empty() {
311            return Err(Error::InvalidRequest(format!(
312                "{action} audit_reason is required"
313            )));
314        }
315        Ok(())
316    }
317
318    fn validate_record_scope(
319        stored: &StoredRecord,
320        tenant_id: &str,
321        namespace: &str,
322    ) -> Result<()> {
323        if stored.record.scope.tenant_id != tenant_id {
324            return Err(Error::InvalidRequest(format!(
325                "record {} does not belong to tenant {}",
326                stored.record.id, tenant_id
327            )));
328        }
329        if stored.record.scope.namespace != namespace {
330            return Err(Error::InvalidRequest(format!(
331                "record {} does not belong to namespace {}",
332                stored.record.id, namespace
333            )));
334        }
335        Ok(())
336    }
337
338    fn validate_import_request(
339        &self,
340        request: &ImportRequest,
341    ) -> (u64, bool, Vec<ImportFailure>, Vec<PortableRecord>) {
342        let mut validated_records = 0u64;
343        let mut failures = Vec::new();
344        let mut entries = Vec::with_capacity(request.package.records.len());
345
346        if request.package.package_version != PORTABLE_PACKAGE_VERSION {
347            failures.push(ImportFailure {
348                record_id: None,
349                reason: format!(
350                    "unsupported portable package version {}; expected {}",
351                    request.package.package_version, PORTABLE_PACKAGE_VERSION
352                ),
353            });
354        }
355
356        if request.package.manifest.record_count != request.package.records.len() as u64 {
357            failures.push(ImportFailure {
358                record_id: None,
359                reason: format!(
360                    "portable package manifest record_count={} does not match payload records={}",
361                    request.package.manifest.record_count,
362                    request.package.records.len()
363                ),
364            });
365        }
366
367        for entry in &request.package.records {
368            match Self::validate_record(&entry.record) {
369                Ok(()) => {
370                    validated_records += 1;
371                    entries.push(entry.clone());
372                }
373                Err(error) => failures.push(ImportFailure {
374                    record_id: Some(entry.record.id.clone()),
375                    reason: error.to_string(),
376                }),
377            }
378        }
379
380        (validated_records, failures.is_empty(), failures, entries)
381    }
382
383    fn now_unix_ms() -> Result<u64> {
384        SystemTime::now()
385            .duration_since(UNIX_EPOCH)
386            .map_err(|err| Error::Backend(format!("system clock error: {err}")))
387            .map(|value| value.as_millis() as u64)
388    }
389
390    fn encode_record(stored: &StoredRecord) -> Result<Vec<u8>> {
391        serde_json::to_vec(stored)
392            .map_err(|err| Error::Backend(format!("failed to encode record: {err}")))
393    }
394
395    fn decode_record(value: &[u8]) -> Result<StoredRecord> {
396        serde_json::from_slice::<StoredRecord>(value)
397            .map_err(|err| Error::Backend(format!("failed to decode stored record: {err}")))
398    }
399
400    fn idempotency_scope_key(scope: &MemoryScope, key: &str) -> String {
401        format!(
402            "{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}",
403            scope.tenant_id,
404            scope.namespace,
405            scope.actor_id,
406            scope.conversation_id.as_deref().unwrap_or(""),
407            scope.session_id.as_deref().unwrap_or(""),
408            key
409        )
410    }
411
412    fn fetch_record(&self, record_id: &str) -> Result<Option<StoredRecord>> {
413        self.records
414            .get(record_id.as_bytes())
415            .map_err(|err| Error::Backend(format!("failed to read record: {err}")))?
416            .map(|value| Self::decode_record(&value))
417            .transpose()
418    }
419
420    fn remove_idempotency_mapping(&self, record: &StoredRecord) -> Result<()> {
421        if let Some(idempotency_key) = &record.idempotency_key {
422            let scoped_key = Self::idempotency_scope_key(&record.record.scope, idempotency_key);
423            self.idempotency
424                .remove(scoped_key.as_bytes())
425                .map_err(|err| {
426                    Error::Backend(format!("failed to remove idempotency key: {err}"))
427                })?;
428        }
429        Ok(())
430    }
431
432    fn matches_scope(candidate: &MemoryScope, query: &MemoryScope) -> bool {
433        candidate.tenant_id == query.tenant_id
434            && candidate.namespace == query.namespace
435            && candidate.actor_id == query.actor_id
436            && (query.conversation_id.is_none()
437                || candidate.conversation_id == query.conversation_id)
438            && (query.session_id.is_none() || candidate.session_id == query.session_id)
439    }
440
441    fn record_passes_filters(
442        record: &MemoryRecord,
443        query: &RecallQuery,
444        relative_bounds: RelativeTemporalBounds,
445    ) -> bool {
446        if !Self::matches_scope(&record.scope, &query.scope) {
447            return false;
448        }
449
450        if let Some(expires_at_unix_ms) = record.expires_at_unix_ms
451            && expires_at_unix_ms <= Self::now_unix_ms().unwrap_or(u64::MAX)
452        {
453            return false;
454        }
455
456        if let Some(min_score) = query.filters.min_importance_score
457            && record.importance_score < min_score
458        {
459            return false;
460        }
461
462        if let Some(source) = &query.filters.source
463            && &record.scope.source != source
464        {
465            return false;
466        }
467
468        if let Some(from_unix_ms) = query.filters.from_unix_ms
469            && record.updated_at_unix_ms < from_unix_ms
470        {
471            return false;
472        }
473
474        if let Some(to_unix_ms) = query.filters.to_unix_ms
475            && record.updated_at_unix_ms > to_unix_ms
476        {
477            return false;
478        }
479
480        if let Some(after_unix_ms) = relative_bounds.after_unix_ms
481            && Self::record_temporal_anchor(record) <= after_unix_ms
482        {
483            return false;
484        }
485
486        if let Some(before_unix_ms) = relative_bounds.before_unix_ms
487            && Self::record_temporal_anchor(record) >= before_unix_ms
488        {
489            return false;
490        }
491
492        if !query.filters.trust_levels.is_empty()
493            && !query
494                .filters
495                .trust_levels
496                .contains(&record.scope.trust_level)
497        {
498            return false;
499        }
500
501        if !query.filters.required_labels.is_empty()
502            && !query.filters.required_labels.iter().all(|label| {
503                record
504                    .scope
505                    .labels
506                    .iter()
507                    .any(|candidate| candidate == label)
508            })
509        {
510            return false;
511        }
512
513        if !query.filters.kinds.is_empty() && !query.filters.kinds.contains(&record.kind) {
514            return false;
515        }
516
517        if let Some(episode_id) = &query.filters.episode_id
518            && record.episode.as_ref().map(|episode| &episode.episode_id) != Some(episode_id)
519        {
520            return false;
521        }
522
523        if !query.filters.continuity_states.is_empty()
524            && !record.episode.as_ref().is_some_and(|episode| {
525                query
526                    .filters
527                    .continuity_states
528                    .contains(&episode.continuity_state)
529            })
530        {
531            return false;
532        }
533
534        if query.filters.unresolved_only
535            && !record
536                .episode
537                .as_ref()
538                .is_some_and(|episode| episode.continuity_state.is_unresolved())
539        {
540            return false;
541        }
542
543        if let Some(lineage_record_id) = &query.filters.lineage_record_id
544            && record.id != *lineage_record_id
545            && !record
546                .lineage
547                .iter()
548                .any(|link| &link.record_id == lineage_record_id)
549        {
550            return false;
551        }
552
553        if !query.filters.boundary_labels.is_empty()
554            && !record.episode.as_ref().is_some_and(|episode| {
555                episode.boundary_label.as_ref().is_some_and(|label| {
556                    query
557                        .filters
558                        .boundary_labels
559                        .iter()
560                        .any(|expected| expected == label)
561                })
562            })
563        {
564            return false;
565        }
566
567        if let Some(recurrence_key) = &query.filters.recurrence_key
568            && record
569                .episode
570                .as_ref()
571                .and_then(|episode| episode.recurrence_key.as_ref())
572                != Some(recurrence_key)
573        {
574            return false;
575        }
576
577        if !query.filters.conflict_states.is_empty()
578            && !record
579                .conflict
580                .as_ref()
581                .is_some_and(|conflict| query.filters.conflict_states.contains(&conflict.state))
582        {
583            return false;
584        }
585
586        if !query.filters.resolution_kinds.is_empty()
587            && !record.conflict.as_ref().is_some_and(|conflict| {
588                query
589                    .filters
590                    .resolution_kinds
591                    .contains(&conflict.resolution)
592            })
593        {
594            return false;
595        }
596
597        if query.filters.unresolved_conflicts_only
598            && !record.conflict.as_ref().is_some_and(|conflict| {
599                matches!(
600                    conflict.state,
601                    mnemara_core::ConflictReviewState::PotentialConflict
602                        | mnemara_core::ConflictReviewState::UnderReview
603                )
604            })
605        {
606            return false;
607        }
608
609        match query.filters.historical_mode {
610            RecallHistoricalMode::CurrentOnly => {
611                if !matches!(record.historical_state, MemoryHistoricalState::Current) {
612                    return false;
613                }
614            }
615            RecallHistoricalMode::HistoricalOnly => {
616                if matches!(record.historical_state, MemoryHistoricalState::Current) {
617                    return false;
618                }
619            }
620            RecallHistoricalMode::IncludeHistorical => {}
621        }
622
623        if !query.filters.states.is_empty() {
624            if !query.filters.states.contains(&record.quality_state) {
625                return false;
626            }
627        } else {
628            match record.quality_state {
629                MemoryQualityState::Archived if !query.filters.include_archived => return false,
630                MemoryQualityState::Deleted | MemoryQualityState::Suppressed => return false,
631                _ => {}
632            }
633        }
634
635        true
636    }
637
638    fn relative_temporal_bounds(
639        records: &[StoredRecord],
640        query: &RecallQuery,
641    ) -> Result<RelativeTemporalBounds> {
642        let mut bounds = RelativeTemporalBounds::default();
643        if let Some(after_record_id) = &query.filters.after_record_id {
644            let Some(anchor) = records
645                .iter()
646                .find(|stored| {
647                    stored.record.id == *after_record_id
648                        && Self::matches_scope(&stored.record.scope, &query.scope)
649                })
650                .map(|stored| Self::record_temporal_anchor(&stored.record))
651            else {
652                return Err(Error::InvalidRequest(format!(
653                    "after_record_id '{after_record_id}' was not found in recall scope"
654                )));
655            };
656            bounds.after_unix_ms = Some(anchor);
657        }
658        if let Some(before_record_id) = &query.filters.before_record_id {
659            let Some(anchor) = records
660                .iter()
661                .find(|stored| {
662                    stored.record.id == *before_record_id
663                        && Self::matches_scope(&stored.record.scope, &query.scope)
664                })
665                .map(|stored| Self::record_temporal_anchor(&stored.record))
666            else {
667                return Err(Error::InvalidRequest(format!(
668                    "before_record_id '{before_record_id}' was not found in recall scope"
669                )));
670            };
671            bounds.before_unix_ms = Some(anchor);
672        }
673        if let (Some(after), Some(before)) = (bounds.after_unix_ms, bounds.before_unix_ms)
674            && after >= before
675        {
676            return Err(Error::InvalidRequest(
677                "after_record_id must refer to an earlier record than before_record_id".to_string(),
678            ));
679        }
680        Ok(bounds)
681    }
682
683    fn iterate_records(&self) -> Result<Vec<StoredRecord>> {
684        let mut records = Vec::new();
685        for item in self.records.iter() {
686            let (_, value) =
687                item.map_err(|err| Error::Backend(format!("sled iteration failed: {err}")))?;
688            records.push(Self::decode_record(&value)?);
689        }
690        Ok(records)
691    }
692
693    fn iterate_idempotency_mappings(&self) -> Result<Vec<IdempotencyMapping>> {
694        let mut mappings = Vec::new();
695        for item in self.idempotency.iter() {
696            let (key, value) =
697                item.map_err(|err| Error::Backend(format!("sled iteration failed: {err}")))?;
698            let scoped_key = String::from_utf8(key.to_vec()).map_err(|err| {
699                Error::Backend(format!("invalid idempotency key encoding: {err}"))
700            })?;
701            let record_id = String::from_utf8(value.to_vec()).map_err(|err| {
702                Error::Backend(format!("invalid idempotency value encoding: {err}"))
703            })?;
704            mappings.push(IdempotencyMapping {
705                scoped_key,
706                record_id,
707            });
708        }
709        Ok(mappings)
710    }
711
712    fn parse_scope_key(scoped_key: &str) -> Option<ScopeKeyParts> {
713        let parts = scoped_key.split('\u{1f}').collect::<Vec<_>>();
714        if parts.len() != 6 {
715            return None;
716        }
717        Some((
718            parts[0].to_string(),
719            parts[1].to_string(),
720            parts[2].to_string(),
721            (!parts[3].is_empty()).then(|| parts[3].to_string()),
722            (!parts[4].is_empty()).then(|| parts[4].to_string()),
723            parts[5].to_string(),
724        ))
725    }
726
727    fn scope_matches_filters(
728        tenant_id: &str,
729        namespace: &str,
730        tenant_filter: Option<&str>,
731        namespace_filter: Option<&str>,
732    ) -> bool {
733        tenant_filter.is_none_or(|expected| tenant_id == expected)
734            && namespace_filter.is_none_or(|expected| namespace == expected)
735    }
736
737    fn build_integrity_summary(
738        &self,
739        tenant_filter: Option<&str>,
740        namespace_filter: Option<&str>,
741    ) -> Result<IntegritySummary> {
742        let records = self.iterate_records()?;
743        let mappings = self.iterate_idempotency_mappings()?;
744        let now_unix_ms = Self::now_unix_ms()?;
745
746        let filtered_records = records
747            .iter()
748            .filter(|stored| {
749                Self::scope_matches_filters(
750                    &stored.record.scope.tenant_id,
751                    &stored.record.scope.namespace,
752                    tenant_filter,
753                    namespace_filter,
754                )
755            })
756            .collect::<Vec<_>>();
757
758        let mut mapping_lookup = HashMap::new();
759        let mut stale_idempotency_keys = 0u64;
760        let mut scanned_idempotency_keys = 0u64;
761
762        for mapping in &mappings {
763            let Some((tenant_id, namespace, _, _, _, idempotency_key)) =
764                Self::parse_scope_key(&mapping.scoped_key)
765            else {
766                stale_idempotency_keys += 1;
767                scanned_idempotency_keys += 1;
768                continue;
769            };
770            if !Self::scope_matches_filters(&tenant_id, &namespace, tenant_filter, namespace_filter)
771            {
772                continue;
773            }
774            scanned_idempotency_keys += 1;
775            let Some(stored) = self.fetch_record(&mapping.record_id)? else {
776                stale_idempotency_keys += 1;
777                continue;
778            };
779            if stored.record.scope.tenant_id != tenant_id
780                || stored.record.scope.namespace != namespace
781                || stored.idempotency_key.as_deref() != Some(idempotency_key.as_str())
782                || Self::idempotency_scope_key(&stored.record.scope, &idempotency_key)
783                    != mapping.scoped_key
784            {
785                stale_idempotency_keys += 1;
786                continue;
787            }
788            mapping_lookup.insert(mapping.scoped_key.clone(), mapping.record_id.clone());
789        }
790
791        let mut duplicate_groups = HashMap::<String, usize>::new();
792        let mut missing_idempotency_keys = 0u64;
793        let mut duplicate_active_records = 0u64;
794
795        for stored in &filtered_records {
796            if let Some(idempotency_key) = &stored.idempotency_key {
797                let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
798                if mapping_lookup.get(&scoped_key) != Some(&stored.record.id) {
799                    missing_idempotency_keys += 1;
800                }
801            }
802
803            if stored
804                .record
805                .expires_at_unix_ms
806                .is_some_and(|value| value <= now_unix_ms)
807            {
808                // Counted in stats, not integrity.
809            }
810
811            if !matches!(
812                stored.record.quality_state,
813                MemoryQualityState::Archived
814                    | MemoryQualityState::Deleted
815                    | MemoryQualityState::Suppressed
816            ) {
817                *duplicate_groups
818                    .entry(Self::dedup_signature(&stored.record))
819                    .or_default() += 1;
820            }
821        }
822
823        for group_size in duplicate_groups.into_values() {
824            if group_size > 1 {
825                duplicate_active_records += (group_size - 1) as u64;
826            }
827        }
828
829        Ok(IntegritySummary {
830            scanned_records: filtered_records.len() as u64,
831            scanned_idempotency_keys,
832            stale_idempotency_keys,
833            missing_idempotency_keys,
834            duplicate_active_records,
835        })
836    }
837
838    fn build_stats_report(&self, request: &StoreStatsRequest) -> Result<StoreStatsReport> {
839        let records = self.iterate_records()?;
840        let tenant_filter = request.tenant_id.as_deref();
841        let namespace_filter = request.namespace.as_deref();
842        let filtered_records = records
843            .iter()
844            .filter(|stored| {
845                Self::scope_matches_filters(
846                    &stored.record.scope.tenant_id,
847                    &stored.record.scope.namespace,
848                    tenant_filter,
849                    namespace_filter,
850                )
851            })
852            .collect::<Vec<_>>();
853        let now_unix_ms = Self::now_unix_ms()?;
854        let integrity = self.build_integrity_summary(tenant_filter, namespace_filter)?;
855        let mut namespace_map = BTreeMap::<(String, String), NamespaceStats>::new();
856        let mut duplicate_groups = HashMap::<String, usize>::new();
857        let mut tombstoned_records = 0u64;
858        let mut expired_records = 0u64;
859
860        for stored in &filtered_records {
861            let key = (
862                stored.record.scope.tenant_id.clone(),
863                stored.record.scope.namespace.clone(),
864            );
865            let entry = namespace_map
866                .entry(key.clone())
867                .or_insert_with(|| NamespaceStats {
868                    tenant_id: key.0.clone(),
869                    namespace: key.1.clone(),
870                    active_records: 0,
871                    archived_records: 0,
872                    deleted_records: 0,
873                    suppressed_records: 0,
874                    pinned_records: 0,
875                });
876            match stored.record.quality_state {
877                MemoryQualityState::Archived => entry.archived_records += 1,
878                MemoryQualityState::Deleted => {
879                    entry.deleted_records += 1;
880                    tombstoned_records += 1;
881                }
882                MemoryQualityState::Suppressed => entry.suppressed_records += 1,
883                _ => entry.active_records += 1,
884            }
885            if stored.record.scope.trust_level == MemoryTrustLevel::Pinned {
886                entry.pinned_records += 1;
887            }
888            if stored
889                .record
890                .expires_at_unix_ms
891                .is_some_and(|value| value <= now_unix_ms)
892            {
893                expired_records += 1;
894            }
895            if !matches!(
896                stored.record.quality_state,
897                MemoryQualityState::Archived
898                    | MemoryQualityState::Deleted
899                    | MemoryQualityState::Suppressed
900            ) {
901                *duplicate_groups
902                    .entry(Self::dedup_signature(&stored.record))
903                    .or_default() += 1;
904            }
905        }
906
907        let mut duplicate_candidate_groups = 0u64;
908        let mut duplicate_candidate_records = 0u64;
909        let mut historical_records = 0u64;
910        let mut superseded_records = 0u64;
911        let mut lineage_links = 0u64;
912
913        for stored in &filtered_records {
914            if matches!(
915                stored.record.historical_state,
916                MemoryHistoricalState::Historical
917            ) {
918                historical_records += 1;
919            }
920            if matches!(
921                stored.record.historical_state,
922                MemoryHistoricalState::Superseded
923            ) {
924                superseded_records += 1;
925            }
926            lineage_links += stored.record.lineage.len() as u64;
927        }
928
929        for group_size in duplicate_groups.into_values() {
930            if group_size > 1 {
931                duplicate_candidate_groups += 1;
932                duplicate_candidate_records += (group_size - 1) as u64;
933            }
934        }
935
936        Ok(StoreStatsReport {
937            generated_at_unix_ms: now_unix_ms,
938            total_records: filtered_records.len() as u64,
939            storage_bytes: self
940                .db
941                .size_on_disk()
942                .map_err(|err| Error::Backend(format!("failed to determine db size: {err}")))?,
943            namespaces: namespace_map.into_values().collect(),
944            maintenance: MaintenanceStats {
945                duplicate_candidate_groups,
946                duplicate_candidate_records,
947                tombstoned_records,
948                expired_records,
949                stale_idempotency_keys: integrity.stale_idempotency_keys,
950                historical_records,
951                superseded_records,
952                lineage_links,
953            },
954            engine: self.config.engine_config.tuning_info(),
955        })
956    }
957
958    fn approximate_tokens(record: &MemoryRecord) -> usize {
959        let content_tokens = record.content.split_whitespace().count();
960        let summary_tokens = record
961            .summary
962            .as_deref()
963            .map(|summary| summary.split_whitespace().count())
964            .unwrap_or(0);
965        content_tokens + summary_tokens
966    }
967
968    fn record_temporal_anchor(record: &MemoryRecord) -> u64 {
969        record
970            .episode
971            .as_ref()
972            .and_then(|episode| episode.last_active_unix_ms.or(episode.started_at_unix_ms))
973            .unwrap_or(record.updated_at_unix_ms)
974    }
975
976    fn selected_channels_for_hit(hit: &RecallHit, empty_query: bool) -> Vec<String> {
977        let mut selected_channels = if empty_query {
978            vec!["temporal".to_string(), "policy".to_string()]
979        } else {
980            vec!["lexical".to_string(), "policy".to_string()]
981        };
982        if hit.breakdown.semantic > 0.0 {
983            selected_channels.push("semantic".to_string());
984        }
985        if hit.breakdown.metadata > 0.0 {
986            selected_channels.push("metadata".to_string());
987        }
988        if hit.breakdown.episodic > 0.0 {
989            selected_channels.push("episodic".to_string());
990        }
991        if hit.breakdown.salience > 0.0 {
992            selected_channels.push("salience".to_string());
993        }
994        if hit.breakdown.curation > 0.0 {
995            selected_channels.push("curation".to_string());
996        }
997        if hit.record.conflict.is_some() {
998            selected_channels.push("conflict".to_string());
999        }
1000        selected_channels.sort();
1001        selected_channels.dedup();
1002        selected_channels
1003    }
1004
1005    fn planning_profile_note(profile: RecallPlanningProfile) -> &'static str {
1006        match profile {
1007            RecallPlanningProfile::FastPath => "planning_profile=fast_path",
1008            RecallPlanningProfile::ContinuityAware => "planning_profile=continuity_aware",
1009        }
1010    }
1011
1012    fn dedup_signature(record: &MemoryRecord) -> String {
1013        format!(
1014            "{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}",
1015            record.scope.tenant_id,
1016            record.scope.namespace,
1017            record.scope.actor_id,
1018            record.kind as u8,
1019            record.content.trim().to_ascii_lowercase(),
1020            record
1021                .summary
1022                .clone()
1023                .unwrap_or_default()
1024                .trim()
1025                .to_ascii_lowercase()
1026        )
1027    }
1028
1029    fn summary_record_id(signature: &str) -> String {
1030        let mut hasher = std::collections::hash_map::DefaultHasher::new();
1031        signature.hash(&mut hasher);
1032        format!("compacted-summary-{:016x}", hasher.finish())
1033    }
1034
1035    fn compaction_summary_record(
1036        group: &[StoredRecord],
1037        signature: &str,
1038        now_unix_ms: u64,
1039    ) -> StoredRecord {
1040        let canonical = &group[0].record;
1041        let representative_summary = canonical
1042            .summary
1043            .clone()
1044            .filter(|value| !value.trim().is_empty())
1045            .unwrap_or_else(|| canonical.content.clone());
1046        let cluster_size = group.len();
1047        let max_importance_score = group
1048            .iter()
1049            .map(|stored| stored.record.importance_score)
1050            .fold(canonical.importance_score, f32::max);
1051
1052        let mut metadata = BTreeMap::new();
1053        metadata.insert(
1054            "compaction_reason".to_string(),
1055            "duplicate_cluster_rollup".to_string(),
1056        );
1057        metadata.insert(
1058            "compaction_cluster_size".to_string(),
1059            cluster_size.to_string(),
1060        );
1061        metadata.insert("representative_record_id".to_string(), canonical.id.clone());
1062
1063        let mut labels = canonical.scope.labels.clone();
1064        if !labels.iter().any(|label| label == "compacted") {
1065            labels.push("compacted".to_string());
1066        }
1067
1068        StoredRecord {
1069            record: MemoryRecord {
1070                id: Self::summary_record_id(signature),
1071                scope: MemoryScope {
1072                    tenant_id: canonical.scope.tenant_id.clone(),
1073                    namespace: canonical.scope.namespace.clone(),
1074                    actor_id: canonical.scope.actor_id.clone(),
1075                    conversation_id: canonical.scope.conversation_id.clone(),
1076                    session_id: canonical.scope.session_id.clone(),
1077                    source: canonical.scope.source.clone(),
1078                    labels,
1079                    trust_level: canonical.scope.trust_level,
1080                },
1081                kind: mnemara_core::MemoryRecordKind::Summary,
1082                content: format!(
1083                    "Compacted {} related records into a durable summary. Representative memory: {}",
1084                    cluster_size, representative_summary
1085                ),
1086                summary: Some(format!(
1087                    "{} related records: {}",
1088                    cluster_size, representative_summary
1089                )),
1090                source_id: None,
1091                metadata,
1092                quality_state: if matches!(canonical.quality_state, MemoryQualityState::Verified) {
1093                    MemoryQualityState::Verified
1094                } else {
1095                    MemoryQualityState::Active
1096                },
1097                created_at_unix_ms: now_unix_ms,
1098                updated_at_unix_ms: now_unix_ms,
1099                expires_at_unix_ms: None,
1100                importance_score: max_importance_score,
1101                artifact: canonical.artifact.clone(),
1102                episode: canonical.episode.clone(),
1103                historical_state: MemoryHistoricalState::Current,
1104                lineage: group
1105                    .iter()
1106                    .map(|stored| LineageLink {
1107                        record_id: stored.record.id.clone(),
1108                        relation: LineageRelationKind::ConsolidatedFrom,
1109                        confidence: 1.0,
1110                    })
1111                    .collect(),
1112                conflict: canonical.conflict.clone(),
1113            },
1114            idempotency_key: None,
1115        }
1116    }
1117
1118    fn cold_tiering_candidates(
1119        &self,
1120        tenant_id: &str,
1121        namespace: Option<&str>,
1122        now_unix_ms: u64,
1123    ) -> Result<Vec<StoredRecord>> {
1124        let cold_archive_after_days = self.config.engine_config.compaction.cold_archive_after_days;
1125        if cold_archive_after_days == 0 {
1126            return Ok(Vec::new());
1127        }
1128        let archive_threshold_ms =
1129            u64::from(cold_archive_after_days).saturating_mul(24 * 60 * 60 * 1_000);
1130        let max_importance = f32::from(
1131            self.config
1132                .engine_config
1133                .compaction
1134                .cold_archive_importance_threshold_per_mille,
1135        ) / 1000.0;
1136
1137        Ok(self
1138            .iterate_records()?
1139            .into_iter()
1140            .filter(|stored| stored.record.scope.tenant_id == tenant_id)
1141            .filter(|stored| namespace.is_none_or(|value| stored.record.scope.namespace == value))
1142            .filter(|stored| {
1143                matches!(
1144                    stored.record.quality_state,
1145                    MemoryQualityState::Draft
1146                        | MemoryQualityState::Active
1147                        | MemoryQualityState::Verified
1148                )
1149            })
1150            .filter(|stored| !Self::is_pinned(&stored.record))
1151            .filter(|stored| {
1152                now_unix_ms.saturating_sub(stored.record.updated_at_unix_ms) > archive_threshold_ms
1153                    && stored.record.importance_score <= max_importance
1154            })
1155            .collect())
1156    }
1157
1158    fn persist_record(&self, stored: &StoredRecord) -> Result<()> {
1159        self.records
1160            .insert(stored.record.id.as_bytes(), Self::encode_record(stored)?)
1161            .map_err(|err| Error::Backend(format!("failed to write record: {err}")))?;
1162        Ok(())
1163    }
1164
1165    fn persist_imported_record(&self, stored: &StoredRecord) -> Result<()> {
1166        self.persist_record(stored)?;
1167        if let Some(idempotency_key) = &stored.idempotency_key {
1168            let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
1169            self.idempotency
1170                .insert(scoped_key.as_bytes(), stored.record.id.as_bytes())
1171                .map_err(|err| Error::Backend(format!("failed to write idempotency key: {err}")))?;
1172        }
1173        Ok(())
1174    }
1175
1176    fn retention_delete(&self, stored: StoredRecord) -> Result<()> {
1177        self.records
1178            .remove(stored.record.id.as_bytes())
1179            .map_err(|err| Error::Backend(format!("failed to delete expired record: {err}")))?;
1180        self.remove_idempotency_mapping(&stored)?;
1181        Ok(())
1182    }
1183
1184    fn clear_all_records(&self) -> Result<()> {
1185        for stored in self.iterate_records()? {
1186            self.records
1187                .remove(stored.record.id.as_bytes())
1188                .map_err(|err| Error::Backend(format!("failed to clear record: {err}")))?;
1189            self.remove_idempotency_mapping(&stored)?;
1190        }
1191        Ok(())
1192    }
1193
1194    fn retention_archive(&self, stored: &mut StoredRecord, now_unix_ms: u64) -> Result<bool> {
1195        if stored.record.quality_state == MemoryQualityState::Archived {
1196            return Ok(false);
1197        }
1198        stored.record.quality_state = MemoryQualityState::Archived;
1199        stored.record.historical_state = MemoryHistoricalState::Historical;
1200        stored.record.updated_at_unix_ms = now_unix_ms;
1201        self.persist_record(stored)?;
1202        Ok(true)
1203    }
1204
1205    fn apply_retention_for_namespace(
1206        &self,
1207        tenant_id: &str,
1208        namespace: &str,
1209    ) -> Result<(u64, u64)> {
1210        let now_unix_ms = Self::now_unix_ms()?;
1211        let retention = &self.config.engine_config.retention;
1212        let ttl_window_ms = u64::from(retention.ttl_days).saturating_mul(24 * 60 * 60 * 1_000);
1213        let archive_window_ms =
1214            u64::from(retention.archive_after_days).saturating_mul(24 * 60 * 60 * 1_000);
1215
1216        let mut archived_records = 0u64;
1217        let mut deleted_records = 0u64;
1218        let mut namespace_records = self
1219            .iterate_records()?
1220            .into_iter()
1221            .filter(|stored| {
1222                stored.record.scope.tenant_id == tenant_id
1223                    && stored.record.scope.namespace == namespace
1224            })
1225            .collect::<Vec<_>>();
1226
1227        for stored in &mut namespace_records {
1228            if self.retention_exempt(&stored.record) {
1229                continue;
1230            }
1231
1232            let expired_by_explicit_deadline = stored
1233                .record
1234                .expires_at_unix_ms
1235                .is_some_and(|expires_at| expires_at <= now_unix_ms);
1236            let expired_by_ttl = ttl_window_ms > 0
1237                && now_unix_ms.saturating_sub(stored.record.created_at_unix_ms) > ttl_window_ms;
1238
1239            if expired_by_explicit_deadline || expired_by_ttl {
1240                if !matches!(stored.record.quality_state, MemoryQualityState::Deleted) {
1241                    self.retention_delete(stored.clone())?;
1242                    deleted_records += 1;
1243                }
1244                continue;
1245            }
1246
1247            let should_archive_by_age = archive_window_ms > 0
1248                && now_unix_ms.saturating_sub(stored.record.created_at_unix_ms) > archive_window_ms
1249                && matches!(
1250                    stored.record.quality_state,
1251                    MemoryQualityState::Draft
1252                        | MemoryQualityState::Active
1253                        | MemoryQualityState::Verified
1254                );
1255
1256            if should_archive_by_age && self.retention_archive(stored, now_unix_ms)? {
1257                archived_records += 1;
1258            }
1259        }
1260
1261        if retention.max_records_per_namespace > 0 {
1262            let mut candidates = self
1263                .iterate_records()?
1264                .into_iter()
1265                .filter(|stored| {
1266                    stored.record.scope.tenant_id == tenant_id
1267                        && stored.record.scope.namespace == namespace
1268                        && !self.retention_exempt(&stored.record)
1269                        && matches!(
1270                            stored.record.quality_state,
1271                            MemoryQualityState::Draft
1272                                | MemoryQualityState::Active
1273                                | MemoryQualityState::Verified
1274                        )
1275                })
1276                .collect::<Vec<_>>();
1277
1278            if candidates.len() > retention.max_records_per_namespace {
1279                candidates.sort_by(|left, right| {
1280                    left.record
1281                        .updated_at_unix_ms
1282                        .cmp(&right.record.updated_at_unix_ms)
1283                        .then_with(|| {
1284                            left.record
1285                                .importance_score
1286                                .total_cmp(&right.record.importance_score)
1287                        })
1288                        .then_with(|| left.record.id.cmp(&right.record.id))
1289                });
1290
1291                let archive_count = candidates.len() - retention.max_records_per_namespace;
1292                for stored in candidates.iter_mut().take(archive_count) {
1293                    if self.retention_archive(stored, now_unix_ms)? {
1294                        archived_records += 1;
1295                    }
1296                }
1297            }
1298        }
1299
1300        Ok((archived_records, deleted_records))
1301    }
1302}
1303
1304#[async_trait]
1305impl MemoryStore for SledMemoryStore {
1306    fn backend_kind(&self) -> &'static str {
1307        "sled"
1308    }
1309
1310    async fn upsert(&self, request: UpsertRequest) -> Result<UpsertReceipt> {
1311        self.validate_upsert_request(&request)?;
1312
1313        if let Some(idempotency_key) = &request.idempotency_key {
1314            let scoped_key = Self::idempotency_scope_key(&request.record.scope, idempotency_key);
1315            if let Some(existing_record_id) = self
1316                .idempotency
1317                .get(scoped_key.as_bytes())
1318                .map_err(|err| Error::Backend(format!("failed to read idempotency key: {err}")))?
1319            {
1320                let existing_record_id =
1321                    String::from_utf8(existing_record_id.to_vec()).map_err(|err| {
1322                        Error::Backend(format!(
1323                            "stored idempotency mapping was not valid utf-8: {err}"
1324                        ))
1325                    })?;
1326                if existing_record_id != request.record.id {
1327                    return Err(Error::Conflict(format!(
1328                        "idempotency key already belongs to record {}",
1329                        existing_record_id
1330                    )));
1331                }
1332                if self.fetch_record(&existing_record_id)?.is_some() {
1333                    return Ok(UpsertReceipt {
1334                        record_id: existing_record_id,
1335                        deduplicated: true,
1336                        summary_refreshed: false,
1337                    });
1338                }
1339                self.idempotency
1340                    .remove(scoped_key.as_bytes())
1341                    .map_err(|err| {
1342                        Error::Backend(format!("failed to clear stale idempotency key: {err}"))
1343                    })?;
1344            }
1345        }
1346
1347        let key = request.record.id.clone();
1348        let tenant_id = request.record.scope.tenant_id.clone();
1349        let namespace = request.record.scope.namespace.clone();
1350        let existing = self.fetch_record(&key)?;
1351        let deduplicated = existing.is_some();
1352        let stored = StoredRecord {
1353            record: request.record,
1354            idempotency_key: request.idempotency_key,
1355        };
1356        if let Some(existing) = existing
1357            && existing.idempotency_key != stored.idempotency_key
1358        {
1359            self.remove_idempotency_mapping(&existing)?;
1360        }
1361        self.persist_record(&stored)?;
1362        if let Some(idempotency_key) = &stored.idempotency_key {
1363            let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
1364            self.idempotency
1365                .insert(scoped_key.as_bytes(), key.as_bytes())
1366                .map_err(|err| Error::Backend(format!("failed to write idempotency key: {err}")))?;
1367        }
1368        self.apply_retention_for_namespace(&tenant_id, &namespace)?;
1369        self.db
1370            .flush_async()
1371            .await
1372            .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
1373        Ok(UpsertReceipt {
1374            record_id: key,
1375            deduplicated,
1376            summary_refreshed: false,
1377        })
1378    }
1379
1380    async fn batch_upsert(&self, request: BatchUpsertRequest) -> Result<Vec<UpsertReceipt>> {
1381        if request.requests.len() > self.config.engine_config.max_batch_size {
1382            return Err(Error::InvalidRequest(format!(
1383                "batch size {} exceeds configured max_batch_size {}",
1384                request.requests.len(),
1385                self.config.engine_config.max_batch_size
1386            )));
1387        }
1388        for item in &request.requests {
1389            self.validate_upsert_request(item)?;
1390        }
1391        let mut receipts = Vec::with_capacity(request.requests.len());
1392        for item in request.requests {
1393            receipts.push(self.upsert(item).await?);
1394        }
1395        Ok(receipts)
1396    }
1397
1398    async fn recall(&self, query: RecallQuery) -> Result<RecallResult> {
1399        let empty_query = query.query_text.trim().is_empty();
1400        let planner = self.config.recall_planner();
1401        let scorer = planner.scorer();
1402        let planning_profile = planner.effective_profile(&query);
1403        let stored_records = self.iterate_records()?;
1404        let relative_bounds = Self::relative_temporal_bounds(&stored_records, &query)?;
1405        let records = stored_records
1406            .into_iter()
1407            .filter(|stored| Self::record_passes_filters(&stored.record, &query, relative_bounds))
1408            .map(|stored| stored.record)
1409            .collect::<Vec<_>>();
1410        let mut scored = planner.plan(&records, &query);
1411        match query.filters.temporal_order {
1412            RecallTemporalOrder::Relevance if empty_query => {
1413                scored.sort_by(|left, right| {
1414                    Self::record_temporal_anchor(&right.hit.record)
1415                        .cmp(&Self::record_temporal_anchor(&left.hit.record))
1416                        .then_with(|| {
1417                            right
1418                                .hit
1419                                .record
1420                                .importance_score
1421                                .total_cmp(&left.hit.record.importance_score)
1422                        })
1423                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1424                });
1425            }
1426            RecallTemporalOrder::Relevance => {
1427                scored.sort_by(|left, right| {
1428                    right
1429                        .hit
1430                        .breakdown
1431                        .total
1432                        .total_cmp(&left.hit.breakdown.total)
1433                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1434                });
1435            }
1436            RecallTemporalOrder::ChronologicalAsc => {
1437                scored.sort_by(|left, right| {
1438                    Self::record_temporal_anchor(&left.hit.record)
1439                        .cmp(&Self::record_temporal_anchor(&right.hit.record))
1440                        .then_with(|| {
1441                            right
1442                                .hit
1443                                .breakdown
1444                                .total
1445                                .total_cmp(&left.hit.breakdown.total)
1446                        })
1447                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1448                });
1449            }
1450            RecallTemporalOrder::ChronologicalDesc => {
1451                scored.sort_by(|left, right| {
1452                    Self::record_temporal_anchor(&right.hit.record)
1453                        .cmp(&Self::record_temporal_anchor(&left.hit.record))
1454                        .then_with(|| {
1455                            right
1456                                .hit
1457                                .breakdown
1458                                .total
1459                                .total_cmp(&left.hit.breakdown.total)
1460                        })
1461                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1462                });
1463            }
1464        }
1465
1466        let examined = scored.len();
1467        let mut selected_ids = Vec::with_capacity(query.max_items);
1468        let mut remaining_budget = query.token_budget.unwrap_or(usize::MAX);
1469        for candidate in &scored {
1470            if selected_ids.len() >= query.max_items {
1471                break;
1472            }
1473            let estimated_tokens = Self::approximate_tokens(&candidate.hit.record);
1474            if selected_ids.is_empty() || estimated_tokens <= remaining_budget {
1475                remaining_budget = remaining_budget.saturating_sub(estimated_tokens);
1476                selected_ids.push(candidate.hit.record.id.clone());
1477            }
1478        }
1479
1480        let trace_id = format!(
1481            "recall:{}:{}:{}",
1482            query.scope.tenant_id, query.scope.namespace, examined
1483        );
1484        let selected_set = selected_ids.iter().cloned().collect::<BTreeSet<_>>();
1485        let selected = scored
1486            .iter()
1487            .filter(|candidate| selected_set.contains(&candidate.hit.record.id))
1488            .map(|candidate| {
1489                let mut enriched = candidate.hit.clone();
1490                if query.include_explanation {
1491                    let selected_channels =
1492                        Self::selected_channels_for_hit(&candidate.hit, empty_query);
1493                    enriched.explanation = Some(RecallExplanation {
1494                        selected_channels,
1495                        policy_notes: vec![if empty_query {
1496                            "recent_scope_scan".to_string()
1497                        } else {
1498                            "initial_sled_backend_scoring".to_string()
1499                        }],
1500                        trace_id: Some(trace_id.clone()),
1501                        planning_trace: None,
1502                        planning_profile: Some(planning_profile),
1503                        policy_profile: Some(scorer.policy_profile()),
1504                        scorer_kind: Some(scorer.scorer_kind()),
1505                        scoring_profile: Some(scorer.scoring_profile()),
1506                    });
1507                    if let Some(explanation) = enriched.explanation.as_mut() {
1508                        explanation
1509                            .policy_notes
1510                            .push(scorer.profile_note().to_string());
1511                        explanation
1512                            .policy_notes
1513                            .push(scorer.policy_profile_note().to_string());
1514                        explanation
1515                            .policy_notes
1516                            .push(Self::planning_profile_note(planning_profile).to_string());
1517                        if let Some(note) = scorer.embedding_note() {
1518                            explanation.policy_notes.push(note.to_string());
1519                        }
1520                        if query.filters.episode_id.is_some() {
1521                            explanation
1522                                .policy_notes
1523                                .push("episode_filter_applied".to_string());
1524                        }
1525                        if query.filters.unresolved_only {
1526                            explanation
1527                                .policy_notes
1528                                .push("unresolved_only_filter_applied".to_string());
1529                        }
1530                        if !candidate.matched_terms.is_empty() {
1531                            explanation.policy_notes.push(format!(
1532                                "matched_terms={}",
1533                                candidate.matched_terms.join(",")
1534                            ));
1535                        }
1536                    }
1537                }
1538                enriched
1539            })
1540            .collect::<Vec<_>>();
1541
1542        let mut policy_notes = vec![if empty_query {
1543            "recent_scope_scan".to_string()
1544        } else {
1545            "initial_sled_backend_scoring".to_string()
1546        }];
1547        policy_notes.push(scorer.profile_note().to_string());
1548        policy_notes.push(scorer.policy_profile_note().to_string());
1549        policy_notes.push(Self::planning_profile_note(planning_profile).to_string());
1550        if let Some(note) = scorer.embedding_note() {
1551            policy_notes.push(note.to_string());
1552        }
1553        if query.token_budget.is_some() {
1554            policy_notes.push("token_budget_applied".to_string());
1555        }
1556        if query.filters.episode_id.is_some() {
1557            policy_notes.push("episode_filter_applied".to_string());
1558        }
1559        if query.filters.unresolved_only {
1560            policy_notes.push("unresolved_only_filter_applied".to_string());
1561        }
1562        if query.filters.before_record_id.is_some() || query.filters.after_record_id.is_some() {
1563            policy_notes.push("relative_temporal_filter_applied".to_string());
1564        }
1565        if !query.filters.boundary_labels.is_empty() || query.filters.recurrence_key.is_some() {
1566            policy_notes.push("episodic_boundary_filter_applied".to_string());
1567        }
1568        if !query.filters.conflict_states.is_empty()
1569            || !query.filters.resolution_kinds.is_empty()
1570            || query.filters.unresolved_conflicts_only
1571        {
1572            policy_notes.push("conflict_review_filter_applied".to_string());
1573        }
1574        let mut selected_channels = if empty_query {
1575            vec!["temporal".to_string(), "policy".to_string()]
1576        } else {
1577            vec!["lexical".to_string(), "policy".to_string()]
1578        };
1579        for channel in [
1580            "semantic", "metadata", "episodic", "salience", "curation", "conflict",
1581        ] {
1582            let present = scored.iter().any(|candidate| match channel {
1583                "semantic" => candidate.hit.breakdown.semantic > 0.0,
1584                "metadata" => candidate.hit.breakdown.metadata > 0.0,
1585                "episodic" => candidate.hit.breakdown.episodic > 0.0,
1586                "salience" => candidate.hit.breakdown.salience > 0.0,
1587                "curation" => candidate.hit.breakdown.curation > 0.0,
1588                "conflict" => candidate.hit.record.conflict.is_some(),
1589                _ => false,
1590            });
1591            if present && !selected_channels.iter().any(|existing| existing == channel) {
1592                selected_channels.push(channel.to_string());
1593            }
1594        }
1595
1596        Ok(RecallResult {
1597            hits: selected,
1598            total_candidates_examined: examined,
1599            explanation: query.include_explanation.then(|| RecallExplanation {
1600                selected_channels,
1601                policy_notes,
1602                trace_id: Some(trace_id.clone()),
1603                planning_profile: Some(planning_profile),
1604                policy_profile: Some(scorer.policy_profile()),
1605                planning_trace: Some(RecallPlanningTrace {
1606                    trace_id,
1607                    token_budget_applied: query.token_budget.is_some(),
1608                    candidates: scored
1609                        .into_iter()
1610                        .enumerate()
1611                        .map(|(index, candidate)| {
1612                            let record_id = candidate.hit.record.id.clone();
1613                            let selected = selected_set.contains(&record_id);
1614                            let selection_rank = selected_ids
1615                                .iter()
1616                                .position(|candidate_id| candidate_id == &record_id)
1617                                .map(|index| index as u32 + 1);
1618                            let candidate_channels =
1619                                Self::selected_channels_for_hit(&candidate.hit, empty_query);
1620                            let mut filter_reasons = Vec::new();
1621                            if selected {
1622                                filter_reasons.push("retained".to_string());
1623                            } else {
1624                                if index >= query.max_items {
1625                                    filter_reasons.push("max_items_exhausted".to_string());
1626                                }
1627                                if query.token_budget.is_some() {
1628                                    filter_reasons.push("token_budget_exhausted".to_string());
1629                                }
1630                            }
1631                            RecallTraceCandidate {
1632                                record_id,
1633                                kind: candidate.hit.record.kind,
1634                                selected,
1635                                planner_stage: candidate.planner_stage,
1636                                candidate_sources: candidate.candidate_sources,
1637                                selection_rank,
1638                                matched_terms: candidate.matched_terms,
1639                                selected_channels: candidate_channels,
1640                                filter_reasons,
1641                                decision_reason: if selected {
1642                                    "selected_by_rank".to_string()
1643                                } else if query.token_budget.is_some() {
1644                                    "excluded_by_rank_or_budget".to_string()
1645                                } else {
1646                                    "excluded_by_rank".to_string()
1647                                },
1648                                breakdown: candidate.hit.breakdown,
1649                            }
1650                        })
1651                        .collect(),
1652                }),
1653                scorer_kind: Some(scorer.scorer_kind()),
1654                scoring_profile: Some(scorer.scoring_profile()),
1655            }),
1656        })
1657    }
1658
1659    async fn compact(&self, request: CompactionRequest) -> Result<CompactionReport> {
1660        if request.tenant_id.trim().is_empty() {
1661            return Err(Error::InvalidRequest(
1662                "compaction tenant_id is required".to_string(),
1663            ));
1664        }
1665
1666        let records = self.iterate_records()?;
1667        let mut groups: HashMap<String, Vec<StoredRecord>> = HashMap::new();
1668        for stored in records {
1669            if stored.record.scope.tenant_id != request.tenant_id {
1670                continue;
1671            }
1672            if let Some(namespace) = &request.namespace
1673                && stored.record.scope.namespace != *namespace
1674            {
1675                continue;
1676            }
1677            if matches!(
1678                stored.record.quality_state,
1679                MemoryQualityState::Archived
1680                    | MemoryQualityState::Deleted
1681                    | MemoryQualityState::Suppressed
1682            ) {
1683                continue;
1684            }
1685            groups
1686                .entry(Self::dedup_signature(&stored.record))
1687                .or_default()
1688                .push(stored);
1689        }
1690
1691        let mut deduplicated_records = 0u64;
1692        let mut archived_records = 0u64;
1693        let mut summarized_clusters = 0u64;
1694        let mut superseded_records = 0u64;
1695        let mut lineage_links_created = 0u64;
1696        let now_unix_ms = Self::now_unix_ms()?;
1697        for group in groups.values_mut() {
1698            if group.len() < 2 {
1699                continue;
1700            }
1701            group.sort_by(|left, right| {
1702                right
1703                    .record
1704                    .updated_at_unix_ms
1705                    .cmp(&left.record.updated_at_unix_ms)
1706                    .then_with(|| {
1707                        right
1708                            .record
1709                            .importance_score
1710                            .total_cmp(&left.record.importance_score)
1711                    })
1712                    .then_with(|| left.record.id.cmp(&right.record.id))
1713            });
1714
1715            let signature = Self::dedup_signature(&group[0].record);
1716            if self
1717                .config
1718                .engine_config
1719                .compaction
1720                .summarize_after_record_count
1721                > 0
1722                && group.len()
1723                    >= self
1724                        .config
1725                        .engine_config
1726                        .compaction
1727                        .summarize_after_record_count
1728            {
1729                summarized_clusters += 1;
1730                lineage_links_created += group.len() as u64;
1731                if !request.dry_run {
1732                    let summary = Self::compaction_summary_record(group, &signature, now_unix_ms);
1733                    self.persist_record(&summary)?;
1734                }
1735            }
1736
1737            for duplicate in group.iter().skip(1) {
1738                deduplicated_records += 1;
1739                archived_records += 1;
1740                superseded_records += 1;
1741                if request.dry_run {
1742                    continue;
1743                }
1744                let mut archived = duplicate.clone();
1745                archived.record.quality_state = MemoryQualityState::Archived;
1746                archived.record.historical_state = MemoryHistoricalState::Superseded;
1747                archived.record.lineage.push(LineageLink {
1748                    record_id: Self::summary_record_id(&signature),
1749                    relation: LineageRelationKind::SupersededBy,
1750                    confidence: 1.0,
1751                });
1752                lineage_links_created += 1;
1753                archived.record.updated_at_unix_ms = Self::now_unix_ms()?;
1754                self.records
1755                    .insert(
1756                        archived.record.id.as_bytes(),
1757                        Self::encode_record(&archived)?,
1758                    )
1759                    .map_err(|err| {
1760                        Error::Backend(format!("failed to archive duplicate record: {err}"))
1761                    })?;
1762            }
1763        }
1764
1765        for candidate in self.cold_tiering_candidates(
1766            &request.tenant_id,
1767            request.namespace.as_deref(),
1768            now_unix_ms,
1769        )? {
1770            archived_records += 1;
1771            if request.dry_run {
1772                continue;
1773            }
1774            let mut archived = candidate;
1775            archived.record.quality_state = MemoryQualityState::Archived;
1776            archived.record.historical_state = MemoryHistoricalState::Historical;
1777            archived.record.updated_at_unix_ms = now_unix_ms;
1778            self.persist_record(&archived)?;
1779        }
1780
1781        if !request.dry_run {
1782            self.db
1783                .flush_async()
1784                .await
1785                .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
1786        }
1787
1788        Ok(CompactionReport {
1789            deduplicated_records,
1790            archived_records,
1791            summarized_clusters,
1792            pruned_graph_edges: 0,
1793            superseded_records,
1794            lineage_links_created,
1795            dry_run: request.dry_run,
1796        })
1797    }
1798
1799    async fn delete(&self, request: DeleteRequest) -> Result<DeleteReceipt> {
1800        Self::validate_delete_request(&request)?;
1801
1802        let Some(stored) = self.fetch_record(&request.record_id)? else {
1803            return Ok(DeleteReceipt {
1804                record_id: request.record_id,
1805                tombstoned: false,
1806                hard_deleted: false,
1807            });
1808        };
1809
1810        if stored.record.scope.tenant_id != request.tenant_id {
1811            return Err(Error::InvalidRequest(format!(
1812                "record {} does not belong to tenant {}",
1813                request.record_id, request.tenant_id
1814            )));
1815        }
1816        if stored.record.scope.namespace != request.namespace {
1817            return Err(Error::InvalidRequest(format!(
1818                "record {} does not belong to namespace {}",
1819                request.record_id, request.namespace
1820            )));
1821        }
1822
1823        if request.hard_delete {
1824            self.records
1825                .remove(request.record_id.as_bytes())
1826                .map_err(|err| Error::Backend(format!("failed to delete record: {err}")))?;
1827            self.remove_idempotency_mapping(&stored)?;
1828        } else {
1829            let mut tombstone = stored;
1830            tombstone.record.quality_state = MemoryQualityState::Deleted;
1831            tombstone.record.updated_at_unix_ms = Self::now_unix_ms()?;
1832            self.records
1833                .insert(
1834                    tombstone.record.id.as_bytes(),
1835                    Self::encode_record(&tombstone)?,
1836                )
1837                .map_err(|err| Error::Backend(format!("failed to write tombstone: {err}")))?;
1838        }
1839
1840        self.db
1841            .flush_async()
1842            .await
1843            .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
1844        Ok(DeleteReceipt {
1845            record_id: request.record_id,
1846            tombstoned: !request.hard_delete,
1847            hard_deleted: request.hard_delete,
1848        })
1849    }
1850
1851    async fn archive(&self, request: ArchiveRequest) -> Result<ArchiveReceipt> {
1852        Self::validate_archive_request(&request)?;
1853
1854        let Some(mut stored) = self.fetch_record(&request.record_id)? else {
1855            return Err(Error::InvalidRequest(format!(
1856                "record {} was not found",
1857                request.record_id
1858            )));
1859        };
1860        Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1861
1862        let previous_quality_state = stored.record.quality_state;
1863        let previous_historical_state = stored.record.historical_state;
1864        let changed = previous_quality_state != MemoryQualityState::Archived
1865            || previous_historical_state == MemoryHistoricalState::Current;
1866        let historical_state = match previous_historical_state {
1867            MemoryHistoricalState::Current => MemoryHistoricalState::Historical,
1868            other => other,
1869        };
1870
1871        if changed && !request.dry_run {
1872            stored.record.quality_state = MemoryQualityState::Archived;
1873            stored.record.historical_state = historical_state;
1874            stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1875            self.records
1876                .insert(stored.record.id.as_bytes(), Self::encode_record(&stored)?)
1877                .map_err(|err| Error::Backend(format!("failed to archive record: {err}")))?;
1878            self.db
1879                .flush_async()
1880                .await
1881                .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
1882        }
1883
1884        Ok(ArchiveReceipt {
1885            record_id: request.record_id,
1886            previous_quality_state,
1887            previous_historical_state,
1888            quality_state: MemoryQualityState::Archived,
1889            historical_state,
1890            changed,
1891            dry_run: request.dry_run,
1892        })
1893    }
1894
1895    async fn suppress(&self, request: SuppressRequest) -> Result<SuppressReceipt> {
1896        Self::validate_suppress_request(&request)?;
1897
1898        let Some(mut stored) = self.fetch_record(&request.record_id)? else {
1899            return Err(Error::InvalidRequest(format!(
1900                "record {} was not found",
1901                request.record_id
1902            )));
1903        };
1904        Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1905
1906        let previous_quality_state = stored.record.quality_state;
1907        let previous_historical_state = stored.record.historical_state;
1908        let changed = previous_quality_state != MemoryQualityState::Suppressed;
1909
1910        if changed && !request.dry_run {
1911            stored.record.quality_state = MemoryQualityState::Suppressed;
1912            stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1913            self.records
1914                .insert(stored.record.id.as_bytes(), Self::encode_record(&stored)?)
1915                .map_err(|err| Error::Backend(format!("failed to suppress record: {err}")))?;
1916            self.db
1917                .flush_async()
1918                .await
1919                .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
1920        }
1921
1922        Ok(SuppressReceipt {
1923            record_id: request.record_id,
1924            previous_quality_state,
1925            previous_historical_state,
1926            quality_state: MemoryQualityState::Suppressed,
1927            historical_state: previous_historical_state,
1928            changed,
1929            dry_run: request.dry_run,
1930        })
1931    }
1932
1933    async fn recover(&self, request: RecoverRequest) -> Result<RecoverReceipt> {
1934        Self::validate_recover_request(&request)?;
1935
1936        let Some(mut stored) = self.fetch_record(&request.record_id)? else {
1937            return Err(Error::InvalidRequest(format!(
1938                "record {} was not found",
1939                request.record_id
1940            )));
1941        };
1942        Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1943
1944        let previous_quality_state = stored.record.quality_state;
1945        let previous_historical_state = stored.record.historical_state;
1946        let historical_state = request
1947            .historical_state
1948            .unwrap_or(MemoryHistoricalState::Current);
1949        let changed = previous_quality_state != request.quality_state
1950            || previous_historical_state != historical_state;
1951
1952        if changed && !request.dry_run {
1953            stored.record.quality_state = request.quality_state;
1954            stored.record.historical_state = historical_state;
1955            stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1956            self.records
1957                .insert(stored.record.id.as_bytes(), Self::encode_record(&stored)?)
1958                .map_err(|err| Error::Backend(format!("failed to recover record: {err}")))?;
1959            self.db
1960                .flush_async()
1961                .await
1962                .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
1963        }
1964
1965        Ok(RecoverReceipt {
1966            record_id: request.record_id,
1967            previous_quality_state,
1968            previous_historical_state,
1969            quality_state: request.quality_state,
1970            historical_state,
1971            changed,
1972            dry_run: request.dry_run,
1973        })
1974    }
1975
1976    async fn snapshot(&self) -> Result<SnapshotManifest> {
1977        let records = self.iterate_records()?;
1978        let namespaces = records
1979            .iter()
1980            .map(|stored| stored.record.scope.namespace.clone())
1981            .collect::<BTreeSet<_>>()
1982            .into_iter()
1983            .collect::<Vec<_>>();
1984        let created_at_unix_ms = Self::now_unix_ms()?;
1985        let storage_bytes = self
1986            .db
1987            .size_on_disk()
1988            .map_err(|err| Error::Backend(format!("failed to determine db size: {err}")))?;
1989
1990        Ok(SnapshotManifest {
1991            snapshot_id: format!("snapshot-{created_at_unix_ms}"),
1992            created_at_unix_ms,
1993            namespaces,
1994            record_count: records.len() as u64,
1995            storage_bytes,
1996            engine: self.config.engine_config.tuning_info(),
1997        })
1998    }
1999
2000    async fn stats(&self, request: StoreStatsRequest) -> Result<StoreStatsReport> {
2001        self.build_stats_report(&request)
2002    }
2003
2004    async fn integrity_check(
2005        &self,
2006        request: IntegrityCheckRequest,
2007    ) -> Result<IntegrityCheckReport> {
2008        let summary = self
2009            .build_integrity_summary(request.tenant_id.as_deref(), request.namespace.as_deref())?;
2010        Ok(IntegrityCheckReport {
2011            generated_at_unix_ms: Self::now_unix_ms()?,
2012            healthy: summary.stale_idempotency_keys == 0
2013                && summary.missing_idempotency_keys == 0
2014                && summary.duplicate_active_records == 0,
2015            scanned_records: summary.scanned_records,
2016            scanned_idempotency_keys: summary.scanned_idempotency_keys,
2017            stale_idempotency_keys: summary.stale_idempotency_keys,
2018            missing_idempotency_keys: summary.missing_idempotency_keys,
2019            duplicate_active_records: summary.duplicate_active_records,
2020        })
2021    }
2022
2023    async fn repair(&self, request: RepairRequest) -> Result<RepairReport> {
2024        if request.reason.trim().is_empty() {
2025            return Err(Error::InvalidRequest(
2026                "repair reason is required".to_string(),
2027            ));
2028        }
2029        if !request.remove_stale_idempotency_keys && !request.rebuild_missing_idempotency_keys {
2030            return Err(Error::InvalidRequest(
2031                "repair requires at least one enabled action".to_string(),
2032            ));
2033        }
2034
2035        let tenant_filter = request.tenant_id.as_deref();
2036        let namespace_filter = request.namespace.as_deref();
2037        let summary = self.build_integrity_summary(tenant_filter, namespace_filter)?;
2038        let records = self.iterate_records()?;
2039        let mappings = self.iterate_idempotency_mappings()?;
2040        let mut removed_stale_idempotency_keys = 0u64;
2041        let mut rebuilt_missing_idempotency_keys = 0u64;
2042
2043        if request.remove_stale_idempotency_keys {
2044            for mapping in &mappings {
2045                let Some((tenant_id, namespace, _, _, _, idempotency_key)) =
2046                    Self::parse_scope_key(&mapping.scoped_key)
2047                else {
2048                    continue;
2049                };
2050                if !Self::scope_matches_filters(
2051                    &tenant_id,
2052                    &namespace,
2053                    tenant_filter,
2054                    namespace_filter,
2055                ) {
2056                    continue;
2057                }
2058                let stale = match self.fetch_record(&mapping.record_id)? {
2059                    Some(stored) => {
2060                        stored.record.scope.tenant_id != tenant_id
2061                            || stored.record.scope.namespace != namespace
2062                            || stored.idempotency_key.as_deref() != Some(idempotency_key.as_str())
2063                            || Self::idempotency_scope_key(&stored.record.scope, &idempotency_key)
2064                                != mapping.scoped_key
2065                    }
2066                    None => true,
2067                };
2068                if stale {
2069                    removed_stale_idempotency_keys += 1;
2070                    if !request.dry_run {
2071                        self.idempotency
2072                            .remove(mapping.scoped_key.as_bytes())
2073                            .map_err(|err| {
2074                                Error::Backend(format!(
2075                                    "failed to remove stale idempotency key: {err}"
2076                                ))
2077                            })?;
2078                    }
2079                }
2080            }
2081        }
2082
2083        if request.rebuild_missing_idempotency_keys {
2084            let existing = self.iterate_idempotency_mappings()?;
2085            let existing_lookup = existing
2086                .into_iter()
2087                .map(|mapping| (mapping.scoped_key, mapping.record_id))
2088                .collect::<HashMap<_, _>>();
2089
2090            for stored in &records {
2091                if !Self::scope_matches_filters(
2092                    &stored.record.scope.tenant_id,
2093                    &stored.record.scope.namespace,
2094                    tenant_filter,
2095                    namespace_filter,
2096                ) {
2097                    continue;
2098                }
2099                let Some(idempotency_key) = &stored.idempotency_key else {
2100                    continue;
2101                };
2102                let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
2103                if existing_lookup.get(&scoped_key) == Some(&stored.record.id) {
2104                    continue;
2105                }
2106                rebuilt_missing_idempotency_keys += 1;
2107                if !request.dry_run {
2108                    self.idempotency
2109                        .insert(scoped_key.as_bytes(), stored.record.id.as_bytes())
2110                        .map_err(|err| {
2111                            Error::Backend(format!("failed to rebuild idempotency key: {err}"))
2112                        })?;
2113                }
2114            }
2115        }
2116
2117        if !request.dry_run {
2118            self.db
2119                .flush_async()
2120                .await
2121                .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
2122        }
2123
2124        let stale_after = if request.remove_stale_idempotency_keys {
2125            0
2126        } else {
2127            summary.stale_idempotency_keys
2128        };
2129        let missing_after = if request.rebuild_missing_idempotency_keys {
2130            0
2131        } else {
2132            summary.missing_idempotency_keys
2133        };
2134
2135        Ok(RepairReport {
2136            dry_run: request.dry_run,
2137            scanned_records: summary.scanned_records,
2138            scanned_idempotency_keys: summary.scanned_idempotency_keys,
2139            removed_stale_idempotency_keys,
2140            rebuilt_missing_idempotency_keys,
2141            healthy_after: stale_after == 0
2142                && missing_after == 0
2143                && summary.duplicate_active_records == 0,
2144        })
2145    }
2146
2147    async fn export(&self, request: ExportRequest) -> Result<PortableStorePackage> {
2148        let exported_at_unix_ms = Self::now_unix_ms()?;
2149        let mut namespaces = BTreeSet::new();
2150        let mut records = Vec::new();
2151        for stored in self.iterate_records()? {
2152            if request
2153                .tenant_id
2154                .as_deref()
2155                .is_some_and(|tenant_id| stored.record.scope.tenant_id != tenant_id)
2156            {
2157                continue;
2158            }
2159            if request
2160                .namespace
2161                .as_deref()
2162                .is_some_and(|namespace| stored.record.scope.namespace != namespace)
2163            {
2164                continue;
2165            }
2166            if !request.include_archived
2167                && stored.record.quality_state == MemoryQualityState::Archived
2168            {
2169                continue;
2170            }
2171            namespaces.insert(format!(
2172                "{}:{}",
2173                stored.record.scope.tenant_id, stored.record.scope.namespace
2174            ));
2175            records.push(PortableRecord {
2176                record: stored.record,
2177                idempotency_key: stored.idempotency_key,
2178            });
2179        }
2180
2181        let storage_bytes = records
2182            .iter()
2183            .map(|entry| {
2184                entry.record.content.len()
2185                    + entry.record.summary.as_deref().map(str::len).unwrap_or(0)
2186            })
2187            .sum::<usize>() as u64;
2188
2189        Ok(PortableStorePackage {
2190            package_version: PORTABLE_PACKAGE_VERSION,
2191            exported_at_unix_ms,
2192            manifest: SnapshotManifest {
2193                snapshot_id: format!("portable-export-{exported_at_unix_ms}"),
2194                created_at_unix_ms: exported_at_unix_ms,
2195                namespaces: namespaces.into_iter().collect(),
2196                record_count: records.len() as u64,
2197                storage_bytes,
2198                engine: self.config.engine_config.tuning_info(),
2199            },
2200            records,
2201        })
2202    }
2203
2204    async fn import(&self, request: ImportRequest) -> Result<ImportReport> {
2205        let snapshot_id = request.package.manifest.snapshot_id.clone();
2206        let package_version = request.package.package_version;
2207        let (validated_records, compatible_package, failed_records, entries) =
2208            self.validate_import_request(&request);
2209        let apply_changes = compatible_package
2210            && failed_records.is_empty()
2211            && !request.dry_run
2212            && !matches!(request.mode, ImportMode::Validate);
2213        let mut imported_records = 0u64;
2214        let mut skipped_records = 0u64;
2215        if apply_changes && matches!(request.mode, ImportMode::Replace) {
2216            self.clear_all_records()?;
2217        }
2218
2219        for entry in entries {
2220            if matches!(request.mode, ImportMode::Merge)
2221                && self
2222                    .records
2223                    .contains_key(entry.record.id.as_bytes())
2224                    .map_err(|err| {
2225                        Error::Backend(format!("failed to check record presence: {err}"))
2226                    })?
2227            {
2228                skipped_records += 1;
2229                continue;
2230            }
2231            if apply_changes {
2232                self.persist_imported_record(&StoredRecord {
2233                    record: entry.record,
2234                    idempotency_key: entry.idempotency_key,
2235                })?;
2236            }
2237            imported_records += 1;
2238        }
2239        if apply_changes {
2240            self.db
2241                .flush_async()
2242                .await
2243                .map_err(|err| Error::Backend(format!("failed to flush sled db: {err}")))?;
2244        }
2245
2246        Ok(ImportReport {
2247            mode: request.mode,
2248            dry_run: request.dry_run,
2249            applied: apply_changes,
2250            compatible_package,
2251            package_version,
2252            validated_records,
2253            imported_records,
2254            skipped_records,
2255            replaced_existing: matches!(request.mode, ImportMode::Replace),
2256            snapshot_id,
2257            failed_records,
2258        })
2259    }
2260}