1use async_trait::async_trait;
2use mnemara_core::{
3 ArchiveReceipt, ArchiveRequest, BatchUpsertRequest, CompactionReport, CompactionRequest,
4 ConfiguredRecallScorer, DeleteReceipt, DeleteRequest, EngineConfig, Error, ExportRequest,
5 ImportFailure, ImportMode, ImportReport, ImportRequest, IntegrityCheckReport,
6 IntegrityCheckRequest, LineageLink, LineageRelationKind, MaintenanceStats,
7 MemoryHistoricalState, MemoryQualityState, MemoryRecord, MemoryScope, MemoryStore,
8 MemoryTrustLevel, NamespaceStats, PlannedRecallCandidate, PortableRecord, PortableStorePackage,
9 RecallExplanation, RecallHistoricalMode, RecallHit, RecallPlanner, RecallPlanningProfile,
10 RecallPlanningTrace, RecallQuery, RecallResult, RecallScorer, RecallTemporalOrder,
11 RecallTraceCandidate, RecoverReceipt, RecoverRequest, RepairReport, RepairRequest, Result,
12 SemanticEmbedder, SnapshotManifest, StoreStatsReport, StoreStatsRequest, SuppressReceipt,
13 SuppressRequest, UpsertReceipt, UpsertRequest,
14};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, BTreeSet, HashMap};
17use std::fmt;
18use std::fs;
19use std::hash::{Hash, Hasher};
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::time::{SystemTime, UNIX_EPOCH};
23
24#[derive(Clone)]
25pub struct FileStoreConfig {
26 pub data_dir: PathBuf,
27 pub engine_config: EngineConfig,
28 pub shared_embedder: Option<SharedEmbedderConfig>,
29}
30
31#[derive(Clone)]
32pub struct SharedEmbedderConfig {
33 pub embedder: Arc<dyn SemanticEmbedder>,
34 pub provider_note: String,
35}
36
37impl SharedEmbedderConfig {
38 fn new(embedder: Arc<dyn SemanticEmbedder>, provider_note: impl Into<String>) -> Self {
39 Self {
40 embedder,
41 provider_note: provider_note.into(),
42 }
43 }
44}
45
46impl fmt::Debug for SharedEmbedderConfig {
47 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
48 formatter
49 .debug_struct("SharedEmbedderConfig")
50 .field("provider_note", &self.provider_note)
51 .finish()
52 }
53}
54
55impl fmt::Debug for FileStoreConfig {
56 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
57 formatter
58 .debug_struct("FileStoreConfig")
59 .field("data_dir", &self.data_dir)
60 .field("engine_config", &self.engine_config)
61 .field("shared_embedder", &self.shared_embedder)
62 .finish()
63 }
64}
65
66impl FileStoreConfig {
67 pub fn new(data_dir: impl AsRef<Path>) -> Self {
68 Self {
69 data_dir: data_dir.as_ref().to_path_buf(),
70 engine_config: EngineConfig::default(),
71 shared_embedder: None,
72 }
73 }
74
75 pub fn with_engine_config(mut self, engine_config: EngineConfig) -> Self {
76 self.engine_config = engine_config;
77 self
78 }
79
80 pub fn with_shared_embedder(
81 mut self,
82 embedder: Arc<dyn SemanticEmbedder>,
83 provider_note: impl Into<String>,
84 ) -> Self {
85 self.shared_embedder = Some(SharedEmbedderConfig::new(embedder, provider_note));
86 self
87 }
88
89 fn recall_planner(&self) -> RecallPlanner {
90 if let Some(shared_embedder) = &self.shared_embedder {
91 RecallPlanner::with_shared_embedder(
92 self.engine_config.recall_planning_profile,
93 self.engine_config.graph_expansion_max_hops,
94 self.engine_config.recall_scorer_kind,
95 self.engine_config.recall_scoring_profile,
96 self.engine_config.recall_policy_profile,
97 Arc::clone(&shared_embedder.embedder),
98 shared_embedder.provider_note.clone(),
99 )
100 } else {
101 RecallPlanner::from_engine_config(&self.engine_config)
102 }
103 }
104}
105
106#[derive(Debug)]
107pub struct FileMemoryStore {
108 config: FileStoreConfig,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
112struct StoredRecord {
113 record: MemoryRecord,
114 idempotency_key: Option<String>,
115}
116
/// One idempotency mapping file, decoded into its two parts.
#[derive(Debug, Clone)]
struct IdempotencyMapping {
    /// Scope-qualified idempotency key recovered from the file name.
    scoped_key: String,
    /// Record id read from the file contents.
    record_id: String,
}
122
/// Fields of a scoped idempotency key, in the order they are joined:
/// tenant id, namespace, actor id, optional conversation id, optional
/// session id, and the idempotency key itself.
type ScopeKeyParts = (
    String,         // tenant id
    String,         // namespace
    String,         // actor id
    Option<String>, // conversation id
    Option<String>, // session id
    String,         // idempotency key
);
131
/// Counters produced by an integrity scan of records and idempotency mappings.
#[derive(Debug, Clone, Copy, Default)]
struct IntegritySummary {
    /// Records that matched the tenant/namespace filters.
    scanned_records: u64,
    /// Idempotency mappings that were examined.
    scanned_idempotency_keys: u64,
    /// Mappings that could not be tied back to a consistent record.
    stale_idempotency_keys: u64,
    /// Records carrying an idempotency key without a matching mapping.
    missing_idempotency_keys: u64,
    /// Surplus records sharing a dedup signature with another live record.
    duplicate_active_records: u64,
}
140
/// Exclusive time window resolved from anchor records of a recall query.
#[derive(Debug, Clone, Copy, Default)]
struct RelativeTemporalBounds {
    /// Records must have a temporal anchor strictly greater than this value.
    after_unix_ms: Option<u64>,
    /// Records must have a temporal anchor strictly less than this value.
    before_unix_ms: Option<u64>,
}
146
/// Portable package version accepted on import; any other version is
/// reported as an import failure.
const PORTABLE_PACKAGE_VERSION: u32 = 1;
148
149impl FileMemoryStore {
150 pub fn open(config: FileStoreConfig) -> Result<Self> {
151 fs::create_dir_all(Self::records_dir(&config.data_dir)).map_err(|err| {
152 Error::Backend(format!(
153 "failed to create records dir {}: {err}",
154 Self::records_dir(&config.data_dir).display()
155 ))
156 })?;
157 fs::create_dir_all(Self::idempotency_dir(&config.data_dir)).map_err(|err| {
158 Error::Backend(format!(
159 "failed to create idempotency dir {}: {err}",
160 Self::idempotency_dir(&config.data_dir).display()
161 ))
162 })?;
163 Ok(Self { config })
164 }
165
/// Returns `<data_dir>/records`, the directory holding record files.
fn records_dir(data_dir: &Path) -> PathBuf {
    let mut dir = data_dir.to_path_buf();
    dir.push("records");
    dir
}
169
/// Returns `<data_dir>/idempotency`, the directory holding idempotency
/// mapping files.
fn idempotency_dir(data_dir: &Path) -> PathBuf {
    let mut dir = data_dir.to_path_buf();
    dir.push("idempotency");
    dir
}
173
174 fn record_path(&self, record_id: &str) -> PathBuf {
175 Self::records_dir(&self.config.data_dir).join(format!("{}.json", hex_key(record_id)))
176 }
177
178 fn idempotency_scope_key(scope: &MemoryScope, key: &str) -> String {
179 format!(
180 "{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}",
181 scope.tenant_id,
182 scope.namespace,
183 scope.actor_id,
184 scope.conversation_id.as_deref().unwrap_or(""),
185 scope.session_id.as_deref().unwrap_or(""),
186 key
187 )
188 }
189
190 fn idempotency_path(&self, scope: &MemoryScope, key: &str) -> PathBuf {
191 let scoped = Self::idempotency_scope_key(scope, key);
192 Self::idempotency_dir(&self.config.data_dir).join(format!("{}.txt", hex_key(&scoped)))
193 }
194
195 fn validate_record(&self, record: &MemoryRecord) -> Result<()> {
196 record.validate()
197 }
198
199 fn validate_upsert_request(&self, request: &UpsertRequest) -> Result<()> {
200 self.validate_record(&request.record)?;
201 if request.idempotency_key.is_none()
202 && self
203 .config
204 .engine_config
205 .ingestion
206 .idempotent_writes_required
207 {
208 return Err(Error::InvalidRequest(
209 "idempotency_key is required by the current ingestion policy".to_string(),
210 ));
211 }
212 if self.config.engine_config.ingestion.require_source_labels
213 && request.record.scope.labels.is_empty()
214 {
215 return Err(Error::InvalidRequest(
216 "at least one source label is required by the current ingestion policy".to_string(),
217 ));
218 }
219 Ok(())
220 }
221
222 fn validate_delete_request(&self, request: &DeleteRequest) -> Result<()> {
223 if request.tenant_id.trim().is_empty() {
224 return Err(Error::InvalidRequest(
225 "delete tenant_id is required".to_string(),
226 ));
227 }
228 if request.namespace.trim().is_empty() {
229 return Err(Error::InvalidRequest(
230 "delete namespace is required".to_string(),
231 ));
232 }
233 if request.record_id.trim().is_empty() {
234 return Err(Error::InvalidRequest(
235 "delete record_id is required".to_string(),
236 ));
237 }
238 if request.audit_reason.trim().is_empty() {
239 return Err(Error::InvalidRequest(
240 "delete audit_reason is required".to_string(),
241 ));
242 }
243 Ok(())
244 }
245
246 fn validate_archive_request(&self, request: &ArchiveRequest) -> Result<()> {
247 Self::validate_lifecycle_request(
248 "archive",
249 &request.tenant_id,
250 &request.namespace,
251 &request.record_id,
252 &request.audit_reason,
253 )
254 }
255
256 fn validate_suppress_request(&self, request: &SuppressRequest) -> Result<()> {
257 Self::validate_lifecycle_request(
258 "suppress",
259 &request.tenant_id,
260 &request.namespace,
261 &request.record_id,
262 &request.audit_reason,
263 )
264 }
265
266 fn validate_recover_request(&self, request: &RecoverRequest) -> Result<()> {
267 Self::validate_lifecycle_request(
268 "recover",
269 &request.tenant_id,
270 &request.namespace,
271 &request.record_id,
272 &request.audit_reason,
273 )?;
274 match request.quality_state {
275 MemoryQualityState::Active | MemoryQualityState::Verified => {}
276 _ => {
277 return Err(Error::InvalidRequest(
278 "recover quality_state must be Active or Verified".to_string(),
279 ));
280 }
281 }
282 if matches!(
283 request.historical_state,
284 Some(MemoryHistoricalState::Superseded)
285 ) {
286 return Err(Error::InvalidRequest(
287 "recover historical_state cannot be Superseded".to_string(),
288 ));
289 }
290 Ok(())
291 }
292
293 fn validate_lifecycle_request(
294 action: &str,
295 tenant_id: &str,
296 namespace: &str,
297 record_id: &str,
298 audit_reason: &str,
299 ) -> Result<()> {
300 if tenant_id.trim().is_empty() {
301 return Err(Error::InvalidRequest(format!(
302 "{action} tenant_id is required"
303 )));
304 }
305 if namespace.trim().is_empty() {
306 return Err(Error::InvalidRequest(format!(
307 "{action} namespace is required"
308 )));
309 }
310 if record_id.trim().is_empty() {
311 return Err(Error::InvalidRequest(format!(
312 "{action} record_id is required"
313 )));
314 }
315 if audit_reason.trim().is_empty() {
316 return Err(Error::InvalidRequest(format!(
317 "{action} audit_reason is required"
318 )));
319 }
320 Ok(())
321 }
322
323 fn validate_record_scope(
324 stored: &StoredRecord,
325 tenant_id: &str,
326 namespace: &str,
327 ) -> Result<()> {
328 if stored.record.scope.tenant_id != tenant_id {
329 return Err(Error::InvalidRequest(format!(
330 "record {} does not belong to tenant {}",
331 stored.record.id, tenant_id
332 )));
333 }
334 if stored.record.scope.namespace != namespace {
335 return Err(Error::InvalidRequest(format!(
336 "record {} does not belong to namespace {}",
337 stored.record.id, namespace
338 )));
339 }
340 Ok(())
341 }
342
343 fn validate_import_request(
344 &self,
345 request: &ImportRequest,
346 ) -> (u64, bool, Vec<ImportFailure>, Vec<PortableRecord>) {
347 let mut validated_records = 0u64;
348 let mut failures = Vec::new();
349 let mut entries = Vec::with_capacity(request.package.records.len());
350
351 if request.package.package_version != PORTABLE_PACKAGE_VERSION {
352 failures.push(ImportFailure {
353 record_id: None,
354 reason: format!(
355 "unsupported portable package version {}; expected {}",
356 request.package.package_version, PORTABLE_PACKAGE_VERSION
357 ),
358 });
359 }
360
361 if request.package.manifest.record_count != request.package.records.len() as u64 {
362 failures.push(ImportFailure {
363 record_id: None,
364 reason: format!(
365 "portable package manifest record_count={} does not match payload records={}",
366 request.package.manifest.record_count,
367 request.package.records.len()
368 ),
369 });
370 }
371
372 for entry in &request.package.records {
373 match self.validate_record(&entry.record) {
374 Ok(()) => {
375 validated_records += 1;
376 entries.push(entry.clone());
377 }
378 Err(error) => failures.push(ImportFailure {
379 record_id: Some(entry.record.id.clone()),
380 reason: error.to_string(),
381 }),
382 }
383 }
384
385 (validated_records, failures.is_empty(), failures, entries)
386 }
387
388 fn is_pinned(record: &MemoryRecord) -> bool {
389 record.scope.trust_level == MemoryTrustLevel::Pinned
390 }
391
392 fn retention_exempt(&self, record: &MemoryRecord) -> bool {
393 self.config.engine_config.retention.pinned_records_exempt && Self::is_pinned(record)
394 }
395
396 fn now_unix_ms() -> Result<u64> {
397 SystemTime::now()
398 .duration_since(UNIX_EPOCH)
399 .map_err(|err| Error::Backend(format!("system clock error: {err}")))
400 .map(|value| value.as_millis() as u64)
401 }
402
403 fn load_record(&self, record_id: &str) -> Result<Option<StoredRecord>> {
404 let path = self.record_path(record_id);
405 if !path.exists() {
406 return Ok(None);
407 }
408 let raw = fs::read(&path).map_err(|err| {
409 Error::Backend(format!("failed to read record {}: {err}", path.display()))
410 })?;
411 let stored = serde_json::from_slice::<StoredRecord>(&raw).map_err(|err| {
412 Error::Backend(format!("failed to decode record {}: {err}", path.display()))
413 })?;
414 Ok(Some(stored))
415 }
416
417 fn persist_record(&self, stored: &StoredRecord) -> Result<()> {
418 let path = self.record_path(&stored.record.id);
419 let encoded = serde_json::to_vec(stored)
420 .map_err(|err| Error::Backend(format!("failed to encode record: {err}")))?;
421 fs::write(&path, encoded).map_err(|err| {
422 Error::Backend(format!("failed to write record {}: {err}", path.display()))
423 })?;
424 Ok(())
425 }
426
427 fn persist_imported_record(&self, stored: &StoredRecord) -> Result<()> {
428 self.persist_record(stored)?;
429 if let Some(idempotency_key) = &stored.idempotency_key {
430 fs::write(
431 self.idempotency_path(&stored.record.scope, idempotency_key),
432 stored.record.id.as_bytes(),
433 )
434 .map_err(|err| Error::Backend(format!("failed to write idempotency mapping: {err}")))?;
435 }
436 Ok(())
437 }
438
439 fn remove_record(&self, record_id: &str) -> Result<bool> {
440 let path = self.record_path(record_id);
441 if !path.exists() {
442 return Ok(false);
443 }
444 fs::remove_file(&path).map_err(|err| {
445 Error::Backend(format!("failed to remove record {}: {err}", path.display()))
446 })?;
447 Ok(true)
448 }
449
450 fn remove_idempotency_mapping(&self, stored: &StoredRecord) -> Result<()> {
451 if let Some(idempotency_key) = &stored.idempotency_key {
452 let path = self.idempotency_path(&stored.record.scope, idempotency_key);
453 if path.exists() {
454 fs::remove_file(&path).map_err(|err| {
455 Error::Backend(format!(
456 "failed to remove idempotency mapping {}: {err}",
457 path.display()
458 ))
459 })?;
460 }
461 }
462 Ok(())
463 }
464
465 fn clear_all_records(&self) -> Result<()> {
466 for stored in self.iterate_records()? {
467 self.remove_record(&stored.record.id)?;
468 self.remove_idempotency_mapping(&stored)?;
469 }
470 Ok(())
471 }
472
473 fn iterate_records(&self) -> Result<Vec<StoredRecord>> {
474 let mut records = Vec::new();
475 let dir = Self::records_dir(&self.config.data_dir);
476 for entry in fs::read_dir(&dir).map_err(|err| {
477 Error::Backend(format!(
478 "failed to read records dir {}: {err}",
479 dir.display()
480 ))
481 })? {
482 let entry = entry.map_err(|err| {
483 Error::Backend(format!(
484 "failed to iterate records dir {}: {err}",
485 dir.display()
486 ))
487 })?;
488 let path = entry.path();
489 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
490 continue;
491 }
492 let raw = fs::read(&path).map_err(|err| {
493 Error::Backend(format!("failed to read record {}: {err}", path.display()))
494 })?;
495 let stored = serde_json::from_slice::<StoredRecord>(&raw).map_err(|err| {
496 Error::Backend(format!("failed to decode record {}: {err}", path.display()))
497 })?;
498 records.push(stored);
499 }
500 Ok(records)
501 }
502
503 fn iterate_idempotency_mappings(&self) -> Result<Vec<IdempotencyMapping>> {
504 let mut mappings = Vec::new();
505 for entry in fs::read_dir(Self::idempotency_dir(&self.config.data_dir)).map_err(|err| {
506 Error::Backend(format!(
507 "failed to read idempotency dir {}: {err}",
508 Self::idempotency_dir(&self.config.data_dir).display()
509 ))
510 })? {
511 let entry = entry.map_err(|err| {
512 Error::Backend(format!("failed to iterate idempotency dir: {err}"))
513 })?;
514 let path = entry.path();
515 if path.extension().and_then(|value| value.to_str()) != Some("txt") {
516 continue;
517 }
518 let Some(stem) = path.file_stem().and_then(|value| value.to_str()) else {
519 continue;
520 };
521 let Some(scoped_key) = unhex_key(stem) else {
522 continue;
523 };
524 let record_id = fs::read_to_string(&path).map_err(|err| {
525 Error::Backend(format!(
526 "failed to read idempotency mapping {}: {err}",
527 path.display()
528 ))
529 })?;
530 mappings.push(IdempotencyMapping {
531 scoped_key,
532 record_id,
533 });
534 }
535 Ok(mappings)
536 }
537
538 fn parse_scope_key(scoped_key: &str) -> Option<ScopeKeyParts> {
539 let parts = scoped_key.split('\u{1f}').collect::<Vec<_>>();
540 if parts.len() != 6 {
541 return None;
542 }
543 Some((
544 parts[0].to_string(),
545 parts[1].to_string(),
546 parts[2].to_string(),
547 (!parts[3].is_empty()).then(|| parts[3].to_string()),
548 (!parts[4].is_empty()).then(|| parts[4].to_string()),
549 parts[5].to_string(),
550 ))
551 }
552
/// Returns `true` when the tenant and namespace satisfy the optional
/// filters; an absent filter accepts any value.
fn scope_matches_filters(
    tenant_id: &str,
    namespace: &str,
    tenant_filter: Option<&str>,
    namespace_filter: Option<&str>,
) -> bool {
    let tenant_ok = match tenant_filter {
        Some(expected) => tenant_id == expected,
        None => true,
    };
    if !tenant_ok {
        return false;
    }
    match namespace_filter {
        Some(expected) => namespace == expected,
        None => true,
    }
}
562
563 fn build_integrity_summary(
564 &self,
565 tenant_filter: Option<&str>,
566 namespace_filter: Option<&str>,
567 ) -> Result<IntegritySummary> {
568 let records = self.iterate_records()?;
569 let mappings = self.iterate_idempotency_mappings()?;
570 let filtered_records = records
571 .iter()
572 .filter(|stored| {
573 Self::scope_matches_filters(
574 &stored.record.scope.tenant_id,
575 &stored.record.scope.namespace,
576 tenant_filter,
577 namespace_filter,
578 )
579 })
580 .collect::<Vec<_>>();
581
582 let mut mapping_lookup = HashMap::new();
583 let mut stale_idempotency_keys = 0u64;
584 let mut scanned_idempotency_keys = 0u64;
585 for mapping in &mappings {
586 let Some((tenant_id, namespace, _, _, _, idempotency_key)) =
587 Self::parse_scope_key(&mapping.scoped_key)
588 else {
589 stale_idempotency_keys += 1;
590 scanned_idempotency_keys += 1;
591 continue;
592 };
593 if !Self::scope_matches_filters(&tenant_id, &namespace, tenant_filter, namespace_filter)
594 {
595 continue;
596 }
597 scanned_idempotency_keys += 1;
598 let Some(stored) = self.load_record(&mapping.record_id)? else {
599 stale_idempotency_keys += 1;
600 continue;
601 };
602 if stored.record.scope.tenant_id != tenant_id
603 || stored.record.scope.namespace != namespace
604 || stored.idempotency_key.as_deref() != Some(idempotency_key.as_str())
605 || Self::idempotency_scope_key(&stored.record.scope, &idempotency_key)
606 != mapping.scoped_key
607 {
608 stale_idempotency_keys += 1;
609 continue;
610 }
611 mapping_lookup.insert(mapping.scoped_key.clone(), mapping.record_id.clone());
612 }
613
614 let mut duplicate_groups = HashMap::<String, usize>::new();
615 let mut missing_idempotency_keys = 0u64;
616 let mut duplicate_active_records = 0u64;
617 for stored in &filtered_records {
618 if let Some(idempotency_key) = &stored.idempotency_key {
619 let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
620 if mapping_lookup.get(&scoped_key) != Some(&stored.record.id) {
621 missing_idempotency_keys += 1;
622 }
623 }
624
625 if !matches!(
626 stored.record.quality_state,
627 MemoryQualityState::Archived
628 | MemoryQualityState::Deleted
629 | MemoryQualityState::Suppressed
630 ) {
631 *duplicate_groups
632 .entry(Self::dedup_signature(&stored.record))
633 .or_default() += 1;
634 }
635 }
636
637 for group_size in duplicate_groups.into_values() {
638 if group_size > 1 {
639 duplicate_active_records += (group_size - 1) as u64;
640 }
641 }
642
643 Ok(IntegritySummary {
644 scanned_records: filtered_records.len() as u64,
645 scanned_idempotency_keys,
646 stale_idempotency_keys,
647 missing_idempotency_keys,
648 duplicate_active_records,
649 })
650 }
651
652 fn build_stats_report(&self, request: &StoreStatsRequest) -> Result<StoreStatsReport> {
653 let records = self.iterate_records()?;
654 let tenant_filter = request.tenant_id.as_deref();
655 let namespace_filter = request.namespace.as_deref();
656 let filtered_records = records
657 .iter()
658 .filter(|stored| {
659 Self::scope_matches_filters(
660 &stored.record.scope.tenant_id,
661 &stored.record.scope.namespace,
662 tenant_filter,
663 namespace_filter,
664 )
665 })
666 .collect::<Vec<_>>();
667 let now_unix_ms = Self::now_unix_ms()?;
668 let integrity = self.build_integrity_summary(tenant_filter, namespace_filter)?;
669 let mut namespace_map = BTreeMap::<(String, String), NamespaceStats>::new();
670 let mut duplicate_groups = HashMap::<String, usize>::new();
671 let mut tombstoned_records = 0u64;
672 let mut expired_records = 0u64;
673 let mut historical_records = 0u64;
674 let mut superseded_records = 0u64;
675 let mut lineage_links = 0u64;
676
677 for stored in &filtered_records {
678 let key = (
679 stored.record.scope.tenant_id.clone(),
680 stored.record.scope.namespace.clone(),
681 );
682 let entry = namespace_map
683 .entry(key.clone())
684 .or_insert_with(|| NamespaceStats {
685 tenant_id: key.0.clone(),
686 namespace: key.1.clone(),
687 active_records: 0,
688 archived_records: 0,
689 deleted_records: 0,
690 suppressed_records: 0,
691 pinned_records: 0,
692 });
693 match stored.record.quality_state {
694 MemoryQualityState::Archived => entry.archived_records += 1,
695 MemoryQualityState::Deleted => {
696 entry.deleted_records += 1;
697 tombstoned_records += 1;
698 }
699 MemoryQualityState::Suppressed => entry.suppressed_records += 1,
700 _ => entry.active_records += 1,
701 }
702 if stored.record.scope.trust_level == MemoryTrustLevel::Pinned {
703 entry.pinned_records += 1;
704 }
705 if stored
706 .record
707 .expires_at_unix_ms
708 .is_some_and(|value| value <= now_unix_ms)
709 {
710 expired_records += 1;
711 }
712 if matches!(
713 stored.record.historical_state,
714 MemoryHistoricalState::Historical
715 ) {
716 historical_records += 1;
717 }
718 if matches!(
719 stored.record.historical_state,
720 MemoryHistoricalState::Superseded
721 ) {
722 superseded_records += 1;
723 }
724 lineage_links += stored.record.lineage.len() as u64;
725 if !matches!(
726 stored.record.quality_state,
727 MemoryQualityState::Archived
728 | MemoryQualityState::Deleted
729 | MemoryQualityState::Suppressed
730 ) {
731 *duplicate_groups
732 .entry(Self::dedup_signature(&stored.record))
733 .or_default() += 1;
734 }
735 }
736
737 let mut duplicate_candidate_groups = 0u64;
738 let mut duplicate_candidate_records = 0u64;
739 for group_size in duplicate_groups.into_values() {
740 if group_size > 1 {
741 duplicate_candidate_groups += 1;
742 duplicate_candidate_records += (group_size - 1) as u64;
743 }
744 }
745
746 Ok(StoreStatsReport {
747 generated_at_unix_ms: now_unix_ms,
748 total_records: filtered_records.len() as u64,
749 storage_bytes: dir_size(&self.config.data_dir)?,
750 namespaces: namespace_map.into_values().collect(),
751 maintenance: MaintenanceStats {
752 duplicate_candidate_groups,
753 duplicate_candidate_records,
754 tombstoned_records,
755 expired_records,
756 stale_idempotency_keys: integrity.stale_idempotency_keys,
757 historical_records,
758 superseded_records,
759 lineage_links,
760 },
761 engine: self.config.engine_config.tuning_info(),
762 })
763 }
764
765 fn matches_scope(candidate: &MemoryScope, query: &MemoryScope) -> bool {
766 candidate.tenant_id == query.tenant_id
767 && candidate.namespace == query.namespace
768 && candidate.actor_id == query.actor_id
769 && (query.conversation_id.is_none()
770 || candidate.conversation_id == query.conversation_id)
771 && (query.session_id.is_none() || candidate.session_id == query.session_id)
772 }
773
774 fn record_passes_filters(
775 record: &MemoryRecord,
776 query: &RecallQuery,
777 relative_bounds: RelativeTemporalBounds,
778 ) -> bool {
779 if !Self::matches_scope(&record.scope, &query.scope) {
780 return false;
781 }
782
783 if let Some(expires_at_unix_ms) = record.expires_at_unix_ms
784 && expires_at_unix_ms <= Self::now_unix_ms().unwrap_or(u64::MAX)
785 {
786 return false;
787 }
788
789 if let Some(min_score) = query.filters.min_importance_score
790 && record.importance_score < min_score
791 {
792 return false;
793 }
794
795 if let Some(source) = &query.filters.source
796 && &record.scope.source != source
797 {
798 return false;
799 }
800
801 if let Some(from_unix_ms) = query.filters.from_unix_ms
802 && record.updated_at_unix_ms < from_unix_ms
803 {
804 return false;
805 }
806
807 if let Some(to_unix_ms) = query.filters.to_unix_ms
808 && record.updated_at_unix_ms > to_unix_ms
809 {
810 return false;
811 }
812
813 if let Some(after_unix_ms) = relative_bounds.after_unix_ms
814 && Self::record_temporal_anchor(record) <= after_unix_ms
815 {
816 return false;
817 }
818
819 if let Some(before_unix_ms) = relative_bounds.before_unix_ms
820 && Self::record_temporal_anchor(record) >= before_unix_ms
821 {
822 return false;
823 }
824
825 if !query.filters.trust_levels.is_empty()
826 && !query
827 .filters
828 .trust_levels
829 .contains(&record.scope.trust_level)
830 {
831 return false;
832 }
833
834 if !query.filters.required_labels.is_empty()
835 && !query.filters.required_labels.iter().all(|label| {
836 record
837 .scope
838 .labels
839 .iter()
840 .any(|candidate| candidate == label)
841 })
842 {
843 return false;
844 }
845
846 if !query.filters.kinds.is_empty() && !query.filters.kinds.contains(&record.kind) {
847 return false;
848 }
849
850 if let Some(episode_id) = &query.filters.episode_id
851 && record.episode.as_ref().map(|episode| &episode.episode_id) != Some(episode_id)
852 {
853 return false;
854 }
855
856 if !query.filters.continuity_states.is_empty()
857 && !record.episode.as_ref().is_some_and(|episode| {
858 query
859 .filters
860 .continuity_states
861 .contains(&episode.continuity_state)
862 })
863 {
864 return false;
865 }
866
867 if query.filters.unresolved_only
868 && !record
869 .episode
870 .as_ref()
871 .is_some_and(|episode| episode.continuity_state.is_unresolved())
872 {
873 return false;
874 }
875
876 if let Some(lineage_record_id) = &query.filters.lineage_record_id
877 && record.id != *lineage_record_id
878 && !record
879 .lineage
880 .iter()
881 .any(|link| &link.record_id == lineage_record_id)
882 {
883 return false;
884 }
885
886 if !query.filters.boundary_labels.is_empty()
887 && !record.episode.as_ref().is_some_and(|episode| {
888 episode.boundary_label.as_ref().is_some_and(|label| {
889 query
890 .filters
891 .boundary_labels
892 .iter()
893 .any(|expected| expected == label)
894 })
895 })
896 {
897 return false;
898 }
899
900 if let Some(recurrence_key) = &query.filters.recurrence_key
901 && record
902 .episode
903 .as_ref()
904 .and_then(|episode| episode.recurrence_key.as_ref())
905 != Some(recurrence_key)
906 {
907 return false;
908 }
909
910 if !query.filters.conflict_states.is_empty()
911 && !record
912 .conflict
913 .as_ref()
914 .is_some_and(|conflict| query.filters.conflict_states.contains(&conflict.state))
915 {
916 return false;
917 }
918
919 if !query.filters.resolution_kinds.is_empty()
920 && !record.conflict.as_ref().is_some_and(|conflict| {
921 query
922 .filters
923 .resolution_kinds
924 .contains(&conflict.resolution)
925 })
926 {
927 return false;
928 }
929
930 if query.filters.unresolved_conflicts_only
931 && !record.conflict.as_ref().is_some_and(|conflict| {
932 matches!(
933 conflict.state,
934 mnemara_core::ConflictReviewState::PotentialConflict
935 | mnemara_core::ConflictReviewState::UnderReview
936 )
937 })
938 {
939 return false;
940 }
941
942 match query.filters.historical_mode {
943 RecallHistoricalMode::CurrentOnly => {
944 if !matches!(record.historical_state, MemoryHistoricalState::Current) {
945 return false;
946 }
947 }
948 RecallHistoricalMode::HistoricalOnly => {
949 if matches!(record.historical_state, MemoryHistoricalState::Current) {
950 return false;
951 }
952 }
953 RecallHistoricalMode::IncludeHistorical => {}
954 }
955
956 if !query.filters.states.is_empty() {
957 if !query.filters.states.contains(&record.quality_state) {
958 return false;
959 }
960 } else {
961 match record.quality_state {
962 MemoryQualityState::Archived if !query.filters.include_archived => return false,
963 MemoryQualityState::Deleted | MemoryQualityState::Suppressed => return false,
964 _ => {}
965 }
966 }
967
968 true
969 }
970
971 fn relative_temporal_bounds(
972 records: &[StoredRecord],
973 query: &RecallQuery,
974 ) -> Result<RelativeTemporalBounds> {
975 let mut bounds = RelativeTemporalBounds::default();
976 if let Some(after_record_id) = &query.filters.after_record_id {
977 let Some(anchor) = records
978 .iter()
979 .find(|stored| {
980 stored.record.id == *after_record_id
981 && Self::matches_scope(&stored.record.scope, &query.scope)
982 })
983 .map(|stored| Self::record_temporal_anchor(&stored.record))
984 else {
985 return Err(Error::InvalidRequest(format!(
986 "after_record_id '{after_record_id}' was not found in recall scope"
987 )));
988 };
989 bounds.after_unix_ms = Some(anchor);
990 }
991 if let Some(before_record_id) = &query.filters.before_record_id {
992 let Some(anchor) = records
993 .iter()
994 .find(|stored| {
995 stored.record.id == *before_record_id
996 && Self::matches_scope(&stored.record.scope, &query.scope)
997 })
998 .map(|stored| Self::record_temporal_anchor(&stored.record))
999 else {
1000 return Err(Error::InvalidRequest(format!(
1001 "before_record_id '{before_record_id}' was not found in recall scope"
1002 )));
1003 };
1004 bounds.before_unix_ms = Some(anchor);
1005 }
1006 if let (Some(after), Some(before)) = (bounds.after_unix_ms, bounds.before_unix_ms)
1007 && after >= before
1008 {
1009 return Err(Error::InvalidRequest(
1010 "after_record_id must refer to an earlier record than before_record_id".to_string(),
1011 ));
1012 }
1013 Ok(bounds)
1014 }
1015
1016 fn approximate_tokens(record: &MemoryRecord) -> usize {
1017 let content_tokens = record.content.split_whitespace().count();
1018 let summary_tokens = record
1019 .summary
1020 .as_deref()
1021 .map(|summary| summary.split_whitespace().count())
1022 .unwrap_or(0);
1023 content_tokens + summary_tokens
1024 }
1025
1026 fn record_temporal_anchor(record: &MemoryRecord) -> u64 {
1027 record
1028 .episode
1029 .as_ref()
1030 .and_then(|episode| episode.last_active_unix_ms.or(episode.started_at_unix_ms))
1031 .unwrap_or(record.updated_at_unix_ms)
1032 }
1033
1034 fn selected_channels_for_hit(hit: &RecallHit, empty_query: bool) -> Vec<String> {
1035 let mut selected_channels = if empty_query {
1036 vec!["temporal".to_string(), "policy".to_string()]
1037 } else {
1038 vec!["lexical".to_string(), "policy".to_string()]
1039 };
1040 if hit.breakdown.semantic > 0.0 {
1041 selected_channels.push("semantic".to_string());
1042 }
1043 if hit.breakdown.metadata > 0.0 {
1044 selected_channels.push("metadata".to_string());
1045 }
1046 if hit.breakdown.episodic > 0.0 {
1047 selected_channels.push("episodic".to_string());
1048 }
1049 if hit.breakdown.salience > 0.0 {
1050 selected_channels.push("salience".to_string());
1051 }
1052 if hit.breakdown.curation > 0.0 {
1053 selected_channels.push("curation".to_string());
1054 }
1055 if hit.record.conflict.is_some() {
1056 selected_channels.push("conflict".to_string());
1057 }
1058 selected_channels.sort();
1059 selected_channels.dedup();
1060 selected_channels
1061 }
1062
1063 fn planning_profile_note(profile: RecallPlanningProfile) -> &'static str {
1064 match profile {
1065 RecallPlanningProfile::FastPath => "planning_profile=fast_path",
1066 RecallPlanningProfile::ContinuityAware => "planning_profile=continuity_aware",
1067 }
1068 }
1069
1070 fn dedup_signature(record: &MemoryRecord) -> String {
1071 format!(
1072 "{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}",
1073 record.scope.tenant_id,
1074 record.scope.namespace,
1075 record.scope.actor_id,
1076 record.kind as u8,
1077 record.content.trim().to_ascii_lowercase(),
1078 record
1079 .summary
1080 .clone()
1081 .unwrap_or_default()
1082 .trim()
1083 .to_ascii_lowercase()
1084 )
1085 }
1086
/// Derives the id of a compaction summary record from a dedup signature:
/// `compacted-summary-` followed by the 64-bit hash of the signature as 16
/// lowercase hex digits.
///
/// NOTE(review): the hash comes from the standard library's `DefaultHasher`,
/// whose algorithm the documentation leaves unspecified across releases —
/// confirm whether these ids need to stay stable across toolchain upgrades.
fn summary_record_id(signature: &str) -> String {
    let mut state = std::collections::hash_map::DefaultHasher::new();
    Hash::hash(signature, &mut state);
    let digest = state.finish();
    format!("compacted-summary-{:016x}", digest)
}
1092
1093 fn compaction_summary_record(
1094 group: &[StoredRecord],
1095 signature: &str,
1096 now_unix_ms: u64,
1097 ) -> StoredRecord {
1098 let canonical = &group[0].record;
1099 let representative_summary = canonical
1100 .summary
1101 .clone()
1102 .filter(|value| !value.trim().is_empty())
1103 .unwrap_or_else(|| canonical.content.clone());
1104 let cluster_size = group.len();
1105 let max_importance_score = group
1106 .iter()
1107 .map(|stored| stored.record.importance_score)
1108 .fold(canonical.importance_score, f32::max);
1109
1110 let mut metadata = BTreeMap::new();
1111 metadata.insert(
1112 "compaction_reason".to_string(),
1113 "duplicate_cluster_rollup".to_string(),
1114 );
1115 metadata.insert(
1116 "compaction_cluster_size".to_string(),
1117 cluster_size.to_string(),
1118 );
1119 metadata.insert("representative_record_id".to_string(), canonical.id.clone());
1120
1121 let mut labels = canonical.scope.labels.clone();
1122 if !labels.iter().any(|label| label == "compacted") {
1123 labels.push("compacted".to_string());
1124 }
1125
1126 StoredRecord {
1127 record: MemoryRecord {
1128 id: Self::summary_record_id(signature),
1129 scope: MemoryScope {
1130 tenant_id: canonical.scope.tenant_id.clone(),
1131 namespace: canonical.scope.namespace.clone(),
1132 actor_id: canonical.scope.actor_id.clone(),
1133 conversation_id: canonical.scope.conversation_id.clone(),
1134 session_id: canonical.scope.session_id.clone(),
1135 source: canonical.scope.source.clone(),
1136 labels,
1137 trust_level: canonical.scope.trust_level,
1138 },
1139 kind: mnemara_core::MemoryRecordKind::Summary,
1140 content: format!(
1141 "Compacted {} related records into a durable summary. Representative memory: {}",
1142 cluster_size, representative_summary
1143 ),
1144 summary: Some(format!(
1145 "{} related records: {}",
1146 cluster_size, representative_summary
1147 )),
1148 source_id: None,
1149 metadata,
1150 quality_state: if matches!(canonical.quality_state, MemoryQualityState::Verified) {
1151 MemoryQualityState::Verified
1152 } else {
1153 MemoryQualityState::Active
1154 },
1155 created_at_unix_ms: now_unix_ms,
1156 updated_at_unix_ms: now_unix_ms,
1157 expires_at_unix_ms: None,
1158 importance_score: max_importance_score,
1159 artifact: canonical.artifact.clone(),
1160 episode: canonical.episode.clone(),
1161 historical_state: MemoryHistoricalState::Current,
1162 lineage: group
1163 .iter()
1164 .map(|stored| LineageLink {
1165 record_id: stored.record.id.clone(),
1166 relation: LineageRelationKind::ConsolidatedFrom,
1167 confidence: 1.0,
1168 })
1169 .collect(),
1170 conflict: canonical.conflict.clone(),
1171 },
1172 idempotency_key: None,
1173 }
1174 }
1175
1176 fn cold_tiering_candidates(
1177 &self,
1178 tenant_id: &str,
1179 namespace: Option<&str>,
1180 now_unix_ms: u64,
1181 ) -> Result<Vec<StoredRecord>> {
1182 let cold_archive_after_days = self.config.engine_config.compaction.cold_archive_after_days;
1183 if cold_archive_after_days == 0 {
1184 return Ok(Vec::new());
1185 }
1186 let archive_threshold_ms =
1187 u64::from(cold_archive_after_days).saturating_mul(24 * 60 * 60 * 1_000);
1188 let max_importance = f32::from(
1189 self.config
1190 .engine_config
1191 .compaction
1192 .cold_archive_importance_threshold_per_mille,
1193 ) / 1000.0;
1194
1195 Ok(self
1196 .iterate_records()?
1197 .into_iter()
1198 .filter(|stored| stored.record.scope.tenant_id == tenant_id)
1199 .filter(|stored| namespace.is_none_or(|value| stored.record.scope.namespace == value))
1200 .filter(|stored| {
1201 matches!(
1202 stored.record.quality_state,
1203 MemoryQualityState::Draft
1204 | MemoryQualityState::Active
1205 | MemoryQualityState::Verified
1206 )
1207 })
1208 .filter(|stored| {
1209 stored.record.scope.trust_level != mnemara_core::MemoryTrustLevel::Pinned
1210 })
1211 .filter(|stored| {
1212 now_unix_ms.saturating_sub(stored.record.updated_at_unix_ms) > archive_threshold_ms
1213 && stored.record.importance_score <= max_importance
1214 })
1215 .collect())
1216 }
1217
1218 fn build_explanations(
1219 &self,
1220 scorer: ConfiguredRecallScorer,
1221 planning_profile: RecallPlanningProfile,
1222 query: &RecallQuery,
1223 planned: &[PlannedRecallCandidate],
1224 selected_record_ids: &[String],
1225 trace_id: &str,
1226 ) -> (Vec<RecallHit>, Option<RecallExplanation>) {
1227 let selected_set = selected_record_ids.iter().cloned().collect::<BTreeSet<_>>();
1228 let hits = planned
1229 .iter()
1230 .filter(|candidate| selected_set.contains(&candidate.hit.record.id))
1231 .map(|candidate| {
1232 let mut enriched = candidate.hit.clone();
1233 if query.include_explanation {
1234 let selected_channels = Self::selected_channels_for_hit(
1235 &candidate.hit,
1236 query.query_text.trim().is_empty(),
1237 );
1238 enriched.explanation = Some(RecallExplanation {
1239 selected_channels,
1240 policy_notes: vec![if query.query_text.trim().is_empty() {
1241 "recent_scope_scan".to_string()
1242 } else {
1243 "initial_file_backend_scoring".to_string()
1244 }],
1245 trace_id: Some(trace_id.to_string()),
1246 planning_trace: None,
1247 planning_profile: Some(planning_profile),
1248 policy_profile: Some(scorer.policy_profile()),
1249 scorer_kind: Some(scorer.scorer_kind()),
1250 scoring_profile: Some(scorer.scoring_profile()),
1251 });
1252 if let Some(explanation) = enriched.explanation.as_mut() {
1253 explanation
1254 .policy_notes
1255 .push(scorer.profile_note().to_string());
1256 explanation
1257 .policy_notes
1258 .push(scorer.policy_profile_note().to_string());
1259 explanation
1260 .policy_notes
1261 .push(Self::planning_profile_note(planning_profile).to_string());
1262 if let Some(note) = scorer.embedding_note() {
1263 explanation.policy_notes.push(note.to_string());
1264 }
1265 if query.filters.episode_id.is_some() {
1266 explanation
1267 .policy_notes
1268 .push("episode_filter_applied".to_string());
1269 }
1270 if query.filters.unresolved_only {
1271 explanation
1272 .policy_notes
1273 .push("unresolved_only_filter_applied".to_string());
1274 }
1275 explanation.policy_notes.push(format!(
1276 "matched_terms={}",
1277 candidate.matched_terms.join(",")
1278 ));
1279 }
1280 }
1281 enriched
1282 })
1283 .collect::<Vec<_>>();
1284
1285 let planning_trace = query.include_explanation.then(|| RecallPlanningTrace {
1286 trace_id: trace_id.to_string(),
1287 token_budget_applied: query.token_budget.is_some(),
1288 candidates: planned
1289 .iter()
1290 .enumerate()
1291 .map(|(index, candidate)| {
1292 let selected = selected_set.contains(&candidate.hit.record.id);
1293 let selection_rank = selected_record_ids
1294 .iter()
1295 .position(|record_id| record_id == &candidate.hit.record.id)
1296 .map(|position| position as u32 + 1);
1297 let selected_channels = Self::selected_channels_for_hit(
1298 &candidate.hit,
1299 query.query_text.trim().is_empty(),
1300 );
1301
1302 let mut filter_reasons = Vec::new();
1303 if selected {
1304 filter_reasons.push("retained".to_string());
1305 } else {
1306 if index >= query.max_items {
1307 filter_reasons.push("max_items_exhausted".to_string());
1308 }
1309 if query.token_budget.is_some() {
1310 filter_reasons.push("token_budget_exhausted".to_string());
1311 }
1312 }
1313
1314 RecallTraceCandidate {
1315 record_id: candidate.hit.record.id.clone(),
1316 kind: candidate.hit.record.kind,
1317 selected,
1318 planner_stage: candidate.planner_stage,
1319 candidate_sources: candidate.candidate_sources.clone(),
1320 selection_rank,
1321 matched_terms: candidate.matched_terms.clone(),
1322 selected_channels,
1323 filter_reasons,
1324 decision_reason: if selected {
1325 "selected_by_rank".to_string()
1326 } else if query.token_budget.is_some() {
1327 "excluded_by_rank_or_budget".to_string()
1328 } else {
1329 "excluded_by_rank".to_string()
1330 },
1331 breakdown: candidate.hit.breakdown.clone(),
1332 }
1333 })
1334 .collect(),
1335 });
1336
1337 let explanation = query.include_explanation.then(|| {
1338 let mut selected_channels = if query.query_text.trim().is_empty() {
1339 vec!["temporal".to_string(), "policy".to_string()]
1340 } else {
1341 vec!["lexical".to_string(), "policy".to_string()]
1342 };
1343 for channel in [
1344 "semantic", "metadata", "episodic", "salience", "curation", "conflict",
1345 ] {
1346 let present = planned.iter().any(|candidate| match channel {
1347 "semantic" => candidate.hit.breakdown.semantic > 0.0,
1348 "metadata" => candidate.hit.breakdown.metadata > 0.0,
1349 "episodic" => candidate.hit.breakdown.episodic > 0.0,
1350 "salience" => candidate.hit.breakdown.salience > 0.0,
1351 "curation" => candidate.hit.breakdown.curation > 0.0,
1352 "conflict" => candidate.hit.record.conflict.is_some(),
1353 _ => false,
1354 });
1355 if present && !selected_channels.iter().any(|existing| existing == channel) {
1356 selected_channels.push(channel.to_string());
1357 }
1358 }
1359 let mut policy_notes = vec![if query.query_text.trim().is_empty() {
1360 "recent_scope_scan".to_string()
1361 } else {
1362 "initial_file_backend_scoring".to_string()
1363 }];
1364 policy_notes.push(scorer.profile_note().to_string());
1365 policy_notes.push(scorer.policy_profile_note().to_string());
1366 policy_notes.push(Self::planning_profile_note(planning_profile).to_string());
1367 if let Some(note) = scorer.embedding_note() {
1368 policy_notes.push(note.to_string());
1369 }
1370 if query.filters.episode_id.is_some() {
1371 policy_notes.push("episode_filter_applied".to_string());
1372 }
1373 if query.filters.unresolved_only {
1374 policy_notes.push("unresolved_only_filter_applied".to_string());
1375 }
1376 if query.filters.before_record_id.is_some() || query.filters.after_record_id.is_some() {
1377 policy_notes.push("relative_temporal_filter_applied".to_string());
1378 }
1379 if !query.filters.boundary_labels.is_empty() || query.filters.recurrence_key.is_some() {
1380 policy_notes.push("episodic_boundary_filter_applied".to_string());
1381 }
1382 if !query.filters.conflict_states.is_empty()
1383 || !query.filters.resolution_kinds.is_empty()
1384 || query.filters.unresolved_conflicts_only
1385 {
1386 policy_notes.push("conflict_review_filter_applied".to_string());
1387 }
1388 RecallExplanation {
1389 selected_channels,
1390 policy_notes,
1391 trace_id: Some(trace_id.to_string()),
1392 planning_trace,
1393 planning_profile: Some(planning_profile),
1394 policy_profile: Some(scorer.policy_profile()),
1395 scorer_kind: Some(scorer.scorer_kind()),
1396 scoring_profile: Some(scorer.scoring_profile()),
1397 }
1398 });
1399
1400 (hits, explanation)
1401 }
1402
1403 fn apply_retention_for_namespace(&self, tenant_id: &str, namespace: &str) -> Result<()> {
1404 let now_unix_ms = Self::now_unix_ms()?;
1405 let retention = &self.config.engine_config.retention;
1406 let ttl_window_ms = u64::from(retention.ttl_days).saturating_mul(24 * 60 * 60 * 1_000);
1407 let archive_window_ms =
1408 u64::from(retention.archive_after_days).saturating_mul(24 * 60 * 60 * 1_000);
1409
1410 let mut namespace_records = self
1411 .iterate_records()?
1412 .into_iter()
1413 .filter(|stored| {
1414 stored.record.scope.tenant_id == tenant_id
1415 && stored.record.scope.namespace == namespace
1416 })
1417 .collect::<Vec<_>>();
1418
1419 for stored in &mut namespace_records {
1420 if self.retention_exempt(&stored.record) {
1421 continue;
1422 }
1423
1424 let expired_by_explicit_deadline = stored
1425 .record
1426 .expires_at_unix_ms
1427 .is_some_and(|expires_at| expires_at <= now_unix_ms);
1428 let expired_by_ttl = ttl_window_ms > 0
1429 && now_unix_ms.saturating_sub(stored.record.created_at_unix_ms) > ttl_window_ms;
1430
1431 if expired_by_explicit_deadline || expired_by_ttl {
1432 self.remove_record(&stored.record.id)?;
1433 self.remove_idempotency_mapping(stored)?;
1434 continue;
1435 }
1436
1437 let should_archive_by_age = archive_window_ms > 0
1438 && now_unix_ms.saturating_sub(stored.record.created_at_unix_ms) > archive_window_ms
1439 && matches!(
1440 stored.record.quality_state,
1441 MemoryQualityState::Draft
1442 | MemoryQualityState::Active
1443 | MemoryQualityState::Verified
1444 );
1445 if should_archive_by_age && stored.record.quality_state != MemoryQualityState::Archived
1446 {
1447 stored.record.quality_state = MemoryQualityState::Archived;
1448 stored.record.historical_state = MemoryHistoricalState::Historical;
1449 stored.record.updated_at_unix_ms = now_unix_ms;
1450 self.persist_record(stored)?;
1451 }
1452 }
1453
1454 if retention.max_records_per_namespace > 0 {
1455 let mut active = self
1456 .iterate_records()?
1457 .into_iter()
1458 .filter(|stored| {
1459 stored.record.scope.tenant_id == tenant_id
1460 && stored.record.scope.namespace == namespace
1461 && !self.retention_exempt(&stored.record)
1462 && matches!(
1463 stored.record.quality_state,
1464 MemoryQualityState::Draft
1465 | MemoryQualityState::Active
1466 | MemoryQualityState::Verified
1467 )
1468 })
1469 .collect::<Vec<_>>();
1470 if active.len() > retention.max_records_per_namespace {
1471 active.sort_by(|left, right| {
1472 left.record
1473 .updated_at_unix_ms
1474 .cmp(&right.record.updated_at_unix_ms)
1475 .then_with(|| {
1476 left.record
1477 .importance_score
1478 .total_cmp(&right.record.importance_score)
1479 })
1480 .then_with(|| left.record.id.cmp(&right.record.id))
1481 });
1482 let archive_count = active.len() - retention.max_records_per_namespace;
1483 for stored in active.iter_mut().take(archive_count) {
1484 stored.record.quality_state = MemoryQualityState::Archived;
1485 stored.record.historical_state = MemoryHistoricalState::Historical;
1486 stored.record.updated_at_unix_ms = now_unix_ms;
1487 self.persist_record(stored)?;
1488 }
1489 }
1490 }
1491 Ok(())
1492 }
1493}
1494
1495#[async_trait]
1496impl MemoryStore for FileMemoryStore {
1497 fn backend_kind(&self) -> &'static str {
1498 "file"
1499 }
1500
1501 async fn upsert(&self, request: UpsertRequest) -> Result<UpsertReceipt> {
1502 self.validate_upsert_request(&request)?;
1503
1504 if let Some(idempotency_key) = &request.idempotency_key {
1505 let path = self.idempotency_path(&request.record.scope, idempotency_key);
1506 if path.exists() {
1507 let existing_record_id = fs::read_to_string(&path).map_err(|err| {
1508 Error::Backend(format!(
1509 "failed to read idempotency mapping {}: {err}",
1510 path.display()
1511 ))
1512 })?;
1513 if existing_record_id != request.record.id {
1514 return Err(Error::Conflict(format!(
1515 "idempotency key already belongs to record {}",
1516 existing_record_id
1517 )));
1518 }
1519 if self.load_record(&existing_record_id)?.is_some() {
1520 return Ok(UpsertReceipt {
1521 record_id: existing_record_id,
1522 deduplicated: true,
1523 summary_refreshed: false,
1524 });
1525 }
1526 fs::remove_file(&path).map_err(|err| {
1527 Error::Backend(format!(
1528 "failed to clear stale idempotency mapping {}: {err}",
1529 path.display()
1530 ))
1531 })?;
1532 }
1533 }
1534
1535 let key = request.record.id.clone();
1536 let tenant_id = request.record.scope.tenant_id.clone();
1537 let namespace = request.record.scope.namespace.clone();
1538 let existing = self.load_record(&key)?;
1539 let deduplicated = existing.is_some();
1540 let stored = StoredRecord {
1541 record: request.record,
1542 idempotency_key: request.idempotency_key,
1543 };
1544 if let Some(existing) = existing
1545 && existing.idempotency_key != stored.idempotency_key
1546 {
1547 self.remove_idempotency_mapping(&existing)?;
1548 }
1549 self.persist_record(&stored)?;
1550 if let Some(idempotency_key) = &stored.idempotency_key {
1551 fs::write(
1552 self.idempotency_path(&stored.record.scope, idempotency_key),
1553 key.as_bytes(),
1554 )
1555 .map_err(|err| Error::Backend(format!("failed to write idempotency mapping: {err}")))?;
1556 }
1557 self.apply_retention_for_namespace(&tenant_id, &namespace)?;
1558 Ok(UpsertReceipt {
1559 record_id: key,
1560 deduplicated,
1561 summary_refreshed: false,
1562 })
1563 }
1564
1565 async fn batch_upsert(&self, request: BatchUpsertRequest) -> Result<Vec<UpsertReceipt>> {
1566 if request.requests.len() > self.config.engine_config.max_batch_size {
1567 return Err(Error::InvalidRequest(format!(
1568 "batch size {} exceeds configured max_batch_size {}",
1569 request.requests.len(),
1570 self.config.engine_config.max_batch_size
1571 )));
1572 }
1573 for request in &request.requests {
1574 self.validate_upsert_request(request)?;
1575 }
1576 let mut receipts = Vec::with_capacity(request.requests.len());
1577 for request in request.requests {
1578 receipts.push(self.upsert(request).await?);
1579 }
1580 Ok(receipts)
1581 }
1582
1583 async fn recall(&self, query: RecallQuery) -> Result<RecallResult> {
1584 let empty_query = query.query_text.trim().is_empty();
1585 let planner = self.config.recall_planner();
1586 let scorer = planner.scorer();
1587 let planning_profile = planner.effective_profile(&query);
1588 let stored_records = self.iterate_records()?;
1589 let relative_bounds = Self::relative_temporal_bounds(&stored_records, &query)?;
1590 let records = stored_records
1591 .into_iter()
1592 .filter(|stored| Self::record_passes_filters(&stored.record, &query, relative_bounds))
1593 .map(|stored| stored.record)
1594 .collect::<Vec<_>>();
1595 let mut scored = planner.plan(&records, &query);
1596 match query.filters.temporal_order {
1597 RecallTemporalOrder::Relevance if empty_query => {
1598 scored.sort_by(|left, right| {
1599 Self::record_temporal_anchor(&right.hit.record)
1600 .cmp(&Self::record_temporal_anchor(&left.hit.record))
1601 .then_with(|| {
1602 right
1603 .hit
1604 .record
1605 .importance_score
1606 .total_cmp(&left.hit.record.importance_score)
1607 })
1608 .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1609 });
1610 }
1611 RecallTemporalOrder::Relevance => {
1612 scored.sort_by(|left, right| {
1613 right
1614 .hit
1615 .breakdown
1616 .total
1617 .total_cmp(&left.hit.breakdown.total)
1618 .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1619 });
1620 }
1621 RecallTemporalOrder::ChronologicalAsc => {
1622 scored.sort_by(|left, right| {
1623 Self::record_temporal_anchor(&left.hit.record)
1624 .cmp(&Self::record_temporal_anchor(&right.hit.record))
1625 .then_with(|| {
1626 right
1627 .hit
1628 .breakdown
1629 .total
1630 .total_cmp(&left.hit.breakdown.total)
1631 })
1632 .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1633 });
1634 }
1635 RecallTemporalOrder::ChronologicalDesc => {
1636 scored.sort_by(|left, right| {
1637 Self::record_temporal_anchor(&right.hit.record)
1638 .cmp(&Self::record_temporal_anchor(&left.hit.record))
1639 .then_with(|| {
1640 right
1641 .hit
1642 .breakdown
1643 .total
1644 .total_cmp(&left.hit.breakdown.total)
1645 })
1646 .then_with(|| left.hit.record.id.cmp(&right.hit.record.id))
1647 });
1648 }
1649 }
1650
1651 let examined = scored.len();
1652 let mut selected_ids = Vec::with_capacity(query.max_items);
1653 let mut remaining_budget = query.token_budget.unwrap_or(usize::MAX);
1654 for candidate in &scored {
1655 if selected_ids.len() >= query.max_items {
1656 break;
1657 }
1658 let estimated_tokens = Self::approximate_tokens(&candidate.hit.record);
1659 if selected_ids.is_empty() || estimated_tokens <= remaining_budget {
1660 remaining_budget = remaining_budget.saturating_sub(estimated_tokens);
1661 selected_ids.push(candidate.hit.record.id.clone());
1662 }
1663 }
1664
1665 let trace_id = format!(
1666 "recall:{}:{}:{}",
1667 query.scope.tenant_id, query.scope.namespace, examined
1668 );
1669 let (hits, explanation) = self.build_explanations(
1670 scorer,
1671 planning_profile,
1672 &query,
1673 &scored,
1674 &selected_ids,
1675 &trace_id,
1676 );
1677 Ok(RecallResult {
1678 hits,
1679 total_candidates_examined: examined,
1680 explanation,
1681 })
1682 }
1683
1684 async fn compact(&self, request: CompactionRequest) -> Result<CompactionReport> {
1685 if request.tenant_id.trim().is_empty() {
1686 return Err(Error::InvalidRequest(
1687 "compaction tenant_id is required".to_string(),
1688 ));
1689 }
1690
1691 let mut groups: HashMap<String, Vec<StoredRecord>> = HashMap::new();
1692 for stored in self.iterate_records()? {
1693 if stored.record.scope.tenant_id != request.tenant_id {
1694 continue;
1695 }
1696 if let Some(namespace) = &request.namespace
1697 && stored.record.scope.namespace != *namespace
1698 {
1699 continue;
1700 }
1701 if matches!(
1702 stored.record.quality_state,
1703 MemoryQualityState::Archived
1704 | MemoryQualityState::Deleted
1705 | MemoryQualityState::Suppressed
1706 ) {
1707 continue;
1708 }
1709 groups
1710 .entry(Self::dedup_signature(&stored.record))
1711 .or_default()
1712 .push(stored);
1713 }
1714
1715 let mut deduplicated_records = 0u64;
1716 let mut archived_records = 0u64;
1717 let mut summarized_clusters = 0u64;
1718 let mut superseded_records = 0u64;
1719 let mut lineage_links_created = 0u64;
1720 let now_unix_ms = Self::now_unix_ms()?;
1721 for group in groups.values_mut() {
1722 if group.len() < 2 {
1723 continue;
1724 }
1725 group.sort_by(|left, right| {
1726 right
1727 .record
1728 .updated_at_unix_ms
1729 .cmp(&left.record.updated_at_unix_ms)
1730 .then_with(|| {
1731 right
1732 .record
1733 .importance_score
1734 .total_cmp(&left.record.importance_score)
1735 })
1736 .then_with(|| left.record.id.cmp(&right.record.id))
1737 });
1738 let signature = Self::dedup_signature(&group[0].record);
1739 if self
1740 .config
1741 .engine_config
1742 .compaction
1743 .summarize_after_record_count
1744 > 0
1745 && group.len()
1746 >= self
1747 .config
1748 .engine_config
1749 .compaction
1750 .summarize_after_record_count
1751 {
1752 summarized_clusters += 1;
1753 lineage_links_created += group.len() as u64;
1754 if !request.dry_run {
1755 let summary = Self::compaction_summary_record(group, &signature, now_unix_ms);
1756 self.persist_record(&summary)?;
1757 }
1758 }
1759 for duplicate in group.iter_mut().skip(1) {
1760 deduplicated_records += 1;
1761 archived_records += 1;
1762 superseded_records += 1;
1763 if request.dry_run {
1764 continue;
1765 }
1766 duplicate.record.quality_state = MemoryQualityState::Archived;
1767 duplicate.record.historical_state = MemoryHistoricalState::Superseded;
1768 duplicate.record.lineage.push(LineageLink {
1769 record_id: Self::summary_record_id(&signature),
1770 relation: LineageRelationKind::SupersededBy,
1771 confidence: 1.0,
1772 });
1773 lineage_links_created += 1;
1774 duplicate.record.updated_at_unix_ms = Self::now_unix_ms()?;
1775 self.persist_record(duplicate)?;
1776 }
1777 }
1778
1779 for mut candidate in self.cold_tiering_candidates(
1780 &request.tenant_id,
1781 request.namespace.as_deref(),
1782 now_unix_ms,
1783 )? {
1784 archived_records += 1;
1785 if request.dry_run {
1786 continue;
1787 }
1788 candidate.record.quality_state = MemoryQualityState::Archived;
1789 candidate.record.historical_state = MemoryHistoricalState::Historical;
1790 candidate.record.updated_at_unix_ms = now_unix_ms;
1791 self.persist_record(&candidate)?;
1792 }
1793
1794 Ok(CompactionReport {
1795 deduplicated_records,
1796 archived_records,
1797 summarized_clusters,
1798 pruned_graph_edges: 0,
1799 superseded_records,
1800 lineage_links_created,
1801 dry_run: request.dry_run,
1802 })
1803 }
1804
1805 async fn delete(&self, request: DeleteRequest) -> Result<DeleteReceipt> {
1806 self.validate_delete_request(&request)?;
1807 let Some(stored) = self.load_record(&request.record_id)? else {
1808 return Ok(DeleteReceipt {
1809 record_id: request.record_id,
1810 tombstoned: false,
1811 hard_deleted: false,
1812 });
1813 };
1814 if stored.record.scope.tenant_id != request.tenant_id {
1815 return Err(Error::InvalidRequest(format!(
1816 "record {} does not belong to tenant {}",
1817 request.record_id, request.tenant_id
1818 )));
1819 }
1820 if stored.record.scope.namespace != request.namespace {
1821 return Err(Error::InvalidRequest(format!(
1822 "record {} does not belong to namespace {}",
1823 request.record_id, request.namespace
1824 )));
1825 }
1826
1827 if request.hard_delete {
1828 self.remove_record(&request.record_id)?;
1829 self.remove_idempotency_mapping(&stored)?;
1830 } else {
1831 let mut tombstone = stored;
1832 tombstone.record.quality_state = MemoryQualityState::Deleted;
1833 tombstone.record.updated_at_unix_ms = Self::now_unix_ms()?;
1834 self.persist_record(&tombstone)?;
1835 }
1836
1837 Ok(DeleteReceipt {
1838 record_id: request.record_id,
1839 tombstoned: !request.hard_delete,
1840 hard_deleted: request.hard_delete,
1841 })
1842 }
1843
1844 async fn archive(&self, request: ArchiveRequest) -> Result<ArchiveReceipt> {
1845 self.validate_archive_request(&request)?;
1846 let Some(mut stored) = self.load_record(&request.record_id)? else {
1847 return Err(Error::InvalidRequest(format!(
1848 "record {} was not found",
1849 request.record_id
1850 )));
1851 };
1852 Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1853
1854 let previous_quality_state = stored.record.quality_state;
1855 let previous_historical_state = stored.record.historical_state;
1856 let changed = previous_quality_state != MemoryQualityState::Archived
1857 || previous_historical_state == MemoryHistoricalState::Current;
1858 let historical_state = match previous_historical_state {
1859 MemoryHistoricalState::Current => MemoryHistoricalState::Historical,
1860 other => other,
1861 };
1862
1863 if changed && !request.dry_run {
1864 stored.record.quality_state = MemoryQualityState::Archived;
1865 stored.record.historical_state = historical_state;
1866 stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1867 self.persist_record(&stored)?;
1868 }
1869
1870 Ok(ArchiveReceipt {
1871 record_id: request.record_id,
1872 previous_quality_state,
1873 previous_historical_state,
1874 quality_state: MemoryQualityState::Archived,
1875 historical_state,
1876 changed,
1877 dry_run: request.dry_run,
1878 })
1879 }
1880
1881 async fn suppress(&self, request: SuppressRequest) -> Result<SuppressReceipt> {
1882 self.validate_suppress_request(&request)?;
1883 let Some(mut stored) = self.load_record(&request.record_id)? else {
1884 return Err(Error::InvalidRequest(format!(
1885 "record {} was not found",
1886 request.record_id
1887 )));
1888 };
1889 Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1890
1891 let previous_quality_state = stored.record.quality_state;
1892 let previous_historical_state = stored.record.historical_state;
1893 let changed = previous_quality_state != MemoryQualityState::Suppressed;
1894
1895 if changed && !request.dry_run {
1896 stored.record.quality_state = MemoryQualityState::Suppressed;
1897 stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1898 self.persist_record(&stored)?;
1899 }
1900
1901 Ok(SuppressReceipt {
1902 record_id: request.record_id,
1903 previous_quality_state,
1904 previous_historical_state,
1905 quality_state: MemoryQualityState::Suppressed,
1906 historical_state: previous_historical_state,
1907 changed,
1908 dry_run: request.dry_run,
1909 })
1910 }
1911
1912 async fn recover(&self, request: RecoverRequest) -> Result<RecoverReceipt> {
1913 self.validate_recover_request(&request)?;
1914 let Some(mut stored) = self.load_record(&request.record_id)? else {
1915 return Err(Error::InvalidRequest(format!(
1916 "record {} was not found",
1917 request.record_id
1918 )));
1919 };
1920 Self::validate_record_scope(&stored, &request.tenant_id, &request.namespace)?;
1921
1922 let previous_quality_state = stored.record.quality_state;
1923 let previous_historical_state = stored.record.historical_state;
1924 let historical_state = request
1925 .historical_state
1926 .unwrap_or(MemoryHistoricalState::Current);
1927 let changed = previous_quality_state != request.quality_state
1928 || previous_historical_state != historical_state;
1929
1930 if changed && !request.dry_run {
1931 stored.record.quality_state = request.quality_state;
1932 stored.record.historical_state = historical_state;
1933 stored.record.updated_at_unix_ms = Self::now_unix_ms()?;
1934 self.persist_record(&stored)?;
1935 }
1936
1937 Ok(RecoverReceipt {
1938 record_id: request.record_id,
1939 previous_quality_state,
1940 previous_historical_state,
1941 quality_state: request.quality_state,
1942 historical_state,
1943 changed,
1944 dry_run: request.dry_run,
1945 })
1946 }
1947
1948 async fn snapshot(&self) -> Result<SnapshotManifest> {
1949 let records = self.iterate_records()?;
1950 let namespaces = records
1951 .iter()
1952 .map(|stored| stored.record.scope.namespace.clone())
1953 .collect::<BTreeSet<_>>()
1954 .into_iter()
1955 .collect::<Vec<_>>();
1956 let created_at_unix_ms = Self::now_unix_ms()?;
1957 let storage_bytes = dir_size(&self.config.data_dir)?;
1958
1959 Ok(SnapshotManifest {
1960 snapshot_id: format!("snapshot-{created_at_unix_ms}"),
1961 created_at_unix_ms,
1962 namespaces,
1963 record_count: records.len() as u64,
1964 storage_bytes,
1965 engine: self.config.engine_config.tuning_info(),
1966 })
1967 }
1968
1969 async fn stats(&self, request: StoreStatsRequest) -> Result<StoreStatsReport> {
1970 self.build_stats_report(&request)
1971 }
1972
1973 async fn integrity_check(
1974 &self,
1975 request: IntegrityCheckRequest,
1976 ) -> Result<IntegrityCheckReport> {
1977 let summary = self
1978 .build_integrity_summary(request.tenant_id.as_deref(), request.namespace.as_deref())?;
1979 Ok(IntegrityCheckReport {
1980 generated_at_unix_ms: Self::now_unix_ms()?,
1981 healthy: summary.stale_idempotency_keys == 0
1982 && summary.missing_idempotency_keys == 0
1983 && summary.duplicate_active_records == 0,
1984 scanned_records: summary.scanned_records,
1985 scanned_idempotency_keys: summary.scanned_idempotency_keys,
1986 stale_idempotency_keys: summary.stale_idempotency_keys,
1987 missing_idempotency_keys: summary.missing_idempotency_keys,
1988 duplicate_active_records: summary.duplicate_active_records,
1989 })
1990 }
1991
    /// Repairs the idempotency-mapping files for the records matching the request's
    /// optional tenant and namespace filters.
    ///
    /// Two independent actions can be enabled:
    ///
    /// * `remove_stale_idempotency_keys` deletes mapping files whose target record is
    ///   missing, or whose record no longer agrees with the mapping's scope or key;
    /// * `rebuild_missing_idempotency_keys` writes a mapping file for every record that
    ///   carries an idempotency key but has no mapping pointing back at it.
    ///
    /// With `dry_run` set the counters are computed but no file is touched.
    /// `healthy_after` treats every enabled action as having cleared its class of
    /// problem, and additionally requires that no duplicate active records were found.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidRequest` when `reason` is blank or no action is enabled,
    /// and `Error::Backend` when a mapping file cannot be removed or written; other
    /// errors from the helpers it calls are propagated.
    async fn repair(&self, request: RepairRequest) -> Result<RepairReport> {
        if request.reason.trim().is_empty() {
            return Err(Error::InvalidRequest(
                "repair reason is required".to_string(),
            ));
        }
        if !request.remove_stale_idempotency_keys && !request.rebuild_missing_idempotency_keys {
            return Err(Error::InvalidRequest(
                "repair requires at least one enabled action".to_string(),
            ));
        }

        let tenant_filter = request.tenant_id.as_deref();
        let namespace_filter = request.namespace.as_deref();
        // The summary is taken before any repair, so its counts describe the store as
        // it was found.
        let summary = self.build_integrity_summary(tenant_filter, namespace_filter)?;
        let records = self.iterate_records()?;
        let mappings = self.iterate_idempotency_mappings()?;
        let mut removed_stale_idempotency_keys = 0u64;
        let mut rebuilt_missing_idempotency_keys = 0u64;

        if request.remove_stale_idempotency_keys {
            for mapping in &mappings {
                // Mappings whose scoped key cannot be parsed are left alone.
                let Some((tenant_id, namespace, _, _, _, idempotency_key)) =
                    Self::parse_scope_key(&mapping.scoped_key)
                else {
                    continue;
                };
                if !Self::scope_matches_filters(
                    &tenant_id,
                    &namespace,
                    tenant_filter,
                    namespace_filter,
                ) {
                    continue;
                }
                // Stale: the record is gone, or it disagrees with the mapping on
                // tenant, namespace, idempotency key, or the full scoped key.
                let stale = match self.load_record(&mapping.record_id)? {
                    Some(stored) => {
                        stored.record.scope.tenant_id != tenant_id
                            || stored.record.scope.namespace != namespace
                            || stored.idempotency_key.as_deref() != Some(idempotency_key.as_str())
                            || Self::idempotency_scope_key(&stored.record.scope, &idempotency_key)
                                != mapping.scoped_key
                    }
                    None => true,
                };
                if stale {
                    // Counted in dry runs too, so the report shows what would be removed.
                    removed_stale_idempotency_keys += 1;
                    if !request.dry_run {
                        let path = Self::idempotency_dir(&self.config.data_dir)
                            .join(format!("{}.txt", hex_key(&mapping.scoped_key)));
                        if path.exists() {
                            fs::remove_file(&path).map_err(|err| {
                                Error::Backend(format!(
                                    "failed to remove stale idempotency mapping {}: {err}",
                                    path.display()
                                ))
                            })?;
                        }
                    }
                }
            }
        }

        if request.rebuild_missing_idempotency_keys {
            // Re-read the mappings so removals made above are taken into account.
            let existing = self.iterate_idempotency_mappings()?;
            let existing_lookup = existing
                .into_iter()
                .map(|mapping| (mapping.scoped_key, mapping.record_id))
                .collect::<HashMap<_, _>>();

            for stored in &records {
                if !Self::scope_matches_filters(
                    &stored.record.scope.tenant_id,
                    &stored.record.scope.namespace,
                    tenant_filter,
                    namespace_filter,
                ) {
                    continue;
                }
                let Some(idempotency_key) = &stored.idempotency_key else {
                    continue;
                };
                let scoped_key = Self::idempotency_scope_key(&stored.record.scope, idempotency_key);
                // Nothing to do when the mapping already points at this record.
                if existing_lookup.get(&scoped_key) == Some(&stored.record.id) {
                    continue;
                }
                rebuilt_missing_idempotency_keys += 1;
                if !request.dry_run {
                    fs::write(
                        Self::idempotency_dir(&self.config.data_dir)
                            .join(format!("{}.txt", hex_key(&scoped_key))),
                        stored.record.id.as_bytes(),
                    )
                    .map_err(|err| {
                        Error::Backend(format!("failed to rebuild idempotency mapping: {err}"))
                    })?;
                }
            }
        }

        // An enabled action is assumed to have cleared its class of problem; a disabled
        // one leaves the count from the initial summary. This holds for dry runs as well.
        let stale_after = if request.remove_stale_idempotency_keys {
            0
        } else {
            summary.stale_idempotency_keys
        };
        let missing_after = if request.rebuild_missing_idempotency_keys {
            0
        } else {
            summary.missing_idempotency_keys
        };

        Ok(RepairReport {
            dry_run: request.dry_run,
            scanned_records: summary.scanned_records,
            scanned_idempotency_keys: summary.scanned_idempotency_keys,
            removed_stale_idempotency_keys,
            rebuilt_missing_idempotency_keys,
            healthy_after: stale_after == 0
                && missing_after == 0
                && summary.duplicate_active_records == 0,
        })
    }
2114
2115 async fn export(&self, request: ExportRequest) -> Result<PortableStorePackage> {
2116 let exported_at_unix_ms = Self::now_unix_ms()?;
2117 let mut namespaces = BTreeSet::new();
2118 let mut records = Vec::new();
2119 for stored in self.iterate_records()? {
2120 if request
2121 .tenant_id
2122 .as_deref()
2123 .is_some_and(|tenant_id| stored.record.scope.tenant_id != tenant_id)
2124 {
2125 continue;
2126 }
2127 if request
2128 .namespace
2129 .as_deref()
2130 .is_some_and(|namespace| stored.record.scope.namespace != namespace)
2131 {
2132 continue;
2133 }
2134 if !request.include_archived
2135 && stored.record.quality_state == MemoryQualityState::Archived
2136 {
2137 continue;
2138 }
2139 namespaces.insert(format!(
2140 "{}:{}",
2141 stored.record.scope.tenant_id, stored.record.scope.namespace
2142 ));
2143 records.push(PortableRecord {
2144 record: stored.record,
2145 idempotency_key: stored.idempotency_key,
2146 });
2147 }
2148
2149 let storage_bytes = records
2150 .iter()
2151 .map(|entry| {
2152 entry.record.content.len()
2153 + entry.record.summary.as_deref().map(str::len).unwrap_or(0)
2154 })
2155 .sum::<usize>() as u64;
2156
2157 Ok(PortableStorePackage {
2158 package_version: PORTABLE_PACKAGE_VERSION,
2159 exported_at_unix_ms,
2160 manifest: SnapshotManifest {
2161 snapshot_id: format!("portable-export-{exported_at_unix_ms}"),
2162 created_at_unix_ms: exported_at_unix_ms,
2163 namespaces: namespaces.into_iter().collect(),
2164 record_count: records.len() as u64,
2165 storage_bytes,
2166 engine: self.config.engine_config.tuning_info(),
2167 },
2168 records,
2169 })
2170 }
2171
2172 async fn import(&self, request: ImportRequest) -> Result<ImportReport> {
2173 let snapshot_id = request.package.manifest.snapshot_id.clone();
2174 let package_version = request.package.package_version;
2175 let (validated_records, compatible_package, failed_records, entries) =
2176 self.validate_import_request(&request);
2177 let apply_changes = compatible_package
2178 && failed_records.is_empty()
2179 && !request.dry_run
2180 && !matches!(request.mode, ImportMode::Validate);
2181 let mut imported_records = 0u64;
2182 let mut skipped_records = 0u64;
2183
2184 if apply_changes && matches!(request.mode, ImportMode::Replace) {
2185 self.clear_all_records()?;
2186 }
2187
2188 for entry in entries {
2189 if matches!(request.mode, ImportMode::Merge)
2190 && self.load_record(&entry.record.id)?.is_some()
2191 {
2192 skipped_records += 1;
2193 continue;
2194 }
2195 if apply_changes {
2196 self.persist_imported_record(&StoredRecord {
2197 record: entry.record,
2198 idempotency_key: entry.idempotency_key,
2199 })?;
2200 }
2201 imported_records += 1;
2202 }
2203
2204 Ok(ImportReport {
2205 mode: request.mode,
2206 dry_run: request.dry_run,
2207 applied: apply_changes,
2208 compatible_package,
2209 package_version,
2210 validated_records,
2211 imported_records,
2212 skipped_records,
2213 replaced_existing: matches!(request.mode, ImportMode::Replace),
2214 snapshot_id,
2215 failed_records,
2216 })
2217 }
2218}
2219
2220fn dir_size(path: &Path) -> Result<u64> {
2221 let mut total = 0u64;
2222 for entry in fs::read_dir(path)
2223 .map_err(|err| Error::Backend(format!("failed to read dir {}: {err}", path.display())))?
2224 {
2225 let entry = entry.map_err(|err| {
2226 Error::Backend(format!("failed to iterate dir {}: {err}", path.display()))
2227 })?;
2228 let entry_path = entry.path();
2229 if entry_path.is_dir() {
2230 total = total.saturating_add(dir_size(&entry_path)?);
2231 } else {
2232 total = total.saturating_add(
2233 entry
2234 .metadata()
2235 .map_err(|err| {
2236 Error::Backend(format!(
2237 "failed to stat file {}: {err}",
2238 entry_path.display()
2239 ))
2240 })?
2241 .len(),
2242 );
2243 }
2244 }
2245 Ok(total)
2246}
2247
2248fn hex_key(input: &str) -> String {
2249 let mut output = String::with_capacity(input.len() * 2);
2250 for byte in input.as_bytes() {
2251 output.push(nibble_to_hex(byte >> 4));
2252 output.push(nibble_to_hex(byte & 0x0f));
2253 }
2254 output
2255}
2256
/// Maps a value in `0..=15` to its lowercase hexadecimal digit.
///
/// Values above 15 are not a single hex digit and yield `'0'`.
fn nibble_to_hex(value: u8) -> char {
    // `from_digit` returns `None` once the value reaches the radix, which selects the fallback.
    char::from_digit(u32::from(value), 16).unwrap_or('0')
}
2264
2265fn unhex_key(input: &str) -> Option<String> {
2266 if input.len() % 2 != 0 {
2267 return None;
2268 }
2269
2270 let mut bytes = Vec::with_capacity(input.len() / 2);
2271 let chars = input.as_bytes().chunks_exact(2);
2272 for chunk in chars {
2273 let high = hex_to_nibble(chunk[0] as char)?;
2274 let low = hex_to_nibble(chunk[1] as char)?;
2275 bytes.push((high << 4) | low);
2276 }
2277
2278 String::from_utf8(bytes).ok()
2279}
2280
/// Converts one hexadecimal digit, in either letter case, to its numeric value.
///
/// Returns `None` for any character outside `0-9`, `a-f`, and `A-F`.
fn hex_to_nibble(value: char) -> Option<u8> {
    // `to_digit(16)` yields at most 15, so narrowing to `u8` is lossless.
    value.to_digit(16).map(|digit| digit as u8)
}