1use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs::OpenOptions;
6use std::io::Read as _;
7use std::path::{Path, PathBuf};
8use std::time::{Duration, Instant, SystemTime};
9
10use agentic_memory::{
11 AmemReader, AmemWriter, CognitiveEventBuilder, Edge, EdgeType, EventType, MemoryGraph,
12 QueryEngine, WriteEngine,
13};
14use serde_json::Value;
15
16use crate::types::{McpError, McpResult};
17
/// Auto-save interval in seconds for the desktop profile; `open` lets
/// `AMEM_AUTOSAVE_SECS` override the profile value.
const DEFAULT_AUTO_SAVE_SECS: u64 = 30;
/// Auto-backup interval in seconds for the desktop profile (15 minutes).
const DEFAULT_BACKUP_INTERVAL_SECS: u64 = 900;
/// Backup retention value for the desktop profile.
// NOTE(review): presumably the number of backup files kept when pruning — confirm.
const DEFAULT_BACKUP_RETENTION: usize = 24;
/// Sleep-cycle interval in seconds for the desktop profile (30 minutes).
const DEFAULT_SLEEP_CYCLE_SECS: u64 = 1800;
/// Minimum number of non-episode nodes a past session needs before the
/// sleep cycle auto-archives it (desktop profile).
const DEFAULT_ARCHIVE_MIN_SESSION_NODES: usize = 25;
/// Decay score at or above which a node is counted as "hot" (desktop profile).
const DEFAULT_HOT_MIN_DECAY: f32 = 0.7;
/// Decay score at or above which a node is counted as "warm" (desktop profile).
const DEFAULT_WARM_MIN_DECAY: f32 = 0.3;
/// Mutation rate per minute above which maintenance is throttled (desktop profile).
const DEFAULT_SLA_MAX_MUTATIONS_PER_MIN: u32 = 240;
/// Fallback for `AMEM_HEALTH_LEDGER_EMIT_SECS`: minimum seconds between ledger writes.
const DEFAULT_HEALTH_LEDGER_EMIT_SECS: u64 = 30;
/// Fallback for `AMEM_STORAGE_BUDGET_BYTES` (2 GiB).
const DEFAULT_STORAGE_BUDGET_BYTES: u64 = 2 * 1024 * 1024 * 1024;
/// Fallback for `AMEM_STORAGE_BUDGET_HORIZON_YEARS`, the horizon used when
/// projecting file growth.
const DEFAULT_STORAGE_BUDGET_HORIZON_YEARS: u32 = 20;
/// Fallback for `AMEM_AUTO_CAPTURE_MAX_CHARS`.
const DEFAULT_AUTO_CAPTURE_MAX_CHARS: usize = 2048;
/// Storage version that `open` compares against; files whose detected version
/// is lower are treated as legacy.
const CURRENT_AMEM_VERSION: u32 = 1;
44
/// Named preset that selects a set of maintenance defaults
/// (see `AutonomicProfile::defaults`).
#[derive(Debug, Clone, Copy)]
enum AutonomicProfile {
    /// Fallback profile; uses the `DEFAULT_*` constants.
    Desktop,
    /// Shorter intervals and a higher mutation ceiling than `Desktop`.
    Cloud,
    /// Shortest intervals and the highest mutation ceiling of the three.
    Aggressive,
}
51
/// What `SessionManager::open` does when the memory file reports a storage
/// version older than `CURRENT_AMEM_VERSION`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StorageMigrationPolicy {
    /// Copy the file to a checkpoint, then rewrite it (the fallback policy).
    AutoSafe,
    /// Refuse to open a legacy file and return an error.
    Strict,
    /// Log a warning and keep using the file without rewriting it.
    Off,
}
58
/// How `maybe_enforce_storage_budget` reacts when the current or projected
/// file size exceeds the configured budget.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StorageBudgetMode {
    /// Archive completed sessions to try to shrink the file (the fallback mode).
    AutoRollup,
    /// Only log a warning.
    Warn,
    /// Skip budget checks entirely.
    Off,
}
65
/// Controls automatic capture of prompt requests and tool calls as memory events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AutoCaptureMode {
    /// Tool calls are captured through `extract_safe_tool_capture_text`
    /// (the fallback mode).
    Safe,
    /// Tool calls are captured through `extract_full_tool_capture_text`.
    Full,
    /// Nothing is captured.
    Off,
}
75
/// Per-profile default values. `SessionManager::open` uses each one as the
/// fallback when the corresponding environment variable is not usable.
#[derive(Debug, Clone, Copy)]
struct ProfileDefaults {
    /// Seconds between automatic saves.
    auto_save_secs: u64,
    /// Seconds between automatic backups.
    backup_secs: u64,
    /// Backup retention value.
    // NOTE(review): presumably a count of backup files to keep — confirm.
    backup_retention: usize,
    /// Seconds between sleep cycles.
    sleep_cycle_secs: u64,
    /// Seconds without activity required before a sleep cycle may run.
    sleep_idle_secs: u64,
    /// Minimum non-episode nodes a past session needs to be auto-archived.
    archive_min_session_nodes: usize,
    /// Decay score at or above which a node is "hot".
    hot_min_decay: f32,
    /// Decay score at or above which a node is "warm".
    warm_min_decay: f32,
    /// Mutation rate per minute above which maintenance is throttled.
    sla_max_mutations_per_min: u32,
}
88
89impl AutonomicProfile {
90 fn from_env(name: &str) -> Self {
91 let raw = read_env_string(name).unwrap_or_else(|| "desktop".to_string());
92 match raw.trim().to_ascii_lowercase().as_str() {
93 "cloud" => Self::Cloud,
94 "aggressive" => Self::Aggressive,
95 _ => Self::Desktop,
96 }
97 }
98
99 fn defaults(self) -> ProfileDefaults {
100 match self {
101 Self::Desktop => ProfileDefaults {
102 auto_save_secs: DEFAULT_AUTO_SAVE_SECS,
103 backup_secs: DEFAULT_BACKUP_INTERVAL_SECS,
104 backup_retention: DEFAULT_BACKUP_RETENTION,
105 sleep_cycle_secs: DEFAULT_SLEEP_CYCLE_SECS,
106 sleep_idle_secs: 180,
107 archive_min_session_nodes: DEFAULT_ARCHIVE_MIN_SESSION_NODES,
108 hot_min_decay: DEFAULT_HOT_MIN_DECAY,
109 warm_min_decay: DEFAULT_WARM_MIN_DECAY,
110 sla_max_mutations_per_min: DEFAULT_SLA_MAX_MUTATIONS_PER_MIN,
111 },
112 Self::Cloud => ProfileDefaults {
113 auto_save_secs: 15,
114 backup_secs: 600,
115 backup_retention: 48,
116 sleep_cycle_secs: 900,
117 sleep_idle_secs: 90,
118 archive_min_session_nodes: 50,
119 hot_min_decay: 0.75,
120 warm_min_decay: 0.4,
121 sla_max_mutations_per_min: 600,
122 },
123 Self::Aggressive => ProfileDefaults {
124 auto_save_secs: 10,
125 backup_secs: 300,
126 backup_retention: 16,
127 sleep_cycle_secs: 300,
128 sleep_idle_secs: 45,
129 archive_min_session_nodes: 15,
130 hot_min_decay: 0.8,
131 warm_min_decay: 0.5,
132 sla_max_mutations_per_min: 900,
133 },
134 }
135 }
136
137 fn as_str(self) -> &'static str {
138 match self {
139 Self::Desktop => "desktop",
140 Self::Cloud => "cloud",
141 Self::Aggressive => "aggressive",
142 }
143 }
144}
145
146impl StorageMigrationPolicy {
147 fn from_env(name: &str) -> Self {
148 let raw = read_env_string(name).unwrap_or_else(|| "auto-safe".to_string());
149 match raw.trim().to_ascii_lowercase().as_str() {
150 "strict" => Self::Strict,
151 "off" | "disabled" | "none" => Self::Off,
152 _ => Self::AutoSafe,
153 }
154 }
155
156 fn as_str(self) -> &'static str {
157 match self {
158 Self::AutoSafe => "auto-safe",
159 Self::Strict => "strict",
160 Self::Off => "off",
161 }
162 }
163}
164
165impl StorageBudgetMode {
166 fn from_env(name: &str) -> Self {
167 let raw = read_env_string(name).unwrap_or_else(|| "auto-rollup".to_string());
168 match raw.trim().to_ascii_lowercase().as_str() {
169 "warn" => Self::Warn,
170 "off" | "disabled" | "none" => Self::Off,
171 _ => Self::AutoRollup,
172 }
173 }
174
175 fn as_str(self) -> &'static str {
176 match self {
177 Self::AutoRollup => "auto-rollup",
178 Self::Warn => "warn",
179 Self::Off => "off",
180 }
181 }
182}
183
184impl AutoCaptureMode {
185 fn from_env(name: &str) -> Self {
186 let raw = read_env_string(name).unwrap_or_else(|| "safe".to_string());
187 match raw.trim().to_ascii_lowercase().as_str() {
188 "full" => Self::Full,
189 "off" | "disabled" | "none" => Self::Off,
190 _ => Self::Safe,
191 }
192 }
193
194 fn as_str(self) -> &'static str {
195 match self {
196 Self::Safe => "safe",
197 Self::Full => "full",
198 Self::Off => "off",
199 }
200 }
201}
202
/// Owns the in-memory graph for one memory file together with the engines
/// that read and mutate it, and tracks the state used for persistence and
/// background maintenance (auto-save, backups, sleep cycles, storage budget,
/// auto-capture and the health ledger).
pub struct SessionManager {
    /// The in-memory graph loaded from, and written back to, `file_path`.
    graph: MemoryGraph,
    query_engine: QueryEngine,
    write_engine: WriteEngine,
    /// Location of the memory file on disk.
    file_path: PathBuf,
    /// Session ID stamped onto events added through this manager.
    current_session: u32,
    profile: AutonomicProfile,
    migration_policy: StorageMigrationPolicy,
    /// True when the in-memory graph has changes not yet written by `save`.
    dirty: bool,
    /// When `save` last wrote the file (initialised to the open time).
    last_save: Instant,
    /// Minimum time between automatic saves.
    auto_save_interval: Duration,
    /// Minimum time between automatic backups.
    backup_interval: Duration,
    backup_retention: usize,
    /// Directory that automatic backups are copied into.
    backups_dir: PathBuf,
    /// Incremented on every successful `save`.
    save_generation: u64,
    /// Value of `save_generation` when the last backup was taken; a backup is
    /// skipped unless a newer save exists.
    last_backup_generation: u64,
    last_backup: Instant,
    /// Minimum time between sleep cycles.
    sleep_cycle_interval: Duration,
    /// Minimum non-episode nodes a past session needs to be auto-archived.
    archive_min_session_nodes: usize,
    /// Decay score at or above which a node counts as hot.
    hot_min_decay: f32,
    /// Decay score at or above which a node counts as warm.
    warm_min_decay: f32,
    /// Mutation rate per minute above which maintenance is throttled.
    sla_max_mutations_per_min: u32,
    last_sleep_cycle: Instant,
    /// Idle time required since `last_activity` before a sleep cycle may run.
    sleep_idle_min: Duration,
    /// Time of the most recent mutation or session start.
    last_activity: Instant,
    /// Start of the current mutation-counting window (reset after 60 seconds).
    mutation_window_started: Instant,
    /// Mutations recorded in the current window.
    mutation_window_count: u32,
    /// Number of maintenance ticks skipped because of throttling.
    maintenance_throttle_count: u64,
    last_health_ledger_emit: Instant,
    /// Minimum time between health ledger writes.
    health_ledger_emit_interval: Duration,
    storage_budget_mode: StorageBudgetMode,
    /// File size budget in bytes.
    storage_budget_max_bytes: u64,
    /// Horizon, in years, used when projecting file growth.
    storage_budget_horizon_years: u32,
    /// Fraction of the budget that a rollup tries to get the file size under.
    storage_budget_target_fraction: f32,
    /// Total number of sessions archived by storage-budget rollups.
    storage_budget_rollup_count: u64,
    auto_capture_mode: AutoCaptureMode,
    /// When true, captured text is passed through `redact_sensitive_tokens`.
    auto_capture_redact: bool,
    /// Length limit applied to captured text (compared against the byte length).
    auto_capture_max_chars: usize,
    /// Number of events stored by auto-capture.
    auto_capture_count: u64,
    /// Most recent node in the temporal chain, if any; cleared by `start_session`.
    last_temporal_node_id: Option<u64>,
    /// Modification time of the file as last seen by this manager; `save`
    /// compares against it to detect external writes.
    last_file_mtime: Option<SystemTime>,
    workspace_manager: super::workspace::WorkspaceManager,
}
251
252impl SessionManager {
    /// Opens the memory file at `path`, or starts a new empty graph when the
    /// file does not exist, and builds a manager configured from the
    /// environment.
    ///
    /// Tuning values come from the autonomic profile
    /// (`AMEM_AUTONOMIC_PROFILE`) and can be overridden individually through
    /// the `AMEM_*` variables read below; most are clamped to a minimum.
    ///
    /// # Errors
    ///
    /// Returns an error when the existing file cannot be read, when the parent
    /// directory of a new file cannot be created, when a legacy file is opened
    /// under the strict migration policy, or when the auto-safe migration
    /// fails to create its checkpoint or save.
    pub fn open(path: &str) -> McpResult<Self> {
        let file_path = PathBuf::from(path);
        let dimension = agentic_memory::DEFAULT_DIMENSION;
        let file_existed = file_path.exists();
        let profile = AutonomicProfile::from_env("AMEM_AUTONOMIC_PROFILE");
        let defaults = profile.defaults();
        let migration_policy = StorageMigrationPolicy::from_env("AMEM_STORAGE_MIGRATION_POLICY");
        let detected_version = if file_existed {
            read_storage_version(&file_path)
        } else {
            None
        };
        // Only versions strictly older than the current one count as legacy.
        let legacy_version = detected_version.filter(|v| *v < CURRENT_AMEM_VERSION);

        let graph = if file_existed {
            tracing::info!("Opening existing memory file: {}", file_path.display());
            AmemReader::read_from_file(&file_path)
                .map_err(|e| McpError::AgenticMemory(format!("Failed to read memory file: {e}")))?
        } else {
            tracing::info!("Creating new memory file: {}", file_path.display());
            if let Some(parent) = file_path.parent() {
                std::fs::create_dir_all(parent).map_err(|e| {
                    McpError::Io(std::io::Error::other(format!(
                        "Failed to create directory {}: {e}",
                        parent.display()
                    )))
                })?;
            }
            MemoryGraph::new(dimension)
        };

        let session_ids = graph.session_index().session_ids();
        // New session ID = highest indexed ID + 1 + (process ID mod 1000).
        // NOTE(review): the PID offset presumably reduces ID collisions between
        // processes opening the same file — confirm.
        let max_existing = session_ids.iter().copied().max().unwrap_or(0);
        let pid_component = std::process::id() % 1000;
        let current_session = max_existing.saturating_add(1).saturating_add(pid_component);

        tracing::info!(
            "Session {} started. Graph has {} nodes, {} edges.",
            current_session,
            graph.node_count(),
            graph.edge_count()
        );
        tracing::info!(
            "Autonomic profile={} migration_policy={}",
            profile.as_str(),
            migration_policy.as_str()
        );

        // Environment overrides, each falling back to the profile default.
        // Note that the auto-save interval has no lower bound, unlike the others.
        let auto_save_secs = read_env_u64("AMEM_AUTOSAVE_SECS", defaults.auto_save_secs);
        let backup_secs = read_env_u64("AMEM_AUTO_BACKUP_SECS", defaults.backup_secs).max(30);
        let backup_retention =
            read_env_usize("AMEM_AUTO_BACKUP_RETENTION", defaults.backup_retention).max(1);
        let backups_dir = resolve_backups_dir(&file_path);
        let sleep_cycle_secs =
            read_env_u64("AMEM_SLEEP_CYCLE_SECS", defaults.sleep_cycle_secs).max(60);
        let sleep_idle_secs =
            read_env_u64("AMEM_SLEEP_IDLE_SECS", defaults.sleep_idle_secs).max(30);
        let archive_min_session_nodes = read_env_usize(
            "AMEM_ARCHIVE_MIN_SESSION_NODES",
            defaults.archive_min_session_nodes,
        )
        .max(1);
        let hot_min_decay =
            read_env_f32("AMEM_TIER_HOT_MIN_DECAY", defaults.hot_min_decay).clamp(0.0, 1.0);
        // The warm threshold is capped so it never exceeds the hot threshold.
        let warm_min_decay = read_env_f32("AMEM_TIER_WARM_MIN_DECAY", defaults.warm_min_decay)
            .clamp(0.0, 1.0)
            .min(hot_min_decay);
        let sla_max_mutations_per_min = read_env_u32(
            "AMEM_SLA_MAX_MUTATIONS_PER_MIN",
            defaults.sla_max_mutations_per_min,
        )
        .max(1);
        let health_ledger_emit_interval = Duration::from_secs(
            read_env_u64(
                "AMEM_HEALTH_LEDGER_EMIT_SECS",
                DEFAULT_HEALTH_LEDGER_EMIT_SECS,
            )
            .max(5),
        );
        let storage_budget_mode = StorageBudgetMode::from_env("AMEM_STORAGE_BUDGET_MODE");
        let storage_budget_max_bytes =
            read_env_u64("AMEM_STORAGE_BUDGET_BYTES", DEFAULT_STORAGE_BUDGET_BYTES).max(1);
        let storage_budget_horizon_years = read_env_u32(
            "AMEM_STORAGE_BUDGET_HORIZON_YEARS",
            DEFAULT_STORAGE_BUDGET_HORIZON_YEARS,
        )
        .max(1);
        let storage_budget_target_fraction =
            read_env_f32("AMEM_STORAGE_BUDGET_TARGET_FRACTION", 0.85).clamp(0.50, 0.99);
        let auto_capture_mode = AutoCaptureMode::from_env("AMEM_AUTO_CAPTURE_MODE");
        let auto_capture_redact = read_env_bool("AMEM_AUTO_CAPTURE_REDACT", true);
        let auto_capture_max_chars = read_env_usize(
            "AMEM_AUTO_CAPTURE_MAX_CHARS",
            DEFAULT_AUTO_CAPTURE_MAX_CHARS,
        )
        .clamp(256, 16384);

        let mut manager = Self {
            graph,
            query_engine: QueryEngine::new(),
            write_engine: WriteEngine::new(dimension),
            file_path,
            current_session,
            profile,
            migration_policy,
            dirty: false,
            last_save: Instant::now(),
            auto_save_interval: Duration::from_secs(auto_save_secs),
            backup_interval: Duration::from_secs(backup_secs),
            backup_retention,
            backups_dir,
            // Starting at 1 for an existing file puts the generation ahead of
            // `last_backup_generation`, so the file is eligible for a backup
            // before the first save.
            save_generation: if file_existed { 1 } else { 0 },
            last_backup_generation: 0,
            last_backup: Instant::now(),
            sleep_cycle_interval: Duration::from_secs(sleep_cycle_secs),
            archive_min_session_nodes,
            hot_min_decay,
            warm_min_decay,
            sla_max_mutations_per_min,
            last_sleep_cycle: Instant::now(),
            sleep_idle_min: Duration::from_secs(sleep_idle_secs),
            last_activity: Instant::now(),
            mutation_window_started: Instant::now(),
            mutation_window_count: 0,
            maintenance_throttle_count: 0,
            // Back-dated by one interval so the first ledger emit is not
            // delayed; falls back to "now" if the subtraction is not representable.
            last_health_ledger_emit: Instant::now()
                .checked_sub(health_ledger_emit_interval)
                .unwrap_or_else(Instant::now),
            health_ledger_emit_interval,
            storage_budget_mode,
            storage_budget_max_bytes,
            storage_budget_horizon_years,
            storage_budget_target_fraction,
            storage_budget_rollup_count: 0,
            auto_capture_mode,
            auto_capture_redact,
            auto_capture_max_chars,
            auto_capture_count: 0,
            last_temporal_node_id: None,
            last_file_mtime: if file_existed {
                std::fs::metadata(path).and_then(|m| m.modified()).ok()
            } else {
                None
            },
            workspace_manager: super::workspace::WorkspaceManager::new(),
        };

        // Apply the migration policy to files with an older storage version.
        if let Some(version) = legacy_version {
            match migration_policy {
                StorageMigrationPolicy::Strict => {
                    return Err(McpError::AgenticMemory(format!(
                        "Legacy .amem version {} blocked by strict migration policy",
                        version
                    )));
                }
                StorageMigrationPolicy::Off => {
                    tracing::warn!(
                        "Legacy storage version detected (v{}), auto-migration disabled by policy",
                        version
                    );
                }
                StorageMigrationPolicy::AutoSafe => {
                    // Copy the legacy file aside first, then force a rewrite.
                    if let Some(checkpoint) = manager.create_migration_checkpoint(version)? {
                        tracing::info!(
                            "Legacy storage version detected (v{}), checkpoint created at {}",
                            version,
                            checkpoint.display()
                        );
                    }
                    // `save` is a no-op unless the manager is dirty.
                    manager.dirty = true;
                    manager.save()?;
                    tracing::info!(
                        "Auto-migrated memory storage from v{} to v{} at {}",
                        version,
                        CURRENT_AMEM_VERSION,
                        manager.file_path.display()
                    );
                }
            }
        }

        Ok(manager)
    }
441
    /// Returns a shared reference to the in-memory graph.
    pub fn graph(&self) -> &MemoryGraph {
        &self.graph
    }
446
    /// Returns a mutable reference to the in-memory graph.
    ///
    /// Calling this marks the manager dirty, refreshes the activity timestamp
    /// and records one mutation, whether or not the caller then changes the graph.
    pub fn graph_mut(&mut self) -> &mut MemoryGraph {
        self.dirty = true;
        self.last_activity = Instant::now();
        self.record_mutation();
        &mut self.graph
    }
454
    /// Returns the query engine owned by this manager.
    pub fn query_engine(&self) -> &QueryEngine {
        &self.query_engine
    }
459
    /// Returns the write engine owned by this manager.
    pub fn write_engine(&self) -> &WriteEngine {
        &self.write_engine
    }
464
    /// Returns a shared reference to the workspace manager.
    pub fn workspace_manager(&self) -> &super::workspace::WorkspaceManager {
        &self.workspace_manager
    }
469
    /// Returns a mutable reference to the workspace manager.
    ///
    /// Unlike `graph_mut`, this does not mark the manager dirty.
    pub fn workspace_manager_mut(&mut self) -> &mut super::workspace::WorkspaceManager {
        &mut self.workspace_manager
    }
474
    /// Returns the ID of the session that new events are attributed to.
    pub fn current_session_id(&self) -> u32 {
        self.current_session
    }
479
480 pub fn start_session(&mut self, explicit_id: Option<u32>) -> McpResult<u32> {
482 let session_id = explicit_id.unwrap_or_else(|| {
483 let ids = self.graph.session_index().session_ids();
484 let max_indexed = ids.iter().copied().max().unwrap_or(0);
485 max_indexed.max(self.current_session).saturating_add(1)
487 });
488
489 self.current_session = session_id;
490 self.last_temporal_node_id = None;
491 self.last_activity = Instant::now();
492 tracing::info!("Started session {session_id}");
493 Ok(session_id)
494 }
495
496 pub fn end_session_with_episode(&mut self, session_id: u32, summary: &str) -> McpResult<u64> {
498 let episode_id = self
499 .write_engine
500 .compress_session(&mut self.graph, session_id, summary)
501 .map_err(|e| McpError::AgenticMemory(format!("Failed to compress session: {e}")))?;
502
503 self.dirty = true;
504 self.last_activity = Instant::now();
505 self.record_mutation();
506 self.save()?;
507
508 tracing::info!("Ended session {session_id}, created episode node {episode_id}");
509
510 Ok(episode_id)
511 }
512
513 pub fn save(&mut self) -> McpResult<()> {
521 if !self.dirty {
522 return Ok(());
523 }
524
525 let _lock = FileLock::acquire(&self.file_path)?;
526
527 if self.file_path.exists() {
529 let current_mtime = std::fs::metadata(&self.file_path)
530 .and_then(|m| m.modified())
531 .ok();
532 if let (Some(current), Some(last_known)) = (current_mtime, self.last_file_mtime) {
533 if current > last_known {
534 tracing::info!("Detected external modification, merging with disk state");
535 self.merge_with_disk()?;
536 }
537 }
538 }
539
540 let writer = AmemWriter::new(self.graph.dimension());
541 writer
542 .write_to_file(&self.graph, &self.file_path)
543 .map_err(|e| McpError::AgenticMemory(format!("Failed to write memory file: {e}")))?;
544
545 self.last_file_mtime = std::fs::metadata(&self.file_path)
547 .and_then(|m| m.modified())
548 .ok();
549
550 self.dirty = false;
551 self.last_save = Instant::now();
552 self.save_generation = self.save_generation.saturating_add(1);
553 tracing::debug!("Saved memory file: {}", self.file_path.display());
554 Ok(())
555 }
556
557 fn merge_with_disk(&mut self) -> McpResult<()> {
563 let disk_graph = AmemReader::read_from_file(&self.file_path)
564 .map_err(|e| McpError::AgenticMemory(format!("Failed to re-read for merge: {e}")))?;
565
566 let our_nodes: Vec<_> = self
568 .graph
569 .nodes()
570 .iter()
571 .filter(|n| n.session_id == self.current_session)
572 .cloned()
573 .collect();
574
575 let our_node_ids: std::collections::HashSet<u64> = our_nodes.iter().map(|n| n.id).collect();
577 let our_edges: Vec<_> = self
578 .graph
579 .edges()
580 .iter()
581 .filter(|e| our_node_ids.contains(&e.source_id) || our_node_ids.contains(&e.target_id))
582 .cloned()
583 .collect();
584
585 self.graph = disk_graph;
587
588 let mut id_map: HashMap<u64, u64> = HashMap::new();
590 for node in &our_nodes {
591 let event = CognitiveEventBuilder::new(node.event_type, node.content.clone())
592 .session_id(self.current_session)
593 .confidence(node.confidence)
594 .build();
595 let result = self
596 .write_engine
597 .ingest(&mut self.graph, vec![event], vec![])
598 .map_err(|e| McpError::AgenticMemory(format!("Merge node re-add failed: {e}")))?;
599 if let Some(&new_id) = result.new_node_ids.first() {
600 id_map.insert(node.id, new_id);
601 }
602 }
603
604 for edge in &our_edges {
606 let source = id_map
607 .get(&edge.source_id)
608 .copied()
609 .unwrap_or(edge.source_id);
610 let target = id_map
611 .get(&edge.target_id)
612 .copied()
613 .unwrap_or(edge.target_id);
614 let new_edge = Edge::new(source, target, edge.edge_type, edge.weight);
615 if let Err(e) = self.graph.add_edge(new_edge) {
616 tracing::warn!("Merge edge re-add skipped: {e}");
617 }
618 }
619
620 tracing::info!(
621 "Merged {} nodes and {} edges from session {} into disk state",
622 our_nodes.len(),
623 our_edges.len(),
624 self.current_session
625 );
626 Ok(())
627 }
628
629 pub fn maybe_auto_save(&mut self) -> McpResult<()> {
631 if self.dirty && self.last_save.elapsed() >= self.auto_save_interval {
632 self.save()?;
633 }
634 Ok(())
635 }
636
637 pub fn run_maintenance_tick(&mut self) -> McpResult<()> {
639 if self.should_throttle_maintenance() {
640 self.maintenance_throttle_count = self.maintenance_throttle_count.saturating_add(1);
641 self.maybe_auto_save()?;
642 self.emit_health_ledger("throttled")?;
643 tracing::debug!(
644 "Maintenance throttled by SLA guard: mutation_rate={} threshold={}",
645 self.mutation_rate_per_min(),
646 self.sla_max_mutations_per_min
647 );
648 return Ok(());
649 }
650
651 self.maybe_run_sleep_cycle()?;
652 self.maybe_auto_save()?;
653 self.maybe_enforce_storage_budget()?;
654 self.maybe_auto_backup()?;
655 self.emit_health_ledger("normal")?;
656 Ok(())
657 }
658
659 pub fn maybe_run_sleep_cycle(&mut self) -> McpResult<()> {
661 if self.last_sleep_cycle.elapsed() < self.sleep_cycle_interval {
662 return Ok(());
663 }
664 if self.last_activity.elapsed() < self.sleep_idle_min {
665 return Ok(());
666 }
667
668 let now = agentic_memory::now_micros();
669 let decay_report = self
670 .write_engine
671 .run_decay(&mut self.graph, now)
672 .map_err(|e| McpError::AgenticMemory(format!("Sleep-cycle decay failed: {e}")))?;
673 let archived_sessions = self.auto_archive_completed_sessions()?;
674
675 if decay_report.nodes_decayed > 0 || archived_sessions > 0 {
676 self.dirty = true;
677 self.save()?;
678 }
679
680 let (hot, warm, cold) = self.tier_counts();
681 self.last_sleep_cycle = Instant::now();
682 tracing::info!(
683 "Sleep-cycle complete: decayed={} archived_sessions={} tiers(h/w/c)={}/{}/{}",
684 decay_report.nodes_decayed,
685 archived_sessions,
686 hot,
687 warm,
688 cold
689 );
690 Ok(())
691 }
692
693 pub fn maybe_auto_backup(&mut self) -> McpResult<()> {
695 if self.last_backup.elapsed() < self.backup_interval {
696 return Ok(());
697 }
698 if self.save_generation <= self.last_backup_generation {
699 return Ok(());
700 }
701 if !self.file_path.exists() {
702 return Ok(());
703 }
704
705 std::fs::create_dir_all(&self.backups_dir).map_err(McpError::Io)?;
706 let backup_path = self.next_backup_path();
707 std::fs::copy(&self.file_path, &backup_path).map_err(McpError::Io)?;
708 self.last_backup_generation = self.save_generation;
709 self.last_backup = Instant::now();
710 self.prune_old_backups()?;
711 tracing::info!("Auto-backup written: {}", backup_path.display());
712 Ok(())
713 }
714
    /// Flags the graph as having unsaved changes, refreshes the activity
    /// timestamp and records one mutation for the SLA rate window.
    pub fn mark_dirty(&mut self) {
        self.dirty = true;
        self.last_activity = Instant::now();
        self.record_mutation();
    }
721
    /// Returns the path of the memory file this manager reads and writes.
    pub fn file_path(&self) -> &PathBuf {
        &self.file_path
    }
726
    /// Returns the most recent node in the temporal chain, or `None` when the
    /// chain is empty (for example right after `start_session`).
    pub fn last_temporal_node_id(&self) -> Option<u64> {
        self.last_temporal_node_id
    }
731
    /// Records `node_id` as the newest node in the temporal chain.
    ///
    /// This only updates the stored ID; it does not add an edge.
    pub fn advance_temporal_chain(&mut self, node_id: u64) {
        self.last_temporal_node_id = Some(node_id);
    }
736
737 pub fn link_temporal(&mut self, prev_id: u64, next_id: u64) -> McpResult<()> {
739 let edge = Edge::new(prev_id, next_id, EdgeType::TemporalNext, 1.0);
740 self.graph
741 .add_edge(edge)
742 .map_err(|e| McpError::AgenticMemory(format!("Failed to add temporal edge: {e}")))?;
743 self.dirty = true;
744 Ok(())
745 }
746
747 pub fn maintenance_interval(&self) -> Duration {
749 self.auto_save_interval
750 .min(self.backup_interval)
751 .min(self.sleep_cycle_interval)
752 }
753
754 pub fn capture_prompt_request(
756 &mut self,
757 prompt_name: &str,
758 arguments: Option<&Value>,
759 ) -> McpResult<Option<u64>> {
760 if self.auto_capture_mode == AutoCaptureMode::Off {
761 return Ok(None);
762 }
763 match extract_prompt_capture_text(prompt_name, arguments)? {
764 Some(text) => self.persist_auto_capture(EventType::Fact, &text, 0.90),
765 None => Ok(None),
766 }
767 }
768
769 pub fn capture_tool_call(
771 &mut self,
772 tool_name: &str,
773 arguments: Option<&Value>,
774 ) -> McpResult<Option<u64>> {
775 if self.auto_capture_mode == AutoCaptureMode::Off {
776 return Ok(None);
777 }
778
779 let text = match self.auto_capture_mode {
780 AutoCaptureMode::Safe => extract_safe_tool_capture_text(tool_name, arguments)?,
781 AutoCaptureMode::Full => extract_full_tool_capture_text(tool_name, arguments)?,
782 AutoCaptureMode::Off => None,
783 };
784 match text {
785 Some(v) => self.persist_auto_capture(EventType::Inference, &v, 0.82),
786 None => Ok(None),
787 }
788 }
789
790 pub fn add_event(
792 &mut self,
793 event_type: EventType,
794 content: &str,
795 confidence: f32,
796 edges: Vec<(u64, EdgeType, f32)>,
797 ) -> McpResult<(u64, usize)> {
798 let event = CognitiveEventBuilder::new(event_type, content.to_string())
799 .session_id(self.current_session)
800 .confidence(confidence)
801 .build();
802
803 let result = self
805 .write_engine
806 .ingest(&mut self.graph, vec![event], vec![])
807 .map_err(|e| McpError::AgenticMemory(format!("Failed to add event: {e}")))?;
808
809 let node_id = result.new_node_ids.first().copied().ok_or_else(|| {
810 McpError::InternalError("No node ID returned from ingest".to_string())
811 })?;
812
813 let mut edge_count = 0;
815 for (target_id, edge_type, weight) in &edges {
816 let edge = Edge::new(node_id, *target_id, *edge_type, *weight);
817 self.graph
818 .add_edge(edge)
819 .map_err(|e| McpError::AgenticMemory(format!("Failed to add edge: {e}")))?;
820 edge_count += 1;
821 }
822
823 self.dirty = true;
824 self.last_activity = Instant::now();
825 self.record_mutation();
826 self.maybe_auto_save()?;
827
828 Ok((node_id, edge_count))
829 }
830
831 pub fn correct_node(&mut self, old_node_id: u64, new_content: &str) -> McpResult<u64> {
833 let new_id = self
834 .write_engine
835 .correct(
836 &mut self.graph,
837 old_node_id,
838 new_content,
839 self.current_session,
840 )
841 .map_err(|e| McpError::AgenticMemory(format!("Failed to correct node: {e}")))?;
842
843 self.dirty = true;
844 self.last_activity = Instant::now();
845 self.record_mutation();
846 self.maybe_auto_save()?;
847
848 Ok(new_id)
849 }
850
851 fn record_mutation(&mut self) {
852 if self.mutation_window_started.elapsed() >= Duration::from_secs(60) {
853 self.mutation_window_started = Instant::now();
854 self.mutation_window_count = 0;
855 }
856 self.mutation_window_count = self.mutation_window_count.saturating_add(1);
857 }
858
859 fn mutation_rate_per_min(&self) -> u32 {
860 let elapsed = self.mutation_window_started.elapsed().as_secs().max(1);
861 let scaled = (self.mutation_window_count as u64)
862 .saturating_mul(60)
863 .saturating_div(elapsed);
864 scaled.min(u32::MAX as u64) as u32
865 }
866
    /// Returns true when the extrapolated mutation rate is strictly above the
    /// configured SLA ceiling.
    fn should_throttle_maintenance(&self) -> bool {
        self.mutation_rate_per_min() > self.sla_max_mutations_per_min
    }
870
871 fn emit_health_ledger(&mut self, maintenance_mode: &str) -> McpResult<()> {
872 if self.last_health_ledger_emit.elapsed() < self.health_ledger_emit_interval {
873 return Ok(());
874 }
875
876 let dir = resolve_health_ledger_dir();
877 std::fs::create_dir_all(&dir).map_err(McpError::Io)?;
878 let path = dir.join("agentic-memory.json");
879 let tmp = dir.join("agentic-memory.json.tmp");
880 let (hot, warm, cold) = self.tier_counts();
881 let current_size_bytes = self.current_file_size_bytes();
882 let projected_size_bytes = self.projected_file_size_bytes(current_size_bytes);
883 let over_budget = current_size_bytes > self.storage_budget_max_bytes
884 || projected_size_bytes
885 .map(|v| v > self.storage_budget_max_bytes)
886 .unwrap_or(false);
887 let payload = serde_json::json!({
888 "project": "AgenticMemory",
889 "timestamp": chrono::Utc::now().to_rfc3339(),
890 "status": "ok",
891 "autonomic": {
892 "profile": self.profile.as_str(),
893 "migration_policy": self.migration_policy.as_str(),
894 "maintenance_mode": maintenance_mode,
895 "throttle_count": self.maintenance_throttle_count,
896 },
897 "sla": {
898 "mutation_rate_per_min": self.mutation_rate_per_min(),
899 "max_mutations_per_min": self.sla_max_mutations_per_min
900 },
901 "storage": {
902 "file": self.file_path.display().to_string(),
903 "dirty": self.dirty,
904 "save_generation": self.save_generation,
905 "backup_retention": self.backup_retention,
906 },
907 "storage_budget": {
908 "mode": self.storage_budget_mode.as_str(),
909 "max_bytes": self.storage_budget_max_bytes,
910 "horizon_years": self.storage_budget_horizon_years,
911 "target_fraction": self.storage_budget_target_fraction,
912 "current_size_bytes": current_size_bytes,
913 "projected_size_bytes": projected_size_bytes,
914 "over_budget": over_budget,
915 "rollup_count": self.storage_budget_rollup_count,
916 },
917 "auto_capture": {
918 "mode": self.auto_capture_mode.as_str(),
919 "redact": self.auto_capture_redact,
920 "max_chars": self.auto_capture_max_chars,
921 "captured_count": self.auto_capture_count
922 },
923 "graph": {
924 "nodes": self.graph.node_count(),
925 "edges": self.graph.edge_count(),
926 "tiers": {
927 "hot": hot,
928 "warm": warm,
929 "cold": cold,
930 },
931 },
932 });
933 let bytes = serde_json::to_vec_pretty(&payload).map_err(|e| {
934 McpError::AgenticMemory(format!("Failed to encode health ledger payload: {e}"))
935 })?;
936 std::fs::write(&tmp, bytes).map_err(McpError::Io)?;
937 std::fs::rename(&tmp, &path).map_err(McpError::Io)?;
938 self.last_health_ledger_emit = Instant::now();
939 Ok(())
940 }
941}
942
943impl Drop for SessionManager {
944 fn drop(&mut self) {
945 if self.dirty {
946 if let Err(e) = self.save() {
947 tracing::error!("Failed to save on drop: {e}");
948 }
949 }
950 if let Err(e) = self.maybe_auto_backup() {
951 tracing::error!("Failed auto-backup on drop: {e}");
952 }
953 }
954}
955
956impl SessionManager {
    /// Archives eligible past sessions using the configured
    /// `archive_min_session_nodes` threshold; returns how many were archived.
    fn auto_archive_completed_sessions(&mut self) -> McpResult<usize> {
        self.auto_archive_completed_sessions_with_min(self.archive_min_session_nodes)
    }
960
961 fn auto_archive_completed_sessions_with_min(
962 &mut self,
963 min_session_nodes: usize,
964 ) -> McpResult<usize> {
965 let mut session_ids = self.graph.session_index().session_ids();
966 session_ids.sort_unstable();
967
968 let mut archived = 0usize;
969 for session_id in session_ids {
970 if session_id >= self.current_session {
971 continue;
972 }
973
974 let node_ids = self.graph.session_index().get_session(session_id).to_vec();
975 if node_ids.is_empty() {
976 continue;
977 }
978
979 let mut has_episode = false;
980 let mut event_nodes = 0usize;
981 let mut hot = 0usize;
982 let mut warm = 0usize;
983 let mut cold = 0usize;
984
985 for node_id in &node_ids {
986 if let Some(node) = self.graph.get_node(*node_id) {
987 if node.event_type == EventType::Episode {
988 has_episode = true;
989 continue;
990 }
991 event_nodes += 1;
992 if node.decay_score >= self.hot_min_decay {
993 hot += 1;
994 } else if node.decay_score >= self.warm_min_decay {
995 warm += 1;
996 } else {
997 cold += 1;
998 }
999 }
1000 }
1001
1002 if has_episode || event_nodes < min_session_nodes {
1003 continue;
1004 }
1005
1006 let summary = format!(
1007 "Auto-archive session {}: {} events ({} hot / {} warm / {} cold)",
1008 session_id, event_nodes, hot, warm, cold
1009 );
1010 self.write_engine
1011 .compress_session(&mut self.graph, session_id, &summary)
1012 .map_err(|e| {
1013 McpError::AgenticMemory(format!(
1014 "Auto-archive failed for session {session_id}: {e}"
1015 ))
1016 })?;
1017 archived = archived.saturating_add(1);
1018 }
1019
1020 Ok(archived)
1021 }
1022
1023 fn maybe_enforce_storage_budget(&mut self) -> McpResult<()> {
1024 if self.storage_budget_mode == StorageBudgetMode::Off {
1025 return Ok(());
1026 }
1027
1028 let current_size = self.current_file_size_bytes();
1029 if current_size == 0 {
1030 return Ok(());
1031 }
1032 let projected = self.projected_file_size_bytes(current_size);
1033 let over_current = current_size > self.storage_budget_max_bytes;
1034 let over_projected = projected
1035 .map(|v| v > self.storage_budget_max_bytes)
1036 .unwrap_or(false);
1037
1038 if !over_current && !over_projected {
1039 return Ok(());
1040 }
1041
1042 if self.storage_budget_mode == StorageBudgetMode::Warn {
1043 tracing::warn!(
1044 "Storage budget warning: current={} projected={:?} budget={} (mode=warn)",
1045 current_size,
1046 projected,
1047 self.storage_budget_max_bytes
1048 );
1049 return Ok(());
1050 }
1051
1052 let target_bytes = ((self.storage_budget_max_bytes as f64
1053 * self.storage_budget_target_fraction as f64)
1054 .round() as u64)
1055 .max(1);
1056 let mut rollup_count = 0usize;
1057 let mut threshold = self.archive_min_session_nodes.saturating_div(2).max(1);
1058
1059 for _ in 0..3 {
1060 let archived = self.auto_archive_completed_sessions_with_min(threshold)?;
1061 if archived == 0 {
1062 if threshold > 1 {
1063 threshold = 1;
1064 continue;
1065 }
1066 break;
1067 }
1068 rollup_count += archived;
1069 self.dirty = true;
1070 self.save()?;
1071 let new_size = self.current_file_size_bytes();
1072 if new_size <= target_bytes {
1073 break;
1074 }
1075 threshold = 1;
1076 }
1077
1078 if rollup_count > 0 {
1079 self.storage_budget_rollup_count = self
1080 .storage_budget_rollup_count
1081 .saturating_add(rollup_count as u64);
1082 tracing::info!(
1083 "Storage budget rollup applied: archived_sessions={} budget={} target={} current={}",
1084 rollup_count,
1085 self.storage_budget_max_bytes,
1086 target_bytes,
1087 self.current_file_size_bytes()
1088 );
1089 } else {
1090 tracing::warn!(
1091 "Storage budget exceeded but no completed sessions eligible for rollup (current={} projected={:?} budget={})",
1092 current_size,
1093 projected,
1094 self.storage_budget_max_bytes
1095 );
1096 }
1097
1098 Ok(())
1099 }
1100
1101 fn current_file_size_bytes(&self) -> u64 {
1102 std::fs::metadata(&self.file_path)
1103 .map(|m| m.len())
1104 .unwrap_or(0)
1105 }
1106
1107 fn persist_auto_capture(
1108 &mut self,
1109 event_type: EventType,
1110 raw_text: &str,
1111 confidence: f32,
1112 ) -> McpResult<Option<u64>> {
1113 let mut text = raw_text.trim().to_string();
1114 if text.is_empty() {
1115 return Ok(None);
1116 }
1117
1118 if self.auto_capture_redact {
1119 text = redact_sensitive_tokens(&text);
1120 }
1121
1122 if text.len() > self.auto_capture_max_chars {
1123 text.truncate(self.auto_capture_max_chars);
1124 text.push_str(" …[truncated]");
1125 }
1126
1127 let prev_id = self.last_temporal_node_id;
1128 let (node_id, _) = self.add_event(event_type, &text, confidence, vec![])?;
1129
1130 if let Some(prev) = prev_id {
1132 if let Err(e) = self.link_temporal(prev, node_id) {
1133 tracing::warn!("Failed to link temporal chain: {e}");
1134 }
1135 }
1136 self.last_temporal_node_id = Some(node_id);
1137
1138 self.auto_capture_count = self.auto_capture_count.saturating_add(1);
1139 Ok(Some(node_id))
1140 }
1141
1142 fn projected_file_size_bytes(&self, current_size: u64) -> Option<u64> {
1143 if current_size == 0 || self.graph.node_count() < 2 {
1144 return None;
1145 }
1146 let mut min_ts = u64::MAX;
1147 let mut max_ts = 0u64;
1148 for node in self.graph.nodes() {
1149 min_ts = min_ts.min(node.created_at);
1150 max_ts = max_ts.max(node.created_at);
1151 }
1152 if min_ts == u64::MAX || max_ts <= min_ts {
1153 return None;
1154 }
1155
1156 let span_secs_raw = (max_ts - min_ts) / 1_000_000;
1157 let span_secs = span_secs_raw.max(7 * 24 * 3600) as f64;
1159 let per_sec = current_size as f64 / span_secs;
1160 let horizon_secs = (self.storage_budget_horizon_years as f64) * 365.25 * 24.0 * 3600.0;
1161 let projected = (per_sec * horizon_secs).round();
1162 Some(projected.max(0.0).min(u64::MAX as f64) as u64)
1163 }
1164
1165 fn tier_counts(&self) -> (usize, usize, usize) {
1166 let mut hot = 0usize;
1167 let mut warm = 0usize;
1168 let mut cold = 0usize;
1169
1170 for node in self.graph.nodes() {
1171 if node.event_type == EventType::Episode {
1172 continue;
1173 }
1174 if node.decay_score >= self.hot_min_decay {
1175 hot += 1;
1176 } else if node.decay_score >= self.warm_min_decay {
1177 warm += 1;
1178 } else {
1179 cold += 1;
1180 }
1181 }
1182
1183 (hot, warm, cold)
1184 }
1185
1186 fn create_migration_checkpoint(&self, from_version: u32) -> McpResult<Option<PathBuf>> {
1187 if !self.file_path.exists() {
1188 return Ok(None);
1189 }
1190
1191 let migration_dir = resolve_migration_dir(&self.file_path);
1192 std::fs::create_dir_all(&migration_dir).map_err(McpError::Io)?;
1193
1194 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1195 let stem = self
1196 .file_path
1197 .file_stem()
1198 .and_then(OsStr::to_str)
1199 .unwrap_or("brain");
1200 let checkpoint = migration_dir.join(format!("{stem}.v{from_version}.{ts}.amem.checkpoint"));
1201 std::fs::copy(&self.file_path, &checkpoint).map_err(McpError::Io)?;
1202 Ok(Some(checkpoint))
1203 }
1204
1205 fn next_backup_path(&self) -> PathBuf {
1206 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1207 let stem = self
1208 .file_path
1209 .file_stem()
1210 .and_then(OsStr::to_str)
1211 .unwrap_or("brain");
1212 self.backups_dir.join(format!("{stem}.{ts}.amem.bak"))
1213 }
1214
1215 fn prune_old_backups(&self) -> McpResult<()> {
1216 let mut entries = std::fs::read_dir(&self.backups_dir)
1217 .map_err(McpError::Io)?
1218 .filter_map(Result::ok)
1219 .filter(|entry| {
1220 entry
1221 .file_name()
1222 .to_str()
1223 .map(|name| name.ends_with(".amem.bak"))
1224 .unwrap_or(false)
1225 })
1226 .collect::<Vec<_>>();
1227
1228 if entries.len() <= self.backup_retention {
1229 return Ok(());
1230 }
1231
1232 entries.sort_by_key(|entry| {
1233 entry
1234 .metadata()
1235 .and_then(|m| m.modified())
1236 .ok()
1237 .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
1238 });
1239 let to_remove = entries.len().saturating_sub(self.backup_retention);
1240 for entry in entries.into_iter().take(to_remove) {
1241 let _ = std::fs::remove_file(entry.path());
1242 }
1243 Ok(())
1244 }
1245}
1246
/// Picks the directory in which backups are stored.
///
/// A non-blank `AMEM_AUTO_BACKUP_DIR` environment variable wins (surrounding
/// whitespace is trimmed). Otherwise the result is `.amem-backups` inside the
/// parent directory of `memory_path`, using `.` when the path has no parent.
fn resolve_backups_dir(memory_path: &std::path::Path) -> PathBuf {
    let override_dir = std::env::var("AMEM_AUTO_BACKUP_DIR")
        .ok()
        .map(|raw| raw.trim().to_owned())
        .filter(|dir| !dir.is_empty());

    match override_dir {
        Some(dir) => PathBuf::from(dir),
        None => memory_path
            .parent()
            .unwrap_or_else(|| std::path::Path::new("."))
            .join(".amem-backups"),
    }
}
1258
/// Returns the `.amem-migrations` directory that sits next to `memory_path`.
///
/// When `memory_path` has no parent component, the directory is placed under
/// `.` instead.
fn resolve_migration_dir(memory_path: &Path) -> PathBuf {
    let base = match memory_path.parent() {
        Some(dir) => dir,
        None => std::path::Path::new("."),
    };
    base.join(".amem-migrations")
}
1263
/// Reads the storage version from the first eight bytes of the file at `path`.
///
/// The header is expected to start with the ASCII magic `AMEM`, followed by
/// the version as a little-endian `u32`.
///
/// Returns `None` when the file cannot be opened, is shorter than eight
/// bytes, or does not start with the magic.
fn read_storage_version(path: &Path) -> Option<u32> {
    let mut file = match std::fs::File::open(path) {
        Ok(handle) => handle,
        Err(_) => return None,
    };

    let mut header = [0u8; 8];
    if file.read_exact(&mut header).is_err() {
        return None;
    }

    let [m0, m1, m2, m3, v0, v1, v2, v3] = header;
    if [m0, m1, m2, m3] == *b"AMEM" {
        Some(u32::from_le_bytes([v0, v1, v2, v3]))
    } else {
        None
    }
}
1275
/// Reads the environment variable `name` as a `u64`.
///
/// Surrounding whitespace is ignored, matching how the boolean and string
/// readers in this file treat their values. Returns `default_value` when the
/// variable is unset, not valid Unicode, or does not parse as a `u64`.
fn read_env_u64(name: &str, default_value: u64) -> u64 {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .unwrap_or(default_value)
}
1282
/// Reads the environment variable `name` as a `u32`.
///
/// Surrounding whitespace is ignored, matching how the boolean and string
/// readers in this file treat their values. Returns `default_value` when the
/// variable is unset, not valid Unicode, or does not parse as a `u32`.
fn read_env_u32(name: &str, default_value: u32) -> u32 {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<u32>().ok())
        .unwrap_or(default_value)
}
1289
/// Reads the environment variable `name` as a `usize`.
///
/// Surrounding whitespace is ignored, matching how the boolean and string
/// readers in this file treat their values. Returns `default_value` when the
/// variable is unset, not valid Unicode, or does not parse as a `usize`.
fn read_env_usize(name: &str, default_value: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .unwrap_or(default_value)
}
1296
/// Reads the environment variable `name` as an `f32`.
///
/// Surrounding whitespace is ignored, matching how the boolean and string
/// readers in this file treat their values. Returns `default_value` when the
/// variable is unset, not valid Unicode, or does not parse as an `f32`.
fn read_env_f32(name: &str, default_value: f32) -> f32 {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse::<f32>().ok())
        .unwrap_or(default_value)
}
1303
/// Reads the environment variable `name` as a boolean flag.
///
/// When the variable is set, the result is `true` only if its trimmed,
/// ASCII-lowercased value is one of `1`, `true`, `yes` or `on`; every other
/// value (including the empty string) yields `false`. `default_value` is
/// used only when the variable is unset or not valid Unicode.
fn read_env_bool(name: &str, default_value: bool) -> bool {
    match std::env::var(name) {
        Ok(raw) => {
            let normalized = raw.trim().to_ascii_lowercase();
            ["1", "true", "yes", "on"].contains(&normalized.as_str())
        }
        Err(_) => default_value,
    }
}
1315
/// Reads the environment variable `name` and trims surrounding whitespace.
///
/// Returns `None` when the variable is unset or not valid Unicode. A
/// variable that is set but blank yields `Some("")`.
fn read_env_string(name: &str) -> Option<String> {
    match std::env::var(name) {
        Ok(raw) => Some(raw.trim().to_string()),
        Err(_) => None,
    }
}
1319
1320fn resolve_health_ledger_dir() -> PathBuf {
1321 if let Some(custom) = read_env_string("AMEM_HEALTH_LEDGER_DIR") {
1322 if !custom.is_empty() {
1323 return PathBuf::from(custom);
1324 }
1325 }
1326 if let Some(custom) = read_env_string("AGENTRA_HEALTH_LEDGER_DIR") {
1327 if !custom.is_empty() {
1328 return PathBuf::from(custom);
1329 }
1330 }
1331
1332 let home = std::env::var("HOME")
1333 .ok()
1334 .map(PathBuf::from)
1335 .unwrap_or_else(|| PathBuf::from("."));
1336 home.join(".agentra").join("health-ledger")
1337}
1338
1339fn extract_prompt_capture_text(
1340 prompt_name: &str,
1341 arguments: Option<&Value>,
1342) -> McpResult<Option<String>> {
1343 let args = arguments.unwrap_or(&Value::Null);
1344 let fields = collect_text_fields_by_keys(
1345 args,
1346 &[
1347 "information",
1348 "context",
1349 "topic",
1350 "old_belief",
1351 "new_information",
1352 "reason",
1353 "summary",
1354 "instruction",
1355 "prompt",
1356 "query",
1357 ],
1358 8,
1359 );
1360 if fields.is_empty() {
1361 return Ok(None);
1362 }
1363 let joined = fields.join(" | ");
1364 Ok(Some(format!(
1365 "[auto-capture][prompt] template={prompt_name} input={joined}"
1366 )))
1367}
1368
1369fn extract_safe_tool_capture_text(
1370 tool_name: &str,
1371 arguments: Option<&Value>,
1372) -> McpResult<Option<String>> {
1373 let args = arguments.unwrap_or(&Value::Null);
1374 let keys = ["feedback", "summary", "note"];
1375 if tool_name != "session_end" {
1376 let explicit_feedback = collect_text_fields_by_keys(args, &["feedback", "note"], 4);
1378 if explicit_feedback.is_empty() {
1379 return Ok(None);
1380 }
1381 }
1382 let fields = collect_text_fields_by_keys(args, &keys, 6);
1383 if fields.is_empty() {
1384 return Ok(None);
1385 }
1386 Ok(Some(format!(
1387 "[auto-capture][feedback] tool={tool_name} context={}",
1388 fields.join(" | ")
1389 )))
1390}
1391
1392fn extract_full_tool_capture_text(
1393 tool_name: &str,
1394 arguments: Option<&Value>,
1395) -> McpResult<Option<String>> {
1396 if tool_name == "memory_add" {
1397 return Ok(None);
1398 }
1399 let args = arguments.unwrap_or(&Value::Null);
1400 let preferred = collect_text_fields_by_keys(
1401 args,
1402 &[
1403 "query",
1404 "content",
1405 "prompt",
1406 "new_content",
1407 "reason",
1408 "summary",
1409 "topic",
1410 "instruction",
1411 "information",
1412 "context",
1413 "feedback",
1414 ],
1415 10,
1416 );
1417
1418 let fields = if preferred.is_empty() {
1419 collect_all_string_like_fields(args, 8)
1420 } else {
1421 preferred
1422 };
1423
1424 if fields.is_empty() {
1425 return Ok(None);
1426 }
1427 Ok(Some(format!(
1428 "[auto-capture][tool] tool={tool_name} input={}",
1429 fields.join(" | ")
1430 )))
1431}
1432
1433fn collect_text_fields_by_keys(value: &Value, keys: &[&str], limit: usize) -> Vec<String> {
1434 let mut out = Vec::new();
1435 let mut seen = std::collections::BTreeSet::<String>::new();
1436
1437 fn walk(
1438 value: &Value,
1439 path: String,
1440 keys: &[&str],
1441 out: &mut Vec<String>,
1442 seen: &mut std::collections::BTreeSet<String>,
1443 limit: usize,
1444 ) {
1445 if out.len() >= limit {
1446 return;
1447 }
1448 match value {
1449 Value::Object(map) => {
1450 for (k, v) in map {
1451 if out.len() >= limit {
1452 break;
1453 }
1454 let next = if path.is_empty() {
1455 k.to_string()
1456 } else {
1457 format!("{path}.{k}")
1458 };
1459 let key_match = keys
1460 .iter()
1461 .any(|needle| k.eq_ignore_ascii_case(needle) || next.ends_with(needle));
1462 if key_match {
1463 if let Some(s) = value_to_compact_string(v) {
1464 let entry = format!("{next}={s}");
1465 if seen.insert(entry.clone()) {
1466 out.push(entry);
1467 }
1468 }
1469 }
1470 walk(v, next, keys, out, seen, limit);
1471 }
1472 }
1473 Value::Array(items) => {
1474 for (idx, item) in items.iter().enumerate() {
1475 if out.len() >= limit {
1476 break;
1477 }
1478 let next = format!("{path}[{idx}]");
1479 walk(item, next, keys, out, seen, limit);
1480 }
1481 }
1482 _ => {}
1483 }
1484 }
1485
1486 walk(value, String::new(), keys, &mut out, &mut seen, limit);
1487 out
1488}
1489
1490fn collect_all_string_like_fields(value: &Value, limit: usize) -> Vec<String> {
1491 let mut out = Vec::new();
1492 fn walk(value: &Value, path: String, out: &mut Vec<String>, limit: usize) {
1493 if out.len() >= limit {
1494 return;
1495 }
1496 match value {
1497 Value::Object(map) => {
1498 for (k, v) in map {
1499 if out.len() >= limit {
1500 break;
1501 }
1502 let next = if path.is_empty() {
1503 k.to_string()
1504 } else {
1505 format!("{path}.{k}")
1506 };
1507 walk(v, next, out, limit);
1508 }
1509 }
1510 Value::Array(items) => {
1511 for (idx, item) in items.iter().enumerate() {
1512 if out.len() >= limit {
1513 break;
1514 }
1515 walk(item, format!("{path}[{idx}]"), out, limit);
1516 }
1517 }
1518 _ => {
1519 if let Some(s) = value_to_compact_string(value) {
1520 out.push(format!("{path}={s}"));
1521 }
1522 }
1523 }
1524 }
1525 walk(value, String::new(), &mut out, limit);
1526 out
1527}
1528
1529fn value_to_compact_string(value: &Value) -> Option<String> {
1530 match value {
1531 Value::String(s) => {
1532 let trimmed = s.trim();
1533 if trimmed.is_empty() {
1534 None
1535 } else {
1536 Some(trimmed.to_string())
1537 }
1538 }
1539 Value::Number(n) => Some(n.to_string()),
1540 Value::Bool(b) => Some(b.to_string()),
1541 Value::Null => None,
1542 Value::Array(arr) => {
1543 if arr.is_empty() {
1544 None
1545 } else {
1546 Some(format!("<array:{}>", arr.len()))
1547 }
1548 }
1549 Value::Object(map) => {
1550 if map.is_empty() {
1551 None
1552 } else {
1553 Some(format!("<object:{}>", map.len()))
1554 }
1555 }
1556 }
1557}
1558
1559fn redact_sensitive_tokens(text: &str) -> String {
1560 text.split_whitespace()
1561 .map(redact_token)
1562 .collect::<Vec<_>>()
1563 .join(" ")
1564}
1565
1566fn redact_token(token: &str) -> String {
1567 let trimmed = token.trim_matches(|c: char| c == '"' || c == '\'' || c == ',' || c == ';');
1568 let lower = trimmed.to_ascii_lowercase();
1569 if trimmed.starts_with("/Users/")
1570 || trimmed.starts_with("C:\\Users\\")
1571 || trimmed.contains("/Users/")
1572 || trimmed.contains("C:\\Users\\")
1573 {
1574 return "[REDACTED_PATH]".to_string();
1575 }
1576 if trimmed.contains('@') && trimmed.contains('.') {
1577 return "[REDACTED_EMAIL]".to_string();
1578 }
1579 if lower.starts_with("sk-")
1580 || lower.contains("api_key")
1581 || lower.contains("access_token")
1582 || lower.contains("bearer")
1583 || lower.contains("authorization")
1584 {
1585 return "[REDACTED_SECRET]".to_string();
1586 }
1587 if looks_like_long_secret(trimmed) {
1588 return "[REDACTED_SECRET]".to_string();
1589 }
1590 token.to_string()
1591}
1592
/// Heuristic for opaque credentials.
///
/// Returns `true` when `token` is at least 24 bytes long and consists only
/// of ASCII letters, ASCII digits, `_`, `-` and `.`.
fn looks_like_long_secret(token: &str) -> bool {
    const MIN_LEN: usize = 24;

    // Every byte of a non-ASCII character is >= 0x80 and therefore fails
    // the check below, so testing bytes is equivalent to testing chars.
    token.len() >= MIN_LEN
        && token
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-' | b'.'))
}
1601
/// Guard representing a held lock file.
///
/// `FileLock::acquire` creates the file at `lock_path`; the `Drop`
/// implementation removes it again when the guard goes out of scope.
struct FileLock {
    /// Path of the lock file that is removed when the guard is dropped.
    lock_path: PathBuf,
}
1610
1611impl FileLock {
1612 fn acquire(data_path: &Path) -> McpResult<Self> {
1615 let lock_path = data_path.with_extension("amem.lock");
1616 let stale_threshold = Duration::from_secs(60);
1617 let max_attempts = 200; for attempt in 0..max_attempts {
1620 match OpenOptions::new()
1621 .write(true)
1622 .create_new(true)
1623 .open(&lock_path)
1624 {
1625 Ok(_file) => {
1626 if attempt > 0 {
1627 tracing::debug!(
1628 "Acquired file lock after {} attempts: {}",
1629 attempt + 1,
1630 lock_path.display()
1631 );
1632 }
1633 return Ok(FileLock { lock_path });
1634 }
1635 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
1636 if let Ok(meta) = std::fs::metadata(&lock_path) {
1638 let is_stale = meta
1639 .modified()
1640 .ok()
1641 .and_then(|m| m.elapsed().ok())
1642 .map(|age| age > stale_threshold)
1643 .unwrap_or(false);
1644 if is_stale {
1645 tracing::warn!("Removing stale lock file: {}", lock_path.display());
1646 let _ = std::fs::remove_file(&lock_path);
1647 continue;
1648 }
1649 }
1650 std::thread::sleep(Duration::from_millis(50));
1651 }
1652 Err(e) => {
1653 return Err(McpError::Io(e));
1654 }
1655 }
1656 }
1657
1658 Err(McpError::AgenticMemory(format!(
1659 "Timed out waiting for file lock: {}",
1660 lock_path.display()
1661 )))
1662 }
1663}
1664
1665impl Drop for FileLock {
1666 fn drop(&mut self) {
1667 let _ = std::fs::remove_file(&self.lock_path);
1668 }
1669}
1670
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A saved graph whose nodes span a positive time range yields a size
    /// projection.
    #[test]
    fn budget_projection_available_with_timeline() {
        let dir = tempfile::tempdir().expect("tempdir");
        let brain = dir.path().join("projection.amem");
        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");

        let (id_a, _) = manager
            .add_event(EventType::Fact, "old fact", 0.9, vec![])
            .expect("add fact");
        let (_id_b, _) = manager
            .add_event(EventType::Fact, "new fact", 0.9, vec![])
            .expect("add fact");

        {
            // Backdate the first node so the two nodes are guaranteed to
            // have different timestamps (15 days, assuming `created_at` is
            // in microseconds).
            let graph = manager.graph_mut();
            let old = graph.get_node_mut(id_a).expect("node");
            old.created_at = old.created_at.saturating_sub(15 * 24 * 3600 * 1_000_000);
        }
        manager.save().expect("save");
        let size = manager.current_file_size_bytes();
        let projected = manager.projected_file_size_bytes(size);
        assert!(size > 0);
        assert!(projected.is_some());
    }

    /// With a one-byte budget in `AutoRollup` mode, enforcing the budget
    /// leaves at least one `Episode` node and bumps the rollup counter.
    #[test]
    fn budget_auto_rollup_archives_completed_session() {
        let dir = tempfile::tempdir().expect("tempdir");
        let brain = dir.path().join("rollup.amem");
        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");

        // Populate the initial session, then start a new one.
        let _ = manager
            .add_event(EventType::Fact, "alpha", 0.8, vec![])
            .expect("add");
        let _ = manager
            .add_event(EventType::Decision, "beta", 0.9, vec![])
            .expect("add");
        manager.start_session(None).expect("session");
        manager.save().expect("save");

        // A budget of a single byte is exceeded by any saved file.
        manager.storage_budget_mode = StorageBudgetMode::AutoRollup;
        manager.storage_budget_max_bytes = 1;
        manager.storage_budget_target_fraction = 0.5;

        manager
            .maybe_enforce_storage_budget()
            .expect("enforce budget");

        let episode_count = manager
            .graph()
            .nodes()
            .iter()
            .filter(|n| n.event_type == EventType::Episode)
            .count();
        assert!(episode_count >= 1);
        assert!(manager.storage_budget_rollup_count >= 1);
    }

    /// With auto-capture switched off, capturing a prompt returns `None`
    /// and adds no node.
    #[test]
    fn auto_capture_off_noop() {
        let dir = tempfile::tempdir().expect("tempdir");
        let brain = dir.path().join("capture-off.amem");
        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
        manager.auto_capture_mode = AutoCaptureMode::Off;

        let captured = manager
            .capture_prompt_request(
                "remember",
                Some(&json!({"information":"hello world","context":"ctx"})),
            )
            .expect("capture");
        assert!(captured.is_none());
        assert_eq!(manager.graph().node_count(), 0);
    }

    /// In `Full` mode with redaction enabled, the stored text carries the
    /// tool marker and placeholders for the secret, the path and the email.
    #[test]
    fn auto_capture_full_records_and_redacts() {
        let dir = tempfile::tempdir().expect("tempdir");
        let brain = dir.path().join("capture-full.amem");
        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
        manager.auto_capture_mode = AutoCaptureMode::Full;
        manager.auto_capture_redact = true;

        manager
            .capture_tool_call(
                "memory_query",
                Some(&json!({
                    "query":"Find anything about token sk-THISISALONGSECRET123456",
                    "context":"/Users/omoshola/Documents/private.txt",
                    "reason":"email me at test@example.com"
                })),
            )
            .expect("capture");

        assert!(manager.graph().node_count() >= 1);
        // The node with the highest id is taken to be the captured one.
        let latest = manager
            .graph()
            .nodes()
            .iter()
            .max_by_key(|n| n.id)
            .expect("node");
        assert!(latest.content.contains("[auto-capture][tool]"));
        assert!(latest.content.contains("[REDACTED_SECRET]"));
        assert!(latest.content.contains("[REDACTED_PATH]"));
        assert!(latest.content.contains("[REDACTED_EMAIL]"));
    }

    /// Two consecutive captures are connected by a `TemporalNext` edge and
    /// the chain tail follows the most recent capture.
    #[test]
    fn auto_capture_temporal_chain() {
        let dir = tempfile::tempdir().expect("tempdir");
        let brain = dir.path().join("chain.amem");
        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
        manager.auto_capture_mode = AutoCaptureMode::Full;

        // First capture becomes the chain tail.
        let id1 = manager
            .capture_tool_call("memory_query", Some(&json!({"query": "first question"})))
            .expect("capture")
            .expect("node_id");

        assert_eq!(manager.last_temporal_node_id(), Some(id1));

        // Second capture should link back to the first.
        let id2 = manager
            .capture_tool_call(
                "memory_similar",
                Some(&json!({"query_text": "second question"})),
            )
            .expect("capture")
            .expect("node_id");

        assert_eq!(manager.last_temporal_node_id(), Some(id2));

        // Look for the directed edge id1 -> id2.
        let has_temporal = manager.graph().edges().iter().any(|e| {
            e.source_id == id1 && e.target_id == id2 && e.edge_type == EdgeType::TemporalNext
        });
        assert!(has_temporal, "Expected TemporalNext edge from id1 to id2");
    }

    /// Starting a new session clears the temporal chain tail.
    #[test]
    fn temporal_chain_resets_on_new_session() {
        let dir = tempfile::tempdir().expect("tempdir");
        let brain = dir.path().join("reset.amem");
        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
        manager.auto_capture_mode = AutoCaptureMode::Full;

        let _id1 = manager
            .capture_tool_call("memory_query", Some(&json!({"query": "first"})))
            .expect("capture");

        assert!(manager.last_temporal_node_id().is_some());

        manager.start_session(None).expect("new session");
        // The tail must not carry over into the new session.
        assert!(manager.last_temporal_node_id().is_none());
    }

    /// A manually added node can be linked after a captured node and then
    /// becomes the new chain tail.
    #[test]
    fn memory_add_joins_temporal_chain() {
        let dir = tempfile::tempdir().expect("tempdir");
        let brain = dir.path().join("splice.amem");
        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
        manager.auto_capture_mode = AutoCaptureMode::Full;

        // Seed the chain with an auto-captured node.
        let id1 = manager
            .capture_tool_call("memory_query", Some(&json!({"query": "something"})))
            .expect("capture")
            .expect("node_id");

        // Add a node directly, then link it and advance the tail by hand.
        let (id2, _) = manager
            .add_event(EventType::Fact, "User prefers dark mode", 0.9, vec![])
            .expect("add_event");
        manager.link_temporal(id1, id2).expect("link");
        manager.advance_temporal_chain(id2);

        assert_eq!(manager.last_temporal_node_id(), Some(id2));

        let has_edge = manager.graph().edges().iter().any(|e| {
            e.source_id == id1 && e.target_id == id2 && e.edge_type == EdgeType::TemporalNext
        });
        assert!(has_edge, "memory_add node should be linked into chain");
    }
}