Skip to main content

mnemara_store_file/
lib.rs

1use async_trait::async_trait;
2use mnemara_core::{
3    ArchiveReceipt, ArchiveRequest, BatchUpsertRequest, CompactionReport, CompactionRequest,
4    ConfiguredRecallScorer, DeleteReceipt, DeleteRequest, EngineConfig, Error, ExportRequest,
5    ImportFailure, ImportMode, ImportReport, ImportRequest, IntegrityCheckReport,
6    IntegrityCheckRequest, LineageLink, LineageRelationKind, MaintenanceStats,
7    MemoryHistoricalState, MemoryQualityState, MemoryRecord, MemoryScope, MemoryStore,
8    MemoryTrustLevel, NamespaceStats, PlannedRecallCandidate, PortableRecord, PortableStorePackage,
9    RecallExplanation, RecallHistoricalMode, RecallHit, RecallPlanner, RecallPlanningProfile,
10    RecallPlanningTrace, RecallQuery, RecallResult, RecallScorer, RecallTemporalOrder,
11    RecallTraceCandidate, RecoverReceipt, RecoverRequest, RepairReport, RepairRequest, Result,
12    SemanticEmbedder, SnapshotManifest, StoreStatsReport, StoreStatsRequest, SuppressReceipt,
13    SuppressRequest, UpsertReceipt, UpsertRequest,
14};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, BTreeSet, HashMap};
17use std::fmt;
18use std::fs;
19use std::hash::{Hash, Hasher};
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::time::{SystemTime, UNIX_EPOCH};
23
24#[derive(Clone)]
25pub struct FileStoreConfig {
26    pub data_dir: PathBuf,
27    pub engine_config: EngineConfig,
28    pub shared_embedder: Option<SharedEmbedderConfig>,
29}
30
31#[derive(Clone)]
32pub struct SharedEmbedderConfig {
33    pub embedder: Arc<dyn SemanticEmbedder>,
34    pub provider_note: String,
35}
36
37impl SharedEmbedderConfig {
38    fn new(embedder: Arc<dyn SemanticEmbedder>, provider_note: impl Into<String>) -> Self {
39        Self {
40            embedder,
41            provider_note: provider_note.into(),
42        }
43    }
44}
45
46impl fmt::Debug for SharedEmbedderConfig {
47    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
48        formatter
49            .debug_struct("SharedEmbedderConfig")
50            .field("provider_note", &self.provider_note)
51            .finish()
52    }
53}
54
55impl fmt::Debug for FileStoreConfig {
56    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
57        formatter
58            .debug_struct("FileStoreConfig")
59            .field("data_dir", &self.data_dir)
60            .field("engine_config", &self.engine_config)
61            .field("shared_embedder", &self.shared_embedder)
62            .finish()
63    }
64}
65
66impl FileStoreConfig {
67    pub fn new(data_dir: impl AsRef<Path>) -> Self {
68        Self {
69            data_dir: data_dir.as_ref().to_path_buf(),
70            engine_config: EngineConfig::default(),
71            shared_embedder: None,
72        }
73    }
74
75    pub fn with_engine_config(mut self, engine_config: EngineConfig) -> Self {
76        self.engine_config = engine_config;
77        self
78    }
79
80    pub fn with_shared_embedder(
81        mut self,
82        embedder: Arc<dyn SemanticEmbedder>,
83        provider_note: impl Into<String>,
84    ) -> Self {
85        self.shared_embedder = Some(SharedEmbedderConfig::new(embedder, provider_note));
86        self
87    }
88
89    fn recall_planner(&self) -> RecallPlanner {
90        if let Some(shared_embedder) = &self.shared_embedder {
91            RecallPlanner::with_shared_embedder(
92                self.engine_config.recall_planning_profile,
93                self.engine_config.graph_expansion_max_hops,
94                self.engine_config.recall_scorer_kind,
95                self.engine_config.recall_scoring_profile,
96                self.engine_config.recall_policy_profile,
97                Arc::clone(&shared_embedder.embedder),
98                shared_embedder.provider_note.clone(),
99            )
100        } else {
101            RecallPlanner::from_engine_config(&self.engine_config)
102        }
103    }
104}
105
106#[derive(Debug)]
107pub struct FileMemoryStore {
108    config: FileStoreConfig,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
112struct StoredRecord {
113    record: MemoryRecord,
114    idempotency_key: Option<String>,
115}
116
/// One idempotency mapping as read back from the idempotency directory.
#[derive(Debug, Clone)]
struct IdempotencyMapping {
    /// Scoped idempotency key, decoded from the mapping file's hex-encoded
    /// file stem.
    scoped_key: String,
    /// Record id stored as the mapping file's contents.
    record_id: String,
}
122
/// Components of a scoped idempotency key, in the order they are joined:
/// tenant id, namespace, actor id, optional conversation id, optional
/// session id, and the idempotency key itself.
type ScopeKeyParts = (
    String,         // tenant id
    String,         // namespace
    String,         // actor id
    Option<String>, // conversation id (`None` when the segment is empty)
    Option<String>, // session id (`None` when the segment is empty)
    String,         // idempotency key
);
131
/// Counters produced by one integrity scan over records and idempotency
/// mappings.
#[derive(Debug, Clone, Copy, Default)]
struct IntegritySummary {
    /// Records that matched the tenant/namespace filters.
    scanned_records: u64,
    /// Idempotency mappings that were examined.
    scanned_idempotency_keys: u64,
    /// Mappings that could not be parsed, point at a missing record, or
    /// disagree with the record they point at.
    stale_idempotency_keys: u64,
    /// Records that carry an idempotency key but have no valid mapping
    /// pointing back at them.
    missing_idempotency_keys: u64,
    /// Surplus records (beyond the first) sharing a dedup signature among
    /// records that are not archived, deleted, or suppressed.
    duplicate_active_records: u64,
}
140
/// Optional exclusive time window applied to a record's temporal anchor
/// during recall filtering.
#[derive(Debug, Clone, Copy, Default)]
struct RelativeTemporalBounds {
    /// When set, records whose anchor is at or before this instant are
    /// rejected.
    after_unix_ms: Option<u64>,
    /// When set, records whose anchor is at or after this instant are
    /// rejected.
    before_unix_ms: Option<u64>,
}
146
/// The only portable package version accepted on import; packages declaring
/// any other version are reported as import failures.
const PORTABLE_PACKAGE_VERSION: u32 = 1;
148
149impl FileMemoryStore {
150    pub fn open(config: FileStoreConfig) -> Result<Self> {
151        fs::create_dir_all(Self::records_dir(&config.data_dir)).map_err(|err| {
152            Error::Backend(format!(
153                "failed to create records dir {}: {err}",
154                Self::records_dir(&config.data_dir).display()
155            ))
156        })?;
157        fs::create_dir_all(Self::idempotency_dir(&config.data_dir)).map_err(|err| {
158            Error::Backend(format!(
159                "failed to create idempotency dir {}: {err}",
160                Self::idempotency_dir(&config.data_dir).display()
161            ))
162        })?;
163        Ok(Self { config })
164    }
165
166    fn records_dir(data_dir: &Path) -> PathBuf {
167        data_dir.join("records")
168    }
169
170    fn idempotency_dir(data_dir: &Path) -> PathBuf {
171        data_dir.join("idempotency")
172    }
173
174    fn record_path(&self, record_id: &str) -> PathBuf {
175        Self::records_dir(&self.config.data_dir).join(format!("{}.json", hex_key(record_id)))
176    }
177
178    fn idempotency_scope_key(scope: &MemoryScope, key: &str) -> String {
179        format!(
180            "{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}",
181            scope.tenant_id,
182            scope.namespace,
183            scope.actor_id,
184            scope.conversation_id.as_deref().unwrap_or(""),
185            scope.session_id.as_deref().unwrap_or(""),
186            key
187        )
188    }
189
190    fn idempotency_path(&self, scope: &MemoryScope, key: &str) -> PathBuf {
191        let scoped = Self::idempotency_scope_key(scope, key);
192        Self::idempotency_dir(&self.config.data_dir).join(format!("{}.txt", hex_key(&scoped)))
193    }
194
195    fn validate_record(&self, record: &MemoryRecord) -> Result<()> {
196        record.validate()
197    }
198
199    fn validate_upsert_request(&self, request: &UpsertRequest) -> Result<()> {
200        self.validate_record(&request.record)?;
201        if request.idempotency_key.is_none()
202            && self
203                .config
204                .engine_config
205                .ingestion
206                .idempotent_writes_required
207        {
208            return Err(Error::InvalidRequest(
209                "idempotency_key is required by the current ingestion policy".to_string(),
210            ));
211        }
212        if self.config.engine_config.ingestion.require_source_labels
213            && request.record.scope.labels.is_empty()
214        {
215            return Err(Error::InvalidRequest(
216                "at least one source label is required by the current ingestion policy".to_string(),
217            ));
218        }
219        Ok(())
220    }
221
222    fn validate_delete_request(&self, request: &DeleteRequest) -> Result<()> {
223        if request.tenant_id.trim().is_empty() {
224            return Err(Error::InvalidRequest(
225                "delete tenant_id is required".to_string(),
226            ));
227        }
228        if request.namespace.trim().is_empty() {
229            return Err(Error::InvalidRequest(
230                "delete namespace is required".to_string(),
231            ));
232        }
233        if request.record_id.trim().is_empty() {
234            return Err(Error::InvalidRequest(
235                "delete record_id is required".to_string(),
236            ));
237        }
238        if request.audit_reason.trim().is_empty() {
239            return Err(Error::InvalidRequest(
240                "delete audit_reason is required".to_string(),
241            ));
242        }
243        Ok(())
244    }
245
246    fn validate_archive_request(&self, request: &ArchiveRequest) -> Result<()> {
247        Self::validate_lifecycle_request(
248            "archive",
249            &request.tenant_id,
250            &request.namespace,
251            &request.record_id,
252            &request.audit_reason,
253        )
254    }
255
256    fn validate_suppress_request(&self, request: &SuppressRequest) -> Result<()> {
257        Self::validate_lifecycle_request(
258            "suppress",
259            &request.tenant_id,
260            &request.namespace,
261            &request.record_id,
262            &request.audit_reason,
263        )
264    }
265
266    fn validate_recover_request(&self, request: &RecoverRequest) -> Result<()> {
267        Self::validate_lifecycle_request(
268            "recover",
269            &request.tenant_id,
270            &request.namespace,
271            &request.record_id,
272            &request.audit_reason,
273        )?;
274        match request.quality_state {
275            MemoryQualityState::Active | MemoryQualityState::Verified => {}
276            _ => {
277                return Err(Error::InvalidRequest(
278                    "recover quality_state must be Active or Verified".to_string(),
279                ));
280            }
281        }
282        if matches!(
283            request.historical_state,
284            Some(MemoryHistoricalState::Superseded)
285        ) {
286            return Err(Error::InvalidRequest(
287                "recover historical_state cannot be Superseded".to_string(),
288            ));
289        }
290        Ok(())
291    }
292
293    fn validate_lifecycle_request(
294        action: &str,
295        tenant_id: &str,
296        namespace: &str,
297        record_id: &str,
298        audit_reason: &str,
299    ) -> Result<()> {
300        if tenant_id.trim().is_empty() {
301            return Err(Error::InvalidRequest(format!(
302                "{action} tenant_id is required"
303            )));
304        }
305        if namespace.trim().is_empty() {
306            return Err(Error::InvalidRequest(format!(
307                "{action} namespace is required"
308            )));
309        }
310        if record_id.trim().is_empty() {
311            return Err(Error::InvalidRequest(format!(
312                "{action} record_id is required"
313            )));
314        }
315        if audit_reason.trim().is_empty() {
316            return Err(Error::InvalidRequest(format!(
317                "{action} audit_reason is required"
318            )));
319        }
320        Ok(())
321    }
322
323    fn validate_record_scope(
324        stored: &StoredRecord,
325        tenant_id: &str,
326        namespace: &str,
327    ) -> Result<()> {
328        if stored.record.scope.tenant_id != tenant_id {
329            return Err(Error::InvalidRequest(format!(
330                "record {} does not belong to tenant {}",
331                stored.record.id, tenant_id
332            )));
333        }
334        if stored.record.scope.namespace != namespace {
335            return Err(Error::InvalidRequest(format!(
336                "record {} does not belong to namespace {}",
337                stored.record.id, namespace
338            )));
339        }
340        Ok(())
341    }
342
343    fn validate_import_request(
344        &self,
345        request: &ImportRequest,
346    ) -> (u64, bool, Vec<ImportFailure>, Vec<PortableRecord>) {
347        let mut validated_records = 0u64;
348        let mut failures = Vec::new();
349        let mut entries = Vec::with_capacity(request.package.records.len());
350
351        if request.package.package_version != PORTABLE_PACKAGE_VERSION {
352            failures.push(ImportFailure {
353                record_id: None,
354                reason: format!(
355                    "unsupported portable package version {}; expected {}",
356                    request.package.package_version, PORTABLE_PACKAGE_VERSION
357                ),
358            });
359        }
360
361        if request.package.manifest.record_count != request.package.records.len() as u64 {
362            failures.push(ImportFailure {
363                record_id: None,
364                reason: format!(
365                    "portable package manifest record_count={} does not match payload records={}",
366                    request.package.manifest.record_count,
367                    request.package.records.len()
368                ),
369            });
370        }
371
372        for entry in &request.package.records {
373            match self.validate_record(&entry.record) {
374                Ok(()) => {
375                    validated_records += 1;
376                    entries.push(entry.clone());
377                }
378                Err(error) => failures.push(ImportFailure {
379                    record_id: Some(entry.record.id.clone()),
380                    reason: error.to_string(),
381                }),
382            }
383        }
384
385        (validated_records, failures.is_empty(), failures, entries)
386    }
387
388    fn is_pinned(record: &MemoryRecord) -> bool {
389        record.scope.trust_level == MemoryTrustLevel::Pinned
390    }
391
392    fn retention_exempt(&self, record: &MemoryRecord) -> bool {
393        self.config.engine_config.retention.pinned_records_exempt && Self::is_pinned(record)
394    }
395
396    fn now_unix_ms() -> Result<u64> {
397        SystemTime::now()
398            .duration_since(UNIX_EPOCH)
399            .map_err(|err| Error::Backend(format!("system clock error: {err}")))
400            .map(|value| value.as_millis() as u64)
401    }
402
403    fn load_record(&self, record_id: &str) -> Result<Option<StoredRecord>> {
404        let path = self.record_path(record_id);
405        if !path.exists() {
406            return Ok(None);
407        }
408        let raw = fs::read(&path).map_err(|err| {
409            Error::Backend(format!("failed to read record {}: {err}", path.display()))
410        })?;
411        let stored = serde_json::from_slice::<StoredRecord>(&raw).map_err(|err| {
412            Error::Backend(format!("failed to decode record {}: {err}", path.display()))
413        })?;
414        Ok(Some(stored))
415    }
416
417    fn persist_record(&self, stored: &StoredRecord) -> Result<()> {
418        let path = self.record_path(&stored.record.id);
419        let encoded = serde_json::to_vec(stored)
420            .map_err(|err| Error::Backend(format!("failed to encode record: {err}")))?;
421        fs::write(&path, encoded).map_err(|err| {
422            Error::Backend(format!("failed to write record {}: {err}", path.display()))
423        })?;
424        Ok(())
425    }
426
427    fn persist_imported_record(&self, stored: &StoredRecord) -> Result<()> {
428        self.persist_record(stored)?;
429        if let Some(idempotency_key) = &stored.idempotency_key {
430            fs::write(
431                self.idempotency_path(&stored.record.scope, idempotency_key),
432                stored.record.id.as_bytes(),
433            )
434            .map_err(|err| Error::Backend(format!("failed to write idempotency mapping: {err}")))?;
435        }
436        Ok(())
437    }
438
439    fn remove_record(&self, record_id: &str) -> Result<bool> {
440        let path = self.record_path(record_id);
441        if !path.exists() {
442            return Ok(false);
443        }
444        fs::remove_file(&path).map_err(|err| {
445            Error::Backend(format!("failed to remove record {}: {err}", path.display()))
446        })?;
447        Ok(true)
448    }
449
450    fn remove_idempotency_mapping(&self, stored: &StoredRecord) -> Result<()> {
451        if let Some(idempotency_key) = &stored.idempotency_key {
452            let path = self.idempotency_path(&stored.record.scope, idempotency_key);
453            if path.exists() {
454                fs::remove_file(&path).map_err(|err| {
455                    Error::Backend(format!(
456                        "failed to remove idempotency mapping {}: {err}",
457                        path.display()
458                    ))
459                })?;
460            }
461        }
462        Ok(())
463    }
464
465    fn clear_all_records(&self) -> Result<()> {
466        for stored in self.iterate_records()? {
467            self.remove_record(&stored.record.id)?;
468            self.remove_idempotency_mapping(&stored)?;
469        }
470        Ok(())
471    }
472
473    fn iterate_records(&self) -> Result<Vec<StoredRecord>> {
474        let mut records = Vec::new();
475        let dir = Self::records_dir(&self.config.data_dir);
476        for entry in fs::read_dir(&dir).map_err(|err| {
477            Error::Backend(format!(
478                "failed to read records dir {}: {err}",
479                dir.display()
480            ))
481        })? {
482            let entry = entry.map_err(|err| {
483                Error::Backend(format!(
484                    "failed to iterate records dir {}: {err}",
485                    dir.display()
486                ))
487            })?;
488            let path = entry.path();
489            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
490                continue;
491            }
492            let raw = fs::read(&path).map_err(|err| {
493                Error::Backend(format!("failed to read record {}: {err}", path.display()))
494            })?;
495            let stored = serde_json::from_slice::<StoredRecord>(&raw).map_err(|err| {
496                Error::Backend(format!("failed to decode record {}: {err}", path.display()))
497            })?;
498            records.push(stored);
499        }
500        Ok(records)
501    }
502
503    fn iterate_idempotency_mappings(&self) -> Result<Vec<IdempotencyMapping>> {
504        let mut mappings = Vec::new();
505        for entry in fs::read_dir(Self::idempotency_dir(&self.config.data_dir)).map_err(|err| {
506            Error::Backend(format!(
507                "failed to read idempotency dir {}: {err}",
508                Self::idempotency_dir(&self.config.data_dir).display()
509            ))
510        })? {
511            let entry = entry.map_err(|err| {
512                Error::Backend(format!("failed to iterate idempotency dir: {err}"))
513            })?;
514            let path = entry.path();
515            if path.extension().and_then(|value| value.to_str()) != Some("txt") {
516                continue;
517            }
518            let Some(stem) = path.file_stem().and_then(|value| value.to_str()) else {
519                continue;
520            };
521            let Some(scoped_key) = unhex_key(stem) else {
522                continue;
523            };
524            let record_id = fs::read_to_string(&path).map_err(|err| {
525                Error::Backend(format!(
526                    "failed to read idempotency mapping {}: {err}",
527                    path.display()
528                ))
529            })?;
530            mappings.push(IdempotencyMapping {
531                scoped_key,
532                record_id,
533            });
534        }
535        Ok(mappings)
536    }
537
538    fn parse_scope_key(scoped_key: &str) -> Option<ScopeKeyParts> {
539        let parts = scoped_key.split('\u{1f}').collect::<Vec<_>>();
540        if parts.len() != 6 {
541            return None;
542        }
543        Some((
544            parts[0].to_string(),
545            parts[1].to_string(),
546            parts[2].to_string(),
547            (!parts[3].is_empty()).then(|| parts[3].to_string()),
548            (!parts[4].is_empty()).then(|| parts[4].to_string()),
549            parts[5].to_string(),
550        ))
551    }
552
553    fn scope_matches_filters(
554        tenant_id: &str,
555        namespace: &str,
556        tenant_filter: Option<&str>,
557        namespace_filter: Option<&str>,
558    ) -> bool {
559        tenant_filter.is_none_or(|expected| tenant_id == expected)
560            && namespace_filter.is_none_or(|expected| namespace == expected)
561    }
562
563    fn build_integrity_summary(
564        &self,
565        tenant_filter: Option<&str>,
566        namespace_filter: Option<&str>,
567    ) -> Result<IntegritySummary> {
568        let records = self.iterate_records()?;
569        let mappings = self.iterate_idempotency_mappings()?;
570        let filtered_records = records
571            .iter()
572            .filter(|stored| {
573                Self::scope_matches_filters(
574                    &stored.record.scope.tenant_id,
575                    &stored.record.scope.namespace,
576                    tenant_filter,
577                    namespace_filter,
578                )
579            })
580            .collect::<Vec<_>>();
581
582        let mut mapping_lookup = HashMap::new();
583        let mut stale_idempotency_keys = 0u64;
584        let mut scanned_idempotency_keys = 0u64;
585        for mapping in &mappings {
586            let Some((tenant_id, namespace, _, _, _, idempotency_key)) =
587                Self::parse_scope_key(&mapping.scoped_key)
588            else {
589                stale_idempotency_keys += 1;
590                scanned_idempotency_keys += 1;
591                continue;
592            };
593            if !Self::scope_matches_filters(&tenant_id, &namespace, tenant_filter, namespace_filter)
594            {
595                continue;
596            }
597            scanned_idempotency_keys += 1;
598            let Some(stored) = self.load_record(&mapping.record_id)? else {
599                stale_idempotency_keys += 1;
600                continue;
601            };
602            if stored.record.scope.tenant_id != tenant_id
603                || stored.record.scope.namespace != namespace
604                || stored.idempotency_key.as_deref() != Some(idempotency_key.as_str())
605                || Self::idempotency_scope_key(&stored.record.scope, &idempotency_key)
606                    != mapping.scoped_key
607            {
608                stale_idempotency_keys += 1;
609                continue;
610            }
611            mapping_lookup.insert(mapping.scoped_key.clone(), mapping.record_id.clone());
612        }
613
614        let mut duplicate_groups = HashMap::<String, usize>::new();
615        let mut missing_idempotency_keys = 0u64;
616        let mut duplicate_active_records = 0u64;
617        for stored in &filtered_records {
618            if let Some(idempotency_key) = &stored.idempotency_key {
619                let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
620                if mapping_lookup.get(&scoped_key) != Some(&stored.record.id) {
621                    missing_idempotency_keys += 1;
622                }
623            }
624
625            if !matches!(
626                stored.record.quality_state,
627                MemoryQualityState::Archived
628                    | MemoryQualityState::Deleted
629                    | MemoryQualityState::Suppressed
630            ) {
631                *duplicate_groups
632                    .entry(Self::dedup_signature(&stored.record))
633                    .or_default() += 1;
634            }
635        }
636
637        for group_size in duplicate_groups.into_values() {
638            if group_size > 1 {
639                duplicate_active_records += (group_size - 1) as u64;
640            }
641        }
642
643        Ok(IntegritySummary {
644            scanned_records: filtered_records.len() as u64,
645            scanned_idempotency_keys,
646            stale_idempotency_keys,
647            missing_idempotency_keys,
648            duplicate_active_records,
649        })
650    }
651
652    fn build_stats_report(&self, request: &StoreStatsRequest) -> Result<StoreStatsReport> {
653        let records = self.iterate_records()?;
654        let tenant_filter = request.tenant_id.as_deref();
655        let namespace_filter = request.namespace.as_deref();
656        let filtered_records = records
657            .iter()
658            .filter(|stored| {
659                Self::scope_matches_filters(
660                    &stored.record.scope.tenant_id,
661                    &stored.record.scope.namespace,
662                    tenant_filter,
663                    namespace_filter,
664                )
665            })
666            .collect::<Vec<_>>();
667        let now_unix_ms = Self::now_unix_ms()?;
668        let integrity = self.build_integrity_summary(tenant_filter, namespace_filter)?;
669        let mut namespace_map = BTreeMap::<(String, String), NamespaceStats>::new();
670        let mut duplicate_groups = HashMap::<String, usize>::new();
671        let mut tombstoned_records = 0u64;
672        let mut expired_records = 0u64;
673        let mut historical_records = 0u64;
674        let mut superseded_records = 0u64;
675        let mut lineage_links = 0u64;
676
677        for stored in &filtered_records {
678            let key = (
679                stored.record.scope.tenant_id.clone(),
680                stored.record.scope.namespace.clone(),
681            );
682            let entry = namespace_map
683                .entry(key.clone())
684                .or_insert_with(|| NamespaceStats {
685                    tenant_id: key.0.clone(),
686                    namespace: key.1.clone(),
687                    active_records: 0,
688                    archived_records: 0,
689                    deleted_records: 0,
690                    suppressed_records: 0,
691                    pinned_records: 0,
692                });
693            match stored.record.quality_state {
694                MemoryQualityState::Archived => entry.archived_records += 1,
695                MemoryQualityState::Deleted => {
696                    entry.deleted_records += 1;
697                    tombstoned_records += 1;
698                }
699                MemoryQualityState::Suppressed => entry.suppressed_records += 1,
700                _ => entry.active_records += 1,
701            }
702            if stored.record.scope.trust_level == MemoryTrustLevel::Pinned {
703                entry.pinned_records += 1;
704            }
705            if stored
706                .record
707                .expires_at_unix_ms
708                .is_some_and(|value| value <= now_unix_ms)
709            {
710                expired_records += 1;
711            }
712            if matches!(
713                stored.record.historical_state,
714                MemoryHistoricalState::Historical
715            ) {
716                historical_records += 1;
717            }
718            if matches!(
719                stored.record.historical_state,
720                MemoryHistoricalState::Superseded
721            ) {
722                superseded_records += 1;
723            }
724            lineage_links += stored.record.lineage.len() as u64;
725            if !matches!(
726                stored.record.quality_state,
727                MemoryQualityState::Archived
728                    | MemoryQualityState::Deleted
729                    | MemoryQualityState::Suppressed
730            ) {
731                *duplicate_groups
732                    .entry(Self::dedup_signature(&stored.record))
733                    .or_default() += 1;
734            }
735        }
736
737        let mut duplicate_candidate_groups = 0u64;
738        let mut duplicate_candidate_records = 0u64;
739        for group_size in duplicate_groups.into_values() {
740            if group_size > 1 {
741                duplicate_candidate_groups += 1;
742                duplicate_candidate_records += (group_size - 1) as u64;
743            }
744        }
745
746        Ok(StoreStatsReport {
747            generated_at_unix_ms: now_unix_ms,
748            total_records: filtered_records.len() as u64,
749            storage_bytes: dir_size(&self.config.data_dir)?,
750            namespaces: namespace_map.into_values().collect(),
751            maintenance: MaintenanceStats {
752                duplicate_candidate_groups,
753                duplicate_candidate_records,
754                tombstoned_records,
755                expired_records,
756                stale_idempotency_keys: integrity.stale_idempotency_keys,
757                historical_records,
758                superseded_records,
759                lineage_links,
760            },
761            engine: self.config.engine_config.tuning_info(),
762        })
763    }
764
765    fn matches_scope(candidate: &MemoryScope, query: &MemoryScope) -> bool {
766        candidate.tenant_id == query.tenant_id
767            && candidate.namespace == query.namespace
768            && candidate.actor_id == query.actor_id
769            && (query.conversation_id.is_none()
770                || candidate.conversation_id == query.conversation_id)
771            && (query.session_id.is_none() || candidate.session_id == query.session_id)
772    }
773
774    fn record_passes_filters(
775        record: &MemoryRecord,
776        query: &RecallQuery,
777        relative_bounds: RelativeTemporalBounds,
778    ) -> bool {
779        if !Self::matches_scope(&record.scope, &query.scope) {
780            return false;
781        }
782
783        if let Some(expires_at_unix_ms) = record.expires_at_unix_ms
784            && expires_at_unix_ms <= Self::now_unix_ms().unwrap_or(u64::MAX)
785        {
786            return false;
787        }
788
789        if let Some(min_score) = query.filters.min_importance_score
790            && record.importance_score < min_score
791        {
792            return false;
793        }
794
795        if let Some(source) = &query.filters.source
796            && &record.scope.source != source
797        {
798            return false;
799        }
800
801        if let Some(from_unix_ms) = query.filters.from_unix_ms
802            && record.updated_at_unix_ms < from_unix_ms
803        {
804            return false;
805        }
806
807        if let Some(to_unix_ms) = query.filters.to_unix_ms
808            && record.updated_at_unix_ms > to_unix_ms
809        {
810            return false;
811        }
812
813        if let Some(after_unix_ms) = relative_bounds.after_unix_ms
814            && Self::record_temporal_anchor(record) <= after_unix_ms
815        {
816            return false;
817        }
818
819        if let Some(before_unix_ms) = relative_bounds.before_unix_ms
820            && Self::record_temporal_anchor(record) >= before_unix_ms
821        {
822            return false;
823        }
824
825        if !query.filters.trust_levels.is_empty()
826            && !query
827                .filters
828                .trust_levels
829                .contains(&record.scope.trust_level)
830        {
831            return false;
832        }
833
834        if !query.filters.required_labels.is_empty()
835            && !query.filters.required_labels.iter().all(|label| {
836                record
837                    .scope
838                    .labels
839                    .iter()
840                    .any(|candidate| candidate == label)
841            })
842        {
843            return false;
844        }
845
846        if !query.filters.kinds.is_empty() && !query.filters.kinds.contains(&record.kind) {
847            return false;
848        }
849
850        if let Some(episode_id) = &query.filters.episode_id
851            && record.episode.as_ref().map(|episode| &episode.episode_id) != Some(episode_id)
852        {
853            return false;
854        }
855
856        if !query.filters.continuity_states.is_empty()
857            && !record.episode.as_ref().is_some_and(|episode| {
858                query
859                    .filters
860                    .continuity_states
861                    .contains(&episode.continuity_state)
862            })
863        {
864            return false;
865        }
866
867        if query.filters.unresolved_only
868            && !record
869                .episode
870                .as_ref()
871                .is_some_and(|episode| episode.continuity_state.is_unresolved())
872        {
873            return false;
874        }
875
876        if let Some(lineage_record_id) = &query.filters.lineage_record_id
877            && record.id != *lineage_record_id
878            && !record
879                .lineage
880                .iter()
881                .any(|link| &link.record_id == lineage_record_id)
882        {
883            return false;
884        }
885
886        if !query.filters.boundary_labels.is_empty()
887            && !record.episode.as_ref().is_some_and(|episode| {
888                episode.boundary_label.as_ref().is_some_and(|label| {
889                    query
890                        .filters
891                        .boundary_labels
892                        .iter()
893                        .any(|expected| expected == label)
894                })
895            })
896        {
897            return false;
898        }
899
900        if let Some(recurrence_key) = &query.filters.recurrence_key
901            && record
902                .episode
903                .as_ref()
904                .and_then(|episode| episode.recurrence_key.as_ref())
905                != Some(recurrence_key)
906        {
907            return false;
908        }
909
910        if !query.filters.conflict_states.is_empty()
911            && !record
912                .conflict
913                .as_ref()
914                .is_some_and(|conflict| query.filters.conflict_states.contains(&conflict.state))
915        {
916            return false;
917        }
918
919        if !query.filters.resolution_kinds.is_empty()
920            && !record.conflict.as_ref().is_some_and(|conflict| {
921                query
922                    .filters
923                    .resolution_kinds
924                    .contains(&conflict.resolution)
925            })
926        {
927            return false;
928        }
929
930        if query.filters.unresolved_conflicts_only
931            && !record.conflict.as_ref().is_some_and(|conflict| {
932                matches!(
933                    conflict.state,
934                    mnemara_core::ConflictReviewState::PotentialConflict
935                        | mnemara_core::ConflictReviewState::UnderReview
936                )
937            })
938        {
939            return false;
940        }
941
942        match query.filters.historical_mode {
943            RecallHistoricalMode::CurrentOnly => {
944                if !matches!(record.historical_state, MemoryHistoricalState::Current) {
945                    return false;
946                }
947            }
948            RecallHistoricalMode::HistoricalOnly => {
949                if matches!(record.historical_state, MemoryHistoricalState::Current) {
950                    return false;
951                }
952            }
953            RecallHistoricalMode::IncludeHistorical => {}
954        }
955
956        if !query.filters.states.is_empty() {
957            if !query.filters.states.contains(&record.quality_state) {
958                return false;
959            }
960        } else {
961            match record.quality_state {
962                MemoryQualityState::Archived if !query.filters.include_archived => return false,
963                MemoryQualityState::Deleted | MemoryQualityState::Suppressed => return false,
964                _ => {}
965            }
966        }
967
968        true
969    }
970
971    fn relative_temporal_bounds(
972        records: &[StoredRecord],
973        query: &RecallQuery,
974    ) -> Result<RelativeTemporalBounds> {
975        let mut bounds = RelativeTemporalBounds::default();
976        if let Some(after_record_id) = &query.filters.after_record_id {
977            let Some(anchor) = records
978                .iter()
979                .find(|stored| {
980                    stored.record.id == *after_record_id
981                        && Self::matches_scope(&stored.record.scope, &query.scope)
982                })
983                .map(|stored| Self::record_temporal_anchor(&stored.record))
984            else {
985                return Err(Error::InvalidRequest(format!(
986                    "after_record_id '{after_record_id}' was not found in recall scope"
987                )));
988            };
989            bounds.after_unix_ms = Some(anchor);
990        }
991        if let Some(before_record_id) = &query.filters.before_record_id {
992            let Some(anchor) = records
993                .iter()
994                .find(|stored| {
995                    stored.record.id == *before_record_id
996                        && Self::matches_scope(&stored.record.scope, &query.scope)
997                })
998                .map(|stored| Self::record_temporal_anchor(&stored.record))
999            else {
1000                return Err(Error::InvalidRequest(format!(
1001                    "before_record_id '{before_record_id}' was not found in recall scope"
1002                )));
1003            };
1004            bounds.before_unix_ms = Some(anchor);
1005        }
1006        if let (Some(after), Some(before)) = (bounds.after_unix_ms, bounds.before_unix_ms)
1007            && after >= before
1008        {
1009            return Err(Error::InvalidRequest(
1010                "after_record_id must refer to an earlier record than before_record_id".to_string(),
1011            ));
1012        }
1013        Ok(bounds)
1014    }
1015
1016    fn approximate_tokens(record: &MemoryRecord) -> usize {
1017        let content_tokens = record.content.split_whitespace().count();
1018        let summary_tokens = record
1019            .summary
1020            .as_deref()
1021            .map(|summary| summary.split_whitespace().count())
1022            .unwrap_or(0);
1023        content_tokens + summary_tokens
1024    }
1025
1026    fn record_temporal_anchor(record: &MemoryRecord) -> u64 {
1027        record
1028            .episode
1029            .as_ref()
1030            .and_then(|episode| episode.last_active_unix_ms.or(episode.started_at_unix_ms))
1031            .unwrap_or(record.updated_at_unix_ms)
1032    }
1033
1034    fn selected_channels_for_hit(hit: &RecallHit, empty_query: bool) -> Vec<String> {
1035        let mut selected_channels = if empty_query {
1036            vec!["temporal".to_string(), "policy".to_string()]
1037        } else {
1038            vec!["lexical".to_string(), "policy".to_string()]
1039        };
1040        if hit.breakdown.semantic > 0.0 {
1041            selected_channels.push("semantic".to_string());
1042        }
1043        if hit.breakdown.metadata > 0.0 {
1044            selected_channels.push("metadata".to_string());
1045        }
1046        if hit.breakdown.episodic > 0.0 {
1047            selected_channels.push("episodic".to_string());
1048        }
1049        if hit.breakdown.salience > 0.0 {
1050            selected_channels.push("salience".to_string());
1051        }
1052        if hit.breakdown.curation > 0.0 {
1053            selected_channels.push("curation".to_string());
1054        }
1055        if hit.record.conflict.is_some() {
1056            selected_channels.push("conflict".to_string());
1057        }
1058        selected_channels.sort();
1059        selected_channels.dedup();
1060        selected_channels
1061    }
1062
1063    fn planning_profile_note(profile: RecallPlanningProfile) -> &'static str {
1064        match profile {
1065            RecallPlanningProfile::FastPath => "planning_profile=fast_path",
1066            RecallPlanningProfile::ContinuityAware => "planning_profile=continuity_aware",
1067        }
1068    }
1069
1070    fn dedup_signature(record: &MemoryRecord) -> String {
1071        format!(
1072            "{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}",
1073            record.scope.tenant_id,
1074            record.scope.namespace,
1075            record.scope.actor_id,
1076            record.kind as u8,
1077            record.content.trim().to_ascii_lowercase(),
1078            record
1079                .summary
1080                .clone()
1081                .unwrap_or_default()
1082                .trim()
1083                .to_ascii_lowercase()
1084        )
1085    }
1086
1087    fn summary_record_id(signature: &str) -> String {
1088        let mut hasher = std::collections::hash_map::DefaultHasher::new();
1089        signature.hash(&mut hasher);
1090        format!("compacted-summary-{:016x}", hasher.finish())
1091    }
1092
1093    fn compaction_summary_record(
1094        group: &[StoredRecord],
1095        signature: &str,
1096        now_unix_ms: u64,
1097    ) -> StoredRecord {
1098        let canonical = &group[0].record;
1099        let representative_summary = canonical
1100            .summary
1101            .clone()
1102            .filter(|value| !value.trim().is_empty())
1103            .unwrap_or_else(|| canonical.content.clone());
1104        let cluster_size = group.len();
1105        let max_importance_score = group
1106            .iter()
1107            .map(|stored| stored.record.importance_score)
1108            .fold(canonical.importance_score, f32::max);
1109
1110        let mut metadata = BTreeMap::new();
1111        metadata.insert(
1112            "compaction_reason".to_string(),
1113            "duplicate_cluster_rollup".to_string(),
1114        );
1115        metadata.insert(
1116            "compaction_cluster_size".to_string(),
1117            cluster_size.to_string(),
1118        );
1119        metadata.insert("representative_record_id".to_string(), canonical.id.clone());
1120
1121        let mut labels = canonical.scope.labels.clone();
1122        if !labels.iter().any(|label| label == "compacted") {
1123            labels.push("compacted".to_string());
1124        }
1125
1126        StoredRecord {
1127            record: MemoryRecord {
1128                id: Self::summary_record_id(signature),
1129                scope: MemoryScope {
1130                    tenant_id: canonical.scope.tenant_id.clone(),
1131                    namespace: canonical.scope.namespace.clone(),
1132                    actor_id: canonical.scope.actor_id.clone(),
1133                    conversation_id: canonical.scope.conversation_id.clone(),
1134                    session_id: canonical.scope.session_id.clone(),
1135                    source: canonical.scope.source.clone(),
1136                    labels,
1137                    trust_level: canonical.scope.trust_level,
1138                },
1139                kind: mnemara_core::MemoryRecordKind::Summary,
1140                content: format!(
1141                    "Compacted {} related records into a durable summary. Representative memory: {}",
1142                    cluster_size, representative_summary
1143                ),
1144                summary: Some(format!(
1145                    "{} related records: {}",
1146                    cluster_size, representative_summary
1147                )),
1148                source_id: None,
1149                metadata,
1150                quality_state: if matches!(canonical.quality_state, MemoryQualityState::Verified) {
1151                    MemoryQualityState::Verified
1152                } else {
1153                    MemoryQualityState::Active
1154                },
1155                created_at_unix_ms: now_unix_ms,
1156                updated_at_unix_ms: now_unix_ms,
1157                expires_at_unix_ms: None,
1158                importance_score: max_importance_score,
1159                artifact: canonical.artifact.clone(),
1160                episode: canonical.episode.clone(),
1161                historical_state: MemoryHistoricalState::Current,
1162                lineage: group
1163                    .iter()
1164                    .map(|stored| LineageLink {
1165                        record_id: stored.record.id.clone(),
1166                        relation: LineageRelationKind::ConsolidatedFrom,
1167                        confidence: 1.0,
1168                    })
1169                    .collect(),
1170                conflict: canonical.conflict.clone(),
1171            },
1172            idempotency_key: None,
1173        }
1174    }
1175
1176    fn cold_tiering_candidates(
1177        &self,
1178        tenant_id: &str,
1179        namespace: Option<&str>,
1180        now_unix_ms: u64,
1181    ) -> Result<Vec<StoredRecord>> {
1182        let cold_archive_after_days = self.config.engine_config.compaction.cold_archive_after_days;
1183        if cold_archive_after_days == 0 {
1184            return Ok(Vec::new());
1185        }
1186        let archive_threshold_ms =
1187            u64::from(cold_archive_after_days).saturating_mul(24 * 60 * 60 * 1_000);
1188        let max_importance = f32::from(
1189            self.config
1190                .engine_config
1191                .compaction
1192                .cold_archive_importance_threshold_per_mille,
1193        ) / 1000.0;
1194
1195        Ok(self
1196            .iterate_records()?
1197            .into_iter()
1198            .filter(|stored| stored.record.scope.tenant_id == tenant_id)
1199            .filter(|stored| namespace.is_none_or(|value| stored.record.scope.namespace == value))
1200            .filter(|stored| {
1201                matches!(
1202                    stored.record.quality_state,
1203                    MemoryQualityState::Draft
1204                        | MemoryQualityState::Active
1205                        | MemoryQualityState::Verified
1206                )
1207            })
1208            .filter(|stored| {
1209                stored.record.scope.trust_level != mnemara_core::MemoryTrustLevel::Pinned
1210            })
1211            .filter(|stored| {
1212                now_unix_ms.saturating_sub(stored.record.updated_at_unix_ms) > archive_threshold_ms
1213                    && stored.record.importance_score <= max_importance
1214            })
1215            .collect())
1216    }
1217
1218    fn build_explanations(
1219        &self,
1220        scorer: ConfiguredRecallScorer,
1221        planning_profile: RecallPlanningProfile,
1222        query: &RecallQuery,
1223        planned: &[PlannedRecallCandidate],
1224        selected_record_ids: &[String],
1225        trace_id: &str,
1226    ) -> (Vec<RecallHit>, Option<RecallExplanation>) {
1227        let selected_set = selected_record_ids.iter().cloned().collect::<BTreeSet<_>>();
1228        let hits = planned
1229            .iter()
1230            .filter(|candidate| selected_set.contains(&candidate.hit.record.id))
1231            .map(|candidate| {
1232                let mut enriched = candidate.hit.clone();
1233                if query.include_explanation {
1234                    let selected_channels = Self::selected_channels_for_hit(
1235                        &candidate.hit,
1236                        query.query_text.trim().is_empty(),
1237                    );
1238                    enriched.explanation = Some(RecallExplanation {
1239                        selected_channels,
1240                        policy_notes: vec![if query.query_text.trim().is_empty() {
1241                            "recent_scope_scan".to_string()
1242                        } else {
1243                            "initial_file_backend_scoring".to_string()
1244                        }],
1245                        trace_id: Some(trace_id.to_string()),
1246                        planning_trace: None,
1247                        planning_profile: Some(planning_profile),
1248                        policy_profile: Some(scorer.policy_profile()),
1249                        scorer_kind: Some(scorer.scorer_kind()),
1250                        scoring_profile: Some(scorer.scoring_profile()),
1251                    });
1252                    if let Some(explanation) = enriched.explanation.as_mut() {
1253                        explanation
1254                            .policy_notes
1255                            .push(scorer.profile_note().to_string());
1256                        explanation
1257                            .policy_notes
1258                            .push(scorer.policy_profile_note().to_string());
1259                        explanation
1260                            .policy_notes
1261                            .push(Self::planning_profile_note(planning_profile).to_string());
1262                        if let Some(note) = scorer.embedding_note() {
1263                            explanation.policy_notes.push(note.to_string());
1264                        }
1265                        if query.filters.episode_id.is_some() {
1266                            explanation
1267                                .policy_notes
1268                                .push("episode_filter_applied".to_string());
1269                        }
1270                        if query.filters.unresolved_only {
1271                            explanation
1272                                .policy_notes
1273                                .push("unresolved_only_filter_applied".to_string());
1274                        }
1275                        explanation.policy_notes.push(format!(
1276                            "matched_terms={}",
1277                            candidate.matched_terms.join(",")
1278                        ));
1279                    }
1280                }
1281                enriched
1282            })
1283            .collect::<Vec<_>>();
1284
1285        let planning_trace = query.include_explanation.then(|| RecallPlanningTrace {
1286            trace_id: trace_id.to_string(),
1287            token_budget_applied: query.token_budget.is_some(),
1288            candidates: planned
1289                .iter()
1290                .enumerate()
1291                .map(|(index, candidate)| {
1292                    let selected = selected_set.contains(&candidate.hit.record.id);
1293                    let selection_rank = selected_record_ids
1294                        .iter()
1295                        .position(|record_id| record_id == &candidate.hit.record.id)
1296                        .map(|position| position as u32 + 1);
1297                    let selected_channels = Self::selected_channels_for_hit(
1298                        &candidate.hit,
1299                        query.query_text.trim().is_empty(),
1300                    );
1301
1302                    let mut filter_reasons = Vec::new();
1303                    if selected {
1304                        filter_reasons.push("retained".to_string());
1305                    } else {
1306                        if index >= query.max_items {
1307                            filter_reasons.push("max_items_exhausted".to_string());
1308                        }
1309                        if query.token_budget.is_some() {
1310                            filter_reasons.push("token_budget_exhausted".to_string());
1311                        }
1312                    }
1313
1314                    RecallTraceCandidate {
1315                        record_id: candidate.hit.record.id.clone(),
1316                        kind: candidate.hit.record.kind,
1317                        selected,
1318                        planner_stage: candidate.planner_stage,
1319                        candidate_sources: candidate.candidate_sources.clone(),
1320                        selection_rank,
1321                        matched_terms: candidate.matched_terms.clone(),
1322                        selected_channels,
1323                        filter_reasons,
1324                        decision_reason: if selected {
1325                            "selected_by_rank".to_string()
1326                        } else if query.token_budget.is_some() {
1327                            "excluded_by_rank_or_budget".to_string()
1328                        } else {
1329                            "excluded_by_rank".to_string()
1330                        },
1331                        breakdown: candidate.hit.breakdown.clone(),
1332                    }
1333                })
1334                .collect(),
1335        });
1336
1337        let explanation = query.include_explanation.then(|| {
1338            let mut selected_channels = if query.query_text.trim().is_empty() {
1339                vec!["temporal".to_string(), "policy".to_string()]
1340            } else {
1341                vec!["lexical".to_string(), "policy".to_string()]
1342            };
1343            for channel in [
1344                "semantic", "metadata", "episodic", "salience", "curation", "conflict",
1345            ] {
1346                let present = planned.iter().any(|candidate| match channel {
1347                    "semantic" => candidate.hit.breakdown.semantic > 0.0,
1348                    "metadata" => candidate.hit.breakdown.metadata > 0.0,
1349                    "episodic" => candidate.hit.breakdown.episodic > 0.0,
1350                    "salience" => candidate.hit.breakdown.salience > 0.0,
1351                    "curation" => candidate.hit.breakdown.curation > 0.0,
1352                    "conflict" => candidate.hit.record.conflict.is_some(),
1353                    _ => false,
1354                });
1355                if present && !selected_channels.iter().any(|existing| existing == channel) {
1356                    selected_channels.push(channel.to_string());
1357                }
1358            }
1359            let mut policy_notes = vec![if query.query_text.trim().is_empty() {
1360                "recent_scope_scan".to_string()
1361            } else {
1362                "initial_file_backend_scoring".to_string()
1363            }];
1364            policy_notes.push(scorer.profile_note().to_string());
1365            policy_notes.push(scorer.policy_profile_note().to_string());
1366            policy_notes.push(Self::planning_profile_note(planning_profile).to_string());
1367            if let Some(note) = scorer.embedding_note() {
1368                policy_notes.push(note.to_string());
1369            }
1370            if query.filters.episode_id.is_some() {
1371                policy_notes.push("episode_filter_applied".to_string());
1372            }
1373            if query.filters.unresolved_only {
1374                policy_notes.push("unresolved_only_filter_applied".to_string());
1375            }
1376            if query.filters.before_record_id.is_some() || query.filters.after_record_id.is_some() {
1377                policy_notes.push("relative_temporal_filter_applied".to_string());
1378            }
1379            if !query.filters.boundary_labels.is_empty() || query.filters.recurrence_key.is_some() {
1380                policy_notes.push("episodic_boundary_filter_applied".to_string());
1381            }
1382            if !query.filters.conflict_states.is_empty()
1383                || !query.filters.resolution_kinds.is_empty()
1384                || query.filters.unresolved_conflicts_only
1385            {
1386                policy_notes.push("conflict_review_filter_applied".to_string());
1387            }
1388            RecallExplanation {
1389                selected_channels,
1390                policy_notes,
1391                trace_id: Some(trace_id.to_string()),
1392                planning_trace,
1393                planning_profile: Some(planning_profile),
1394                policy_profile: Some(scorer.policy_profile()),
1395                scorer_kind: Some(scorer.scorer_kind()),
1396                scoring_profile: Some(scorer.scoring_profile()),
1397            }
1398        });
1399
1400        (hits, explanation)
1401    }
1402
1403    fn apply_retention_for_namespace(&self, tenant_id: &str, namespace: &str) -> Result<()> {
1404        let now_unix_ms = Self::now_unix_ms()?;
1405        let retention = &self.config.engine_config.retention;
1406        let ttl_window_ms = u64::from(retention.ttl_days).saturating_mul(24 * 60 * 60 * 1_000);
1407        let archive_window_ms =
1408            u64::from(retention.archive_after_days).saturating_mul(24 * 60 * 60 * 1_000);
1409
1410        let mut namespace_records = self
1411            .iterate_records()?
1412            .into_iter()
1413            .filter(|stored| {
1414                stored.record.scope.tenant_id == tenant_id
1415                    && stored.record.scope.namespace == namespace
1416            })
1417            .collect::<Vec<_>>();
1418
1419        for stored in &mut namespace_records {
1420            if self.retention_exempt(&stored.record) {
1421                continue;
1422            }
1423
1424            let expired_by_explicit_deadline = stored
1425                .record
1426                .expires_at_unix_ms
1427                .is_some_and(|expires_at| expires_at <= now_unix_ms);
1428            let expired_by_ttl = ttl_window_ms > 0
1429                && now_unix_ms.saturating_sub(stored.record.created_at_unix_ms) > ttl_window_ms;
1430
1431            if expired_by_explicit_deadline || expired_by_ttl {
1432                self.remove_record(&stored.record.id)?;
1433                self.remove_idempotency_mapping(stored)?;
1434                continue;
1435            }
1436
1437            let should_archive_by_age = archive_window_ms > 0
1438                && now_unix_ms.saturating_sub(stored.record.created_at_unix_ms) > archive_window_ms
1439                && matches!(
1440                    stored.record.quality_state,
1441                    MemoryQualityState::Draft
1442                        | MemoryQualityState::Active
1443                        | MemoryQualityState::Verified
1444                );
1445            if should_archive_by_age && stored.record.quality_state != MemoryQualityState::Archived
1446            {
1447                stored.record.quality_state = MemoryQualityState::Archived;
1448                stored.record.historical_state = MemoryHistoricalState::Historical;
1449                stored.record.updated_at_unix_ms = now_unix_ms;
1450                self.persist_record(stored)?;
1451            }
1452        }
1453
1454        if retention.max_records_per_namespace > 0 {
1455            let mut active = self
1456                .iterate_records()?
1457                .into_iter()
1458                .filter(|stored| {
1459                    stored.record.scope.tenant_id == tenant_id
1460                        && stored.record.scope.namespace == namespace
1461                        && !self.retention_exempt(&stored.record)
1462                        && matches!(
1463                            stored.record.quality_state,
1464                            MemoryQualityState::Draft
1465                                | MemoryQualityState::Active
1466                                | MemoryQualityState::Verified
1467                        )
1468                })
1469                .collect::<Vec<_>>();
1470            if active.len() > retention.max_records_per_namespace {
1471                active.sort_by(|left, right| {
1472                    left.record
1473                        .updated_at_unix_ms
1474                        .cmp(&right.record.updated_at_unix_ms)
1475                        .then_with(|| {
1476                            left.record
1477                                .importance_score
1478                                .total_cmp(&right.record.importance_score)
1479                        })
1480                        .then_with(|| left.record.id.cmp(&right.record.id))
1481                });
1482                let archive_count = active.len() - retention.max_records_per_namespace;
1483                for stored in active.iter_mut().take(archive_count) {
1484                    stored.record.quality_state = MemoryQualityState::Archived;
1485                    stored.record.historical_state = MemoryHistoricalState::Historical;
1486                    stored.record.updated_at_unix_ms = now_unix_ms;
1487                    self.persist_record(stored)?;
1488                }
1489            }
1490        }
1491        Ok(())
1492    }
1493}
1494
1495#[async_trait]
1496impl MemoryStore for FileMemoryStore {
1497    fn backend_kind(&self) -> &'static str {
1498        "file"
1499    }
1500
1501    async fn upsert(&self, request: UpsertRequest) -> Result<UpsertReceipt> {
1502        self.validate_upsert_request(&request)?;
1503
1504        if let Some(idempotency_key) = &request.idempotency_key {
1505            let path = self.idempotency_path(&request.record.scope, idempotency_key);
1506            if path.exists() {
1507                let existing_record_id = fs::read_to_string(&path).map_err(|err| {
1508                    Error::Backend(format!(
1509                        "failed to read idempotency mapping {}: {err}",
1510                        path.display()
1511                    ))
1512                })?;
1513                if existing_record_id != request.record.id {
1514                    return Err(Error::Conflict(format!(
1515                        "idempotency key already belongs to record {}",
1516                        existing_record_id
1517                    )));
1518                }
1519                if self.load_record(&existing_record_id)?.is_some() {
1520                    return Ok(UpsertReceipt {
1521                        record_id: existing_record_id,
1522                        deduplicated: true,
1523                        summary_refreshed: false,
1524                    });
1525                }
1526                fs::remove_file(&path).map_err(|err| {
1527                    Error::Backend(format!(
1528                        "failed to clear stale idempotency mapping {}: {err}",
1529                        path.display()
1530                    ))
1531                })?;
1532            }
1533        }
1534
1535        let key = request.record.id.clone();
1536        let tenant_id = request.record.scope.tenant_id.clone();
1537        let namespace = request.record.scope.namespace.clone();
1538        let existing = self.load_record(&key)?;
1539        let deduplicated = existing.is_some();
1540        let stored = StoredRecord {
1541            record: request.record,
1542            idempotency_key: request.idempotency_key,
1543        };
1544        if let Some(existing) = existing
1545            && existing.idempotency_key != stored.idempotency_key
1546        {
1547            self.remove_idempotency_mapping(&existing)?;
1548        }
1549        self.persist_record(&stored)?;
1550        if let Some(idempotency_key) = &stored.idempotency_key {
1551            fs::write(
1552                self.idempotency_path(&stored.record.scope, idempotency_key),
1553                key.as_bytes(),
1554            )
1555            .map_err(|err| Error::Backend(format!("failed to write idempotency mapping: {err}")))?;
1556        }
1557        self.apply_retention_for_namespace(&tenant_id, &namespace)?;
1558        Ok(UpsertReceipt {
1559            record_id: key,
1560            deduplicated,
1561            summary_refreshed: false,
1562        })
1563    }
1564
1565    async fn batch_upsert(&self, request: BatchUpsertRequest) -> Result<Vec<UpsertReceipt>> {
1566        if request.requests.len() > self.config.engine_config.max_batch_size {
1567            return Err(Error::InvalidRequest(format!(
1568                "batch size {} exceeds configured max_batch_size {}",
1569                request.requests.len(),
1570                self.config.engine_config.max_batch_size
1571            )));
1572        }
1573        for request in &request.requests {
1574            self.validate_upsert_request(request)?;
1575        }
1576        let mut receipts = Vec::with_capacity(request.requests.len());
1577        for request in request.requests {
1578            receipts.push(self.upsert(request).await?);
1579        }
1580        Ok(receipts)
1581    }
1582
1583    async fn recall(&self, query: RecallQuery) -> Result<RecallResult> {
1584        let empty_query = query.query_text.trim().is_empty();
1585        let planner = self.config.recall_planner();
1586        let scorer = planner.scorer();
1587        let planning_profile = planner.effective_profile(&query);
1588        let stored_records = self.iterate_records()?;
1589        let relative_bounds = Self::relative_temporal_bounds(&stored_records, &query)?;
1590        let records = stored_records
1591            .into_iter()
1592            .filter(|stored| Self::record_passes_filters(&stored.record, &query, relative_bounds))
1593            .map(|stored| stored.record)
1594            .collect::<Vec<_>>();
1595        let mut scored = planner.plan(&records, &query);
1596        match query.filters.temporal_order {
1597            RecallTemporalOrder::Relevance if empty_query => {
1598                scored.sort_by(|left, right| {
1599                    Self::record_temporal_anchor(&right.hit.record)
1600                        .cmp(&Self::record_temporal_anchor(&left.hit.record))
1601                        .then_with(|| {
1602                            right
1603                                .hit
1604                                .record
1605                                .importance_score
1606                                .total_cmp(&left.hit.record.importance_score)
1607                        })
1608                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1609                });
1610            }
1611            RecallTemporalOrder::Relevance => {
1612                scored.sort_by(|left, right| {
1613                    right
1614                        .hit
1615                        .breakdown
1616                        .total
1617                        .total_cmp(&left.hit.breakdown.total)
1618                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1619                });
1620            }
1621            RecallTemporalOrder::ChronologicalAsc => {
1622                scored.sort_by(|left, right| {
1623                    Self::record_temporal_anchor(&left.hit.record)
1624                        .cmp(&Self::record_temporal_anchor(&right.hit.record))
1625                        .then_with(|| {
1626                            right
1627                                .hit
1628                                .breakdown
1629                                .total
1630                                .total_cmp(&left.hit.breakdown.total)
1631                        })
1632                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1633                });
1634            }
1635            RecallTemporalOrder::ChronologicalDesc => {
1636                scored.sort_by(|left, right| {
1637                    Self::record_temporal_anchor(&right.hit.record)
1638                        .cmp(&Self::record_temporal_anchor(&left.hit.record))
1639                        .then_with(|| {
1640                            right
1641                                .hit
1642                                .breakdown
1643                                .total
1644                                .total_cmp(&left.hit.breakdown.total)
1645                        })
1646                        .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1647                });
1648            }
1649        }
1650
1651        let examined = scored.len();
1652        let mut selected_ids = Vec::with_capacity(query.max_items);
1653        let mut remaining_budget = query.token_budget.unwrap_or(usize::MAX);
1654        for candidate in &scored {
1655            if selected_ids.len() >= query.max_items {
1656                break;
1657            }
1658            let estimated_tokens = Self::approximate_tokens(&candidate.hit.record);
1659            if selected_ids.is_empty() || estimated_tokens <= remaining_budget {
1660                remaining_budget = remaining_budget.saturating_sub(estimated_tokens);
1661                selected_ids.push(candidate.hit.record.id.clone());
1662            }
1663        }
1664
1665        let trace_id = format!(
1666            "recall:{}:{}:{}",
1667            query.scope.tenant_id, query.scope.namespace, examined
1668        );
1669        let (hits, explanation) = self.build_explanations(
1670            scorer,
1671            planning_profile,
1672            &query,
1673            &scored,
1674            &selected_ids,
1675            &trace_id,
1676        );
1677        Ok(RecallResult {
1678            hits,
1679            total_candidates_examined: examined,
1680            explanation,
1681        })
1682    }
1683
1684    async fn compact(&self, request: CompactionRequest) -> Result<CompactionReport> {
1685        if request.tenant_id.trim().is_empty() {
1686            return Err(Error::InvalidRequest(
1687                "compaction tenant_id is required".to_string(),
1688            ));
1689        }
1690
1691        let mut groups: HashMap<String, Vec<StoredRecord>> = HashMap::new();
1692        for stored in self.iterate_records()? {
1693            if stored.record.scope.tenant_id != request.tenant_id {
1694                continue;
1695            }
1696            if let Some(namespace) = &request.namespace
1697                && stored.record.scope.namespace != *namespace
1698            {
1699                continue;
1700            }
1701            if matches!(
1702                stored.record.quality_state,
1703                MemoryQualityState::Archived
1704                    | MemoryQualityState::Deleted
1705                    | MemoryQualityState::Suppressed
1706            ) {
1707                continue;
1708            }
1709            groups
1710                .entry(Self::dedup_signature(&stored.record))
1711                .or_default()
1712                .push(stored);
1713        }
1714
1715        let mut deduplicated_records = 0u64;
1716        let mut archived_records = 0u64;
1717        let mut summarized_clusters = 0u64;
1718        let mut superseded_records = 0u64;
1719        let mut lineage_links_created = 0u64;
1720        let now_unix_ms = Self::now_unix_ms()?;
1721        for group in groups.values_mut() {
1722            if group.len() < 2 {
1723                continue;
1724            }
1725            group.sort_by(|left, right| {
1726                right
1727                    .record
1728                    .updated_at_unix_ms
1729                    .cmp(&left.record.updated_at_unix_ms)
1730                    .then_with(|| {
1731                        right
1732                            .record
1733                            .importance_score
1734                            .total_cmp(&left.record.importance_score)
1735                    })
1736                    .then_with(|| left.record.id.cmp(&right.record.id))
1737            });
1738            let signature = Self::dedup_signature(&group[0].record);
1739            if self
1740                .config
1741                .engine_config
1742                .compaction
1743                .summarize_after_record_count
1744                > 0
1745                && group.len()
1746                    >= self
1747                        .config
1748                        .engine_config
1749                        .compaction
1750                        .summarize_after_record_count
1751            {
1752                summarized_clusters += 1;
1753                lineage_links_created += group.len() as u64;
1754                if !request.dry_run {
1755                    let summary = Self::compaction_summary_record(group, &signature, now_unix_ms);
1756                    self.persist_record(&summary)?;
1757                }
1758            }
1759            for duplicate in group.iter_mut().skip(1) {
1760                deduplicated_records += 1;
1761                archived_records += 1;
1762                superseded_records += 1;
1763                if request.dry_run {
1764                    continue;
1765                }
1766                duplicate.record.quality_state = MemoryQualityState::Archived;
1767                duplicate.record.historical_state = MemoryHistoricalState::Superseded;
1768                duplicate.record.lineage.push(LineageLink {
1769                    record_id: Self::summary_record_id(&signature),
1770                    relation: LineageRelationKind::SupersededBy,
1771                    confidence: 1.0,
1772                });
1773                lineage_links_created += 1;
1774                duplicate.record.updated_at_unix_ms = Self::now_unix_ms()?;
1775                self.persist_record(duplicate)?;
1776            }
1777        }
1778
1779        for mut candidate in self.cold_tiering_candidates(
1780            &request.tenant_id,
1781            request.namespace.as_deref(),
1782            now_unix_ms,
1783        )? {
1784            archived_records += 1;
1785            if request.dry_run {
1786                continue;
1787            }
1788            candidate.record.quality_state = MemoryQualityState::Archived;
1789            candidate.record.historical_state = MemoryHistoricalState::Historical;
1790            candidate.record.updated_at_unix_ms = now_unix_ms;
1791            self.persist_record(&candidate)?;
1792        }
1793
1794        Ok(CompactionReport {
1795            deduplicated_records,
1796            archived_records,
1797            summarized_clusters,
1798            pruned_graph_edges: 0,
1799            superseded_records,
1800            lineage_links_created,
1801            dry_run: request.dry_run,
1802        })
1803    }
1804
1805    async fn delete(&self, request: DeleteRequest) -> Result<DeleteReceipt> {
1806        self.validate_delete_request(&request)?;
1807        let Some(stored) = self.load_record(&request.record_id)? else {
1808            return Ok(DeleteReceipt {
1809                record_id: request.record_id,
1810                tombstoned: false,
1811                hard_deleted: false,
1812            });
1813        };
1814        if stored.record.scope.tenant_id != request.tenant_id {
1815            return Err(Error::InvalidRequest(format!(
1816                "record {} does not belong to tenant {}",
1817                request.record_id, request.tenant_id
1818            )));
1819        }
1820        if stored.record.scope.namespace != request.namespace {
1821            return Err(Error::InvalidRequest(format!(
1822                "record {} does not belong to namespace {}",
1823                request.record_id, request.namespace
1824            )));
1825        }
1826
1827        if request.hard_delete {
1828            self.remove_record(&request.record_id)?;
1829            self.remove_idempotency_mapping(&stored)?;
1830        } else {
1831            let mut tombstone = stored;
1832            tombstone.record.quality_state = MemoryQualityState::Deleted;
1833            tombstone.record.updated_at_unix_ms = Self::now_unix_ms()?;
1834            self.persist_record(&tombstone)?;
1835        }
1836
1837        Ok(DeleteReceipt {
1838            record_id: request.record_id,
1839            tombstoned: !request.hard_delete,
1840            hard_deleted: request.hard_delete,
1841        })
1842    }
1843
1844    async fn archive(&self, request: ArchiveRequest) -> Result<ArchiveReceipt> {
1845        self.validate_archive_request(&request)?;
1846        let Some(mut stored) = self.load_record(&request.record_id)? else {
1847            return Err(Error::InvalidRequest(format!(
1848                "record {} was not found",
1849                request.record_id
1850            )));
1851        };
1852        Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1853
1854        let previous_quality_state = stored.record.quality_state;
1855        let previous_historical_state = stored.record.historical_state;
1856        let changed = previous_quality_state != MemoryQualityState::Archived
1857            || previous_historical_state == MemoryHistoricalState::Current;
1858        let historical_state = match previous_historical_state {
1859            MemoryHistoricalState::Current => MemoryHistoricalState::Historical,
1860            other => other,
1861        };
1862
1863        if changed && !request.dry_run {
1864            stored.record.quality_state = MemoryQualityState::Archived;
1865            stored.record.historical_state = historical_state;
1866            stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1867            self.persist_record(&stored)?;
1868        }
1869
1870        Ok(ArchiveReceipt {
1871            record_id: request.record_id,
1872            previous_quality_state,
1873            previous_historical_state,
1874            quality_state: MemoryQualityState::Archived,
1875            historical_state,
1876            changed,
1877            dry_run: request.dry_run,
1878        })
1879    }
1880
1881    async fn suppress(&self, request: SuppressRequest) -> Result<SuppressReceipt> {
1882        self.validate_suppress_request(&request)?;
1883        let Some(mut stored) = self.load_record(&request.record_id)? else {
1884            return Err(Error::InvalidRequest(format!(
1885                "record {} was not found",
1886                request.record_id
1887            )));
1888        };
1889        Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1890
1891        let previous_quality_state = stored.record.quality_state;
1892        let previous_historical_state = stored.record.historical_state;
1893        let changed = previous_quality_state != MemoryQualityState::Suppressed;
1894
1895        if changed && !request.dry_run {
1896            stored.record.quality_state = MemoryQualityState::Suppressed;
1897            stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1898            self.persist_record(&stored)?;
1899        }
1900
1901        Ok(SuppressReceipt {
1902            record_id: request.record_id,
1903            previous_quality_state,
1904            previous_historical_state,
1905            quality_state: MemoryQualityState::Suppressed,
1906            historical_state: previous_historical_state,
1907            changed,
1908            dry_run: request.dry_run,
1909        })
1910    }
1911
1912    async fn recover(&self, request: RecoverRequest) -> Result<RecoverReceipt> {
1913        self.validate_recover_request(&request)?;
1914        let Some(mut stored) = self.load_record(&request.record_id)? else {
1915            return Err(Error::InvalidRequest(format!(
1916                "record {} was not found",
1917                request.record_id
1918            )));
1919        };
1920        Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1921
1922        let previous_quality_state = stored.record.quality_state;
1923        let previous_historical_state = stored.record.historical_state;
1924        let historical_state = request
1925            .historical_state
1926            .unwrap_or(MemoryHistoricalState::Current);
1927        let changed = previous_quality_state != request.quality_state
1928            || previous_historical_state != historical_state;
1929
1930        if changed && !request.dry_run {
1931            stored.record.quality_state = request.quality_state;
1932            stored.record.historical_state = historical_state;
1933            stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1934            self.persist_record(&stored)?;
1935        }
1936
1937        Ok(RecoverReceipt {
1938            record_id: request.record_id,
1939            previous_quality_state,
1940            previous_historical_state,
1941            quality_state: request.quality_state,
1942            historical_state,
1943            changed,
1944            dry_run: request.dry_run,
1945        })
1946    }
1947
1948    async fn snapshot(&self) -> Result<SnapshotManifest> {
1949        let records = self.iterate_records()?;
1950        let namespaces = records
1951            .iter()
1952            .map(|stored| stored.record.scope.namespace.clone())
1953            .collect::<BTreeSet<_>>()
1954            .into_iter()
1955            .collect::<Vec<_>>();
1956        let created_at_unix_ms = Self::now_unix_ms()?;
1957        let storage_bytes = dir_size(&self.config.data_dir)?;
1958
1959        Ok(SnapshotManifest {
1960            snapshot_id: format!("snapshot-{created_at_unix_ms}"),
1961            created_at_unix_ms,
1962            namespaces,
1963            record_count: records.len() as u64,
1964            storage_bytes,
1965            engine: self.config.engine_config.tuning_info(),
1966        })
1967    }
1968
1969    async fn stats(&self, request: StoreStatsRequest) -> Result<StoreStatsReport> {
1970        self.build_stats_report(&request)
1971    }
1972
1973    async fn integrity_check(
1974        &self,
1975        request: IntegrityCheckRequest,
1976    ) -> Result<IntegrityCheckReport> {
1977        let summary = self
1978            .build_integrity_summary(request.tenant_id.as_deref(), request.namespace.as_deref())?;
1979        Ok(IntegrityCheckReport {
1980            generated_at_unix_ms: Self::now_unix_ms()?,
1981            healthy: summary.stale_idempotency_keys == 0
1982                && summary.missing_idempotency_keys == 0
1983                && summary.duplicate_active_records == 0,
1984            scanned_records: summary.scanned_records,
1985            scanned_idempotency_keys: summary.scanned_idempotency_keys,
1986            stale_idempotency_keys: summary.stale_idempotency_keys,
1987            missing_idempotency_keys: summary.missing_idempotency_keys,
1988            duplicate_active_records: summary.duplicate_active_records,
1989        })
1990    }
1991
1992    async fn repair(&self, request: RepairRequest) -> Result<RepairReport> {
1993        if request.reason.trim().is_empty() {
1994            return Err(Error::InvalidRequest(
1995                "repair reason is required".to_string(),
1996            ));
1997        }
1998        if !request.remove_stale_idempotency_keys && !request.rebuild_missing_idempotency_keys {
1999            return Err(Error::InvalidRequest(
2000                "repair requires at least one enabled action".to_string(),
2001            ));
2002        }
2003
2004        let tenant_filter = request.tenant_id.as_deref();
2005        let namespace_filter = request.namespace.as_deref();
2006        let summary = self.build_integrity_summary(tenant_filter, namespace_filter)?;
2007        let records = self.iterate_records()?;
2008        let mappings = self.iterate_idempotency_mappings()?;
2009        let mut removed_stale_idempotency_keys = 0u64;
2010        let mut rebuilt_missing_idempotency_keys = 0u64;
2011
2012        if request.remove_stale_idempotency_keys {
2013            for mapping in &mappings {
2014                let Some((tenant_id, namespace, _, _, _, idempotency_key)) =
2015                    Self::parse_scope_key(&mapping.scoped_key)
2016                else {
2017                    continue;
2018                };
2019                if !Self::scope_matches_filters(
2020                    &tenant_id,
2021                    &namespace,
2022                    tenant_filter,
2023                    namespace_filter,
2024                ) {
2025                    continue;
2026                }
2027                let stale = match self.load_record(&mapping.record_id)? {
2028                    Some(stored) => {
2029                        stored.record.scope.tenant_id != tenant_id
2030                            || stored.record.scope.namespace != namespace
2031                            || stored.idempotency_key.as_deref() != Some(idempotency_key.as_str())
2032                            || Self::idempotency_scope_key(&stored.record.scope, &idempotency_key)
2033                                != mapping.scoped_key
2034                    }
2035                    None => true,
2036                };
2037                if stale {
2038                    removed_stale_idempotency_keys += 1;
2039                    if !request.dry_run {
2040                        let path = Self::idempotency_dir(&self.config.data_dir)
2041                            .join(format!("{}.txt", hex_key(&mapping.scoped_key)));
2042                        if path.exists() {
2043                            fs::remove_file(&path).map_err(|err| {
2044                                Error::Backend(format!(
2045                                    "failed to remove stale idempotency mapping {}: {err}",
2046                                    path.display()
2047                                ))
2048                            })?;
2049                        }
2050                    }
2051                }
2052            }
2053        }
2054
2055        if request.rebuild_missing_idempotency_keys {
2056            let existing = self.iterate_idempotency_mappings()?;
2057            let existing_lookup = existing
2058                .into_iter()
2059                .map(|mapping| (mapping.scoped_key, mapping.record_id))
2060                .collect::<HashMap<_, _>>();
2061
2062            for stored in &records {
2063                if !Self::scope_matches_filters(
2064                    &stored.record.scope.tenant_id,
2065                    &stored.record.scope.namespace,
2066                    tenant_filter,
2067                    namespace_filter,
2068                ) {
2069                    continue;
2070                }
2071                let Some(idempotency_key) = &stored.idempotency_key else {
2072                    continue;
2073                };
2074                let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
2075                if existing_lookup.get(&scoped_key) == Some(&stored.record.id) {
2076                    continue;
2077                }
2078                rebuilt_missing_idempotency_keys += 1;
2079                if !request.dry_run {
2080                    fs::write(
2081                        Self::idempotency_dir(&self.config.data_dir)
2082                            .join(format!("{}.txt", hex_key(&scoped_key))),
2083                        stored.record.id.as_bytes(),
2084                    )
2085                    .map_err(|err| {
2086                        Error::Backend(format!("failed to rebuild idempotency mapping: {err}"))
2087                    })?;
2088                }
2089            }
2090        }
2091
2092        let stale_after = if request.remove_stale_idempotency_keys {
2093            0
2094        } else {
2095            summary.stale_idempotency_keys
2096        };
2097        let missing_after = if request.rebuild_missing_idempotency_keys {
2098            0
2099        } else {
2100            summary.missing_idempotency_keys
2101        };
2102
2103        Ok(RepairReport {
2104            dry_run: request.dry_run,
2105            scanned_records: summary.scanned_records,
2106            scanned_idempotency_keys: summary.scanned_idempotency_keys,
2107            removed_stale_idempotency_keys,
2108            rebuilt_missing_idempotency_keys,
2109            healthy_after: stale_after == 0
2110                && missing_after == 0
2111                && summary.duplicate_active_records == 0,
2112        })
2113    }
2114
2115    async fn export(&self, request: ExportRequest) -> Result<PortableStorePackage> {
2116        let exported_at_unix_ms = Self::now_unix_ms()?;
2117        let mut namespaces = BTreeSet::new();
2118        let mut records = Vec::new();
2119        for stored in self.iterate_records()? {
2120            if request
2121                .tenant_id
2122                .as_deref()
2123                .is_some_and(|tenant_id| stored.record.scope.tenant_id != tenant_id)
2124            {
2125                continue;
2126            }
2127            if request
2128                .namespace
2129                .as_deref()
2130                .is_some_and(|namespace| stored.record.scope.namespace != namespace)
2131            {
2132                continue;
2133            }
2134            if !request.include_archived
2135                && stored.record.quality_state == MemoryQualityState::Archived
2136            {
2137                continue;
2138            }
2139            namespaces.insert(format!(
2140                "{}:{}",
2141                stored.record.scope.tenant_id, stored.record.scope.namespace
2142            ));
2143            records.push(PortableRecord {
2144                record: stored.record,
2145                idempotency_key: stored.idempotency_key,
2146            });
2147        }
2148
2149        let storage_bytes = records
2150            .iter()
2151            .map(|entry| {
2152                entry.record.content.len()
2153                    + entry.record.summary.as_deref().map(str::len).unwrap_or(0)
2154            })
2155            .sum::<usize>() as u64;
2156
2157        Ok(PortableStorePackage {
2158            package_version: PORTABLE_PACKAGE_VERSION,
2159            exported_at_unix_ms,
2160            manifest: SnapshotManifest {
2161                snapshot_id: format!("portable-export-{exported_at_unix_ms}"),
2162                created_at_unix_ms: exported_at_unix_ms,
2163                namespaces: namespaces.into_iter().collect(),
2164                record_count: records.len() as u64,
2165                storage_bytes,
2166                engine: self.config.engine_config.tuning_info(),
2167            },
2168            records,
2169        })
2170    }
2171
2172    async fn import(&self, request: ImportRequest) -> Result<ImportReport> {
2173        let snapshot_id = request.package.manifest.snapshot_id.clone();
2174        let package_version = request.package.package_version;
2175        let (validated_records, compatible_package, failed_records, entries) =
2176            self.validate_import_request(&request);
2177        let apply_changes = compatible_package
2178            && failed_records.is_empty()
2179            && !request.dry_run
2180            && !matches!(request.mode, ImportMode::Validate);
2181        let mut imported_records = 0u64;
2182        let mut skipped_records = 0u64;
2183
2184        if apply_changes && matches!(request.mode, ImportMode::Replace) {
2185            self.clear_all_records()?;
2186        }
2187
2188        for entry in entries {
2189            if matches!(request.mode, ImportMode::Merge)
2190                && self.load_record(&entry.record.id)?.is_some()
2191            {
2192                skipped_records += 1;
2193                continue;
2194            }
2195            if apply_changes {
2196                self.persist_imported_record(&StoredRecord {
2197                    record: entry.record,
2198                    idempotency_key: entry.idempotency_key,
2199                })?;
2200            }
2201            imported_records += 1;
2202        }
2203
2204        Ok(ImportReport {
2205            mode: request.mode,
2206            dry_run: request.dry_run,
2207            applied: apply_changes,
2208            compatible_package,
2209            package_version,
2210            validated_records,
2211            imported_records,
2212            skipped_records,
2213            replaced_existing: matches!(request.mode, ImportMode::Replace),
2214            snapshot_id,
2215            failed_records,
2216        })
2217    }
2218}
2219
2220fn dir_size(path: &Path) -> Result<u64> {
2221    let mut total = 0u64;
2222    for entry in fs::read_dir(path)
2223        .map_err(|err| Error::Backend(format!("failed to read dir {}: {err}", path.display())))?
2224    {
2225        let entry = entry.map_err(|err| {
2226            Error::Backend(format!("failed to iterate dir {}: {err}", path.display()))
2227        })?;
2228        let entry_path = entry.path();
2229        if entry_path.is_dir() {
2230            total = total.saturating_add(dir_size(&entry_path)?);
2231        } else {
2232            total = total.saturating_add(
2233                entry
2234                    .metadata()
2235                    .map_err(|err| {
2236                        Error::Backend(format!(
2237                            "failed to stat file {}: {err}",
2238                            entry_path.display()
2239                        ))
2240                    })?
2241                    .len(),
2242            );
2243        }
2244    }
2245    Ok(total)
2246}
2247
2248fn hex_key(input: &str) -> String {
2249    let mut output = String::with_capacity(input.len() * 2);
2250    for byte in input.as_bytes() {
2251        output.push(nibble_to_hex(byte >> 4));
2252        output.push(nibble_to_hex(byte & 0x0f));
2253    }
2254    output
2255}
2256
/// Maps a 4-bit value (0–15) to its lowercase hexadecimal digit.
///
/// Values above 15 yield `'0'`.
fn nibble_to_hex(value: u8) -> char {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    DIGITS
        .get(usize::from(value))
        .map_or('0', |&digit| char::from(digit))
}
2264
2265fn unhex_key(input: &str) -> Option<String> {
2266    if input.len() % 2 != 0 {
2267        return None;
2268    }
2269
2270    let mut bytes = Vec::with_capacity(input.len() / 2);
2271    let chars = input.as_bytes().chunks_exact(2);
2272    for chunk in chars {
2273        let high = hex_to_nibble(chunk[0] as char)?;
2274        let low = hex_to_nibble(chunk[1] as char)?;
2275        bytes.push((high << 4) | low);
2276    }
2277
2278    String::from_utf8(bytes).ok()
2279}
2280
/// Converts one hexadecimal digit (`0-9`, `a-f` or `A-F`) to its numeric value.
///
/// Returns `None` for any other character.
fn hex_to_nibble(value: char) -> Option<u8> {
    // `to_digit(16)` only yields values below 16, so the narrowing cast is lossless.
    value.to_digit(16).map(|digit| digit as u8)
}