1use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs::OpenOptions;
6use std::io::Read as _;
7use std::path::{Path, PathBuf};
8use std::time::{Duration, Instant, SystemTime};
9
10use agentic_memory::{
11 AmemReader, AmemWriter, CognitiveEventBuilder, Edge, EdgeType, EventType, MemoryGraph,
12 QueryEngine, WriteEngine,
13};
14use serde_json::Value;
15
16use crate::types::{McpError, McpResult};
17
18const DEFAULT_AUTO_SAVE_SECS: u64 = 30;
20const DEFAULT_BACKUP_INTERVAL_SECS: u64 = 900;
22const DEFAULT_BACKUP_RETENTION: usize = 24;
24const DEFAULT_SLEEP_CYCLE_SECS: u64 = 1800;
26const DEFAULT_ARCHIVE_MIN_SESSION_NODES: usize = 25;
28const DEFAULT_HOT_MIN_DECAY: f32 = 0.7;
30const DEFAULT_WARM_MIN_DECAY: f32 = 0.3;
32const DEFAULT_SLA_MAX_MUTATIONS_PER_MIN: u32 = 240;
34const DEFAULT_HEALTH_LEDGER_EMIT_SECS: u64 = 30;
36const DEFAULT_STORAGE_BUDGET_BYTES: u64 = 2 * 1024 * 1024 * 1024;
38const DEFAULT_STORAGE_BUDGET_HORIZON_YEARS: u32 = 20;
40const DEFAULT_AUTO_CAPTURE_MAX_CHARS: usize = 2048;
42const CURRENT_AMEM_VERSION: u32 = 1;
44
45#[derive(Debug, Clone, Copy)]
46enum AutonomicProfile {
47 Desktop,
48 Cloud,
49 Aggressive,
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53enum StorageMigrationPolicy {
54 AutoSafe,
55 Strict,
56 Off,
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60enum StorageBudgetMode {
61 AutoRollup,
62 Warn,
63 Off,
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67enum AutoCaptureMode {
68 Safe,
70 Full,
72 Off,
74}
75
76#[derive(Debug, Clone, Copy)]
77struct ProfileDefaults {
78 auto_save_secs: u64,
79 backup_secs: u64,
80 backup_retention: usize,
81 sleep_cycle_secs: u64,
82 sleep_idle_secs: u64,
83 archive_min_session_nodes: usize,
84 hot_min_decay: f32,
85 warm_min_decay: f32,
86 sla_max_mutations_per_min: u32,
87}
88
89impl AutonomicProfile {
90 fn from_env(name: &str) -> Self {
91 let raw = read_env_string(name).unwrap_or_else(|| "desktop".to_string());
92 match raw.trim().to_ascii_lowercase().as_str() {
93 "cloud" => Self::Cloud,
94 "aggressive" => Self::Aggressive,
95 _ => Self::Desktop,
96 }
97 }
98
99 fn defaults(self) -> ProfileDefaults {
100 match self {
101 Self::Desktop => ProfileDefaults {
102 auto_save_secs: DEFAULT_AUTO_SAVE_SECS,
103 backup_secs: DEFAULT_BACKUP_INTERVAL_SECS,
104 backup_retention: DEFAULT_BACKUP_RETENTION,
105 sleep_cycle_secs: DEFAULT_SLEEP_CYCLE_SECS,
106 sleep_idle_secs: 180,
107 archive_min_session_nodes: DEFAULT_ARCHIVE_MIN_SESSION_NODES,
108 hot_min_decay: DEFAULT_HOT_MIN_DECAY,
109 warm_min_decay: DEFAULT_WARM_MIN_DECAY,
110 sla_max_mutations_per_min: DEFAULT_SLA_MAX_MUTATIONS_PER_MIN,
111 },
112 Self::Cloud => ProfileDefaults {
113 auto_save_secs: 15,
114 backup_secs: 600,
115 backup_retention: 48,
116 sleep_cycle_secs: 900,
117 sleep_idle_secs: 90,
118 archive_min_session_nodes: 50,
119 hot_min_decay: 0.75,
120 warm_min_decay: 0.4,
121 sla_max_mutations_per_min: 600,
122 },
123 Self::Aggressive => ProfileDefaults {
124 auto_save_secs: 10,
125 backup_secs: 300,
126 backup_retention: 16,
127 sleep_cycle_secs: 300,
128 sleep_idle_secs: 45,
129 archive_min_session_nodes: 15,
130 hot_min_decay: 0.8,
131 warm_min_decay: 0.5,
132 sla_max_mutations_per_min: 900,
133 },
134 }
135 }
136
137 fn as_str(self) -> &'static str {
138 match self {
139 Self::Desktop => "desktop",
140 Self::Cloud => "cloud",
141 Self::Aggressive => "aggressive",
142 }
143 }
144}
145
146impl StorageMigrationPolicy {
147 fn from_env(name: &str) -> Self {
148 let raw = read_env_string(name).unwrap_or_else(|| "auto-safe".to_string());
149 match raw.trim().to_ascii_lowercase().as_str() {
150 "strict" => Self::Strict,
151 "off" | "disabled" | "none" => Self::Off,
152 _ => Self::AutoSafe,
153 }
154 }
155
156 fn as_str(self) -> &'static str {
157 match self {
158 Self::AutoSafe => "auto-safe",
159 Self::Strict => "strict",
160 Self::Off => "off",
161 }
162 }
163}
164
165impl StorageBudgetMode {
166 fn from_env(name: &str) -> Self {
167 let raw = read_env_string(name).unwrap_or_else(|| "auto-rollup".to_string());
168 match raw.trim().to_ascii_lowercase().as_str() {
169 "warn" => Self::Warn,
170 "off" | "disabled" | "none" => Self::Off,
171 _ => Self::AutoRollup,
172 }
173 }
174
175 fn as_str(self) -> &'static str {
176 match self {
177 Self::AutoRollup => "auto-rollup",
178 Self::Warn => "warn",
179 Self::Off => "off",
180 }
181 }
182}
183
184impl AutoCaptureMode {
185 fn from_env(name: &str) -> Self {
186 let raw = read_env_string(name).unwrap_or_else(|| "safe".to_string());
187 match raw.trim().to_ascii_lowercase().as_str() {
188 "full" => Self::Full,
189 "off" | "disabled" | "none" => Self::Off,
190 _ => Self::Safe,
191 }
192 }
193
194 fn as_str(self) -> &'static str {
195 match self {
196 Self::Safe => "safe",
197 Self::Full => "full",
198 Self::Off => "off",
199 }
200 }
201}
202
203pub struct SessionManager {
205 graph: MemoryGraph,
206 query_engine: QueryEngine,
207 write_engine: WriteEngine,
208 file_path: PathBuf,
209 current_session: u32,
210 profile: AutonomicProfile,
211 migration_policy: StorageMigrationPolicy,
212 dirty: bool,
213 last_save: Instant,
214 auto_save_interval: Duration,
215 backup_interval: Duration,
216 backup_retention: usize,
217 backups_dir: PathBuf,
218 save_generation: u64,
219 last_backup_generation: u64,
220 last_backup: Instant,
221 sleep_cycle_interval: Duration,
222 archive_min_session_nodes: usize,
223 hot_min_decay: f32,
224 warm_min_decay: f32,
225 sla_max_mutations_per_min: u32,
226 last_sleep_cycle: Instant,
227 sleep_idle_min: Duration,
228 last_activity: Instant,
229 mutation_window_started: Instant,
230 mutation_window_count: u32,
231 maintenance_throttle_count: u64,
232 last_health_ledger_emit: Instant,
233 health_ledger_emit_interval: Duration,
234 storage_budget_mode: StorageBudgetMode,
235 storage_budget_max_bytes: u64,
236 storage_budget_horizon_years: u32,
237 storage_budget_target_fraction: f32,
238 storage_budget_rollup_count: u64,
239 auto_capture_mode: AutoCaptureMode,
240 auto_capture_redact: bool,
241 auto_capture_max_chars: usize,
242 auto_capture_count: u64,
243 last_temporal_node_id: Option<u64>,
246 last_file_mtime: Option<SystemTime>,
248}
249
250impl SessionManager {
251 pub fn open(path: &str) -> McpResult<Self> {
253 let file_path = PathBuf::from(path);
254 let dimension = agentic_memory::DEFAULT_DIMENSION;
255 let file_existed = file_path.exists();
256 let profile = AutonomicProfile::from_env("AMEM_AUTONOMIC_PROFILE");
257 let defaults = profile.defaults();
258 let migration_policy = StorageMigrationPolicy::from_env("AMEM_STORAGE_MIGRATION_POLICY");
259 let detected_version = if file_existed {
260 read_storage_version(&file_path)
261 } else {
262 None
263 };
264 let legacy_version = detected_version.filter(|v| *v < CURRENT_AMEM_VERSION);
265
266 let graph = if file_existed {
267 tracing::info!("Opening existing memory file: {}", file_path.display());
268 AmemReader::read_from_file(&file_path)
269 .map_err(|e| McpError::AgenticMemory(format!("Failed to read memory file: {e}")))?
270 } else {
271 tracing::info!("Creating new memory file: {}", file_path.display());
272 if let Some(parent) = file_path.parent() {
274 std::fs::create_dir_all(parent).map_err(|e| {
275 McpError::Io(std::io::Error::other(format!(
276 "Failed to create directory {}: {e}",
277 parent.display()
278 )))
279 })?;
280 }
281 MemoryGraph::new(dimension)
282 };
283
284 let session_ids = graph.session_index().session_ids();
288 let max_existing = session_ids.iter().copied().max().unwrap_or(0);
289 let pid_component = (std::process::id() % 1000) as u32;
290 let current_session = max_existing.saturating_add(1).saturating_add(pid_component);
291
292 tracing::info!(
293 "Session {} started. Graph has {} nodes, {} edges.",
294 current_session,
295 graph.node_count(),
296 graph.edge_count()
297 );
298 tracing::info!(
299 "Autonomic profile={} migration_policy={}",
300 profile.as_str(),
301 migration_policy.as_str()
302 );
303
304 let auto_save_secs = read_env_u64("AMEM_AUTOSAVE_SECS", defaults.auto_save_secs);
305 let backup_secs = read_env_u64("AMEM_AUTO_BACKUP_SECS", defaults.backup_secs).max(30);
306 let backup_retention =
307 read_env_usize("AMEM_AUTO_BACKUP_RETENTION", defaults.backup_retention).max(1);
308 let backups_dir = resolve_backups_dir(&file_path);
309 let sleep_cycle_secs =
310 read_env_u64("AMEM_SLEEP_CYCLE_SECS", defaults.sleep_cycle_secs).max(60);
311 let sleep_idle_secs =
312 read_env_u64("AMEM_SLEEP_IDLE_SECS", defaults.sleep_idle_secs).max(30);
313 let archive_min_session_nodes = read_env_usize(
314 "AMEM_ARCHIVE_MIN_SESSION_NODES",
315 defaults.archive_min_session_nodes,
316 )
317 .max(1);
318 let hot_min_decay =
319 read_env_f32("AMEM_TIER_HOT_MIN_DECAY", defaults.hot_min_decay).clamp(0.0, 1.0);
320 let warm_min_decay = read_env_f32("AMEM_TIER_WARM_MIN_DECAY", defaults.warm_min_decay)
321 .clamp(0.0, 1.0)
322 .min(hot_min_decay);
323 let sla_max_mutations_per_min = read_env_u32(
324 "AMEM_SLA_MAX_MUTATIONS_PER_MIN",
325 defaults.sla_max_mutations_per_min,
326 )
327 .max(1);
328 let health_ledger_emit_interval = Duration::from_secs(
329 read_env_u64(
330 "AMEM_HEALTH_LEDGER_EMIT_SECS",
331 DEFAULT_HEALTH_LEDGER_EMIT_SECS,
332 )
333 .max(5),
334 );
335 let storage_budget_mode = StorageBudgetMode::from_env("AMEM_STORAGE_BUDGET_MODE");
336 let storage_budget_max_bytes =
337 read_env_u64("AMEM_STORAGE_BUDGET_BYTES", DEFAULT_STORAGE_BUDGET_BYTES).max(1);
338 let storage_budget_horizon_years = read_env_u32(
339 "AMEM_STORAGE_BUDGET_HORIZON_YEARS",
340 DEFAULT_STORAGE_BUDGET_HORIZON_YEARS,
341 )
342 .max(1);
343 let storage_budget_target_fraction =
344 read_env_f32("AMEM_STORAGE_BUDGET_TARGET_FRACTION", 0.85).clamp(0.50, 0.99);
345 let auto_capture_mode = AutoCaptureMode::from_env("AMEM_AUTO_CAPTURE_MODE");
346 let auto_capture_redact = read_env_bool("AMEM_AUTO_CAPTURE_REDACT", true);
347 let auto_capture_max_chars = read_env_usize(
348 "AMEM_AUTO_CAPTURE_MAX_CHARS",
349 DEFAULT_AUTO_CAPTURE_MAX_CHARS,
350 )
351 .clamp(256, 16384);
352
353 let mut manager = Self {
354 graph,
355 query_engine: QueryEngine::new(),
356 write_engine: WriteEngine::new(dimension),
357 file_path,
358 current_session,
359 profile,
360 migration_policy,
361 dirty: false,
362 last_save: Instant::now(),
363 auto_save_interval: Duration::from_secs(auto_save_secs),
364 backup_interval: Duration::from_secs(backup_secs),
365 backup_retention,
366 backups_dir,
367 save_generation: if file_existed { 1 } else { 0 },
368 last_backup_generation: 0,
369 last_backup: Instant::now(),
370 sleep_cycle_interval: Duration::from_secs(sleep_cycle_secs),
371 archive_min_session_nodes,
372 hot_min_decay,
373 warm_min_decay,
374 sla_max_mutations_per_min,
375 last_sleep_cycle: Instant::now(),
376 sleep_idle_min: Duration::from_secs(sleep_idle_secs),
377 last_activity: Instant::now(),
378 mutation_window_started: Instant::now(),
379 mutation_window_count: 0,
380 maintenance_throttle_count: 0,
381 last_health_ledger_emit: Instant::now()
382 .checked_sub(health_ledger_emit_interval)
383 .unwrap_or_else(Instant::now),
384 health_ledger_emit_interval,
385 storage_budget_mode,
386 storage_budget_max_bytes,
387 storage_budget_horizon_years,
388 storage_budget_target_fraction,
389 storage_budget_rollup_count: 0,
390 auto_capture_mode,
391 auto_capture_redact,
392 auto_capture_max_chars,
393 auto_capture_count: 0,
394 last_temporal_node_id: None,
395 last_file_mtime: if file_existed {
396 std::fs::metadata(path).and_then(|m| m.modified()).ok()
397 } else {
398 None
399 },
400 };
401
402 if let Some(version) = legacy_version {
403 match migration_policy {
404 StorageMigrationPolicy::Strict => {
405 return Err(McpError::AgenticMemory(format!(
406 "Legacy .amem version {} blocked by strict migration policy",
407 version
408 )));
409 }
410 StorageMigrationPolicy::Off => {
411 tracing::warn!(
412 "Legacy storage version detected (v{}), auto-migration disabled by policy",
413 version
414 );
415 }
416 StorageMigrationPolicy::AutoSafe => {
417 if let Some(checkpoint) = manager.create_migration_checkpoint(version)? {
418 tracing::info!(
419 "Legacy storage version detected (v{}), checkpoint created at {}",
420 version,
421 checkpoint.display()
422 );
423 }
424 manager.dirty = true;
425 manager.save()?;
426 tracing::info!(
427 "Auto-migrated memory storage from v{} to v{} at {}",
428 version,
429 CURRENT_AMEM_VERSION,
430 manager.file_path.display()
431 );
432 }
433 }
434 }
435
436 Ok(manager)
437 }
438
439 pub fn graph(&self) -> &MemoryGraph {
441 &self.graph
442 }
443
444 pub fn graph_mut(&mut self) -> &mut MemoryGraph {
446 self.dirty = true;
447 self.last_activity = Instant::now();
448 self.record_mutation();
449 &mut self.graph
450 }
451
452 pub fn query_engine(&self) -> &QueryEngine {
454 &self.query_engine
455 }
456
457 pub fn write_engine(&self) -> &WriteEngine {
459 &self.write_engine
460 }
461
462 pub fn current_session_id(&self) -> u32 {
464 self.current_session
465 }
466
467 pub fn start_session(&mut self, explicit_id: Option<u32>) -> McpResult<u32> {
469 let session_id = explicit_id.unwrap_or_else(|| {
470 let ids = self.graph.session_index().session_ids();
471 let max_indexed = ids.iter().copied().max().unwrap_or(0);
472 max_indexed.max(self.current_session).saturating_add(1)
474 });
475
476 self.current_session = session_id;
477 self.last_temporal_node_id = None;
478 self.last_activity = Instant::now();
479 tracing::info!("Started session {session_id}");
480 Ok(session_id)
481 }
482
483 pub fn end_session_with_episode(&mut self, session_id: u32, summary: &str) -> McpResult<u64> {
485 let episode_id = self
486 .write_engine
487 .compress_session(&mut self.graph, session_id, summary)
488 .map_err(|e| McpError::AgenticMemory(format!("Failed to compress session: {e}")))?;
489
490 self.dirty = true;
491 self.last_activity = Instant::now();
492 self.record_mutation();
493 self.save()?;
494
495 tracing::info!("Ended session {session_id}, created episode node {episode_id}");
496
497 Ok(episode_id)
498 }
499
500 pub fn save(&mut self) -> McpResult<()> {
508 if !self.dirty {
509 return Ok(());
510 }
511
512 let _lock = FileLock::acquire(&self.file_path)?;
513
514 if self.file_path.exists() {
516 let current_mtime = std::fs::metadata(&self.file_path)
517 .and_then(|m| m.modified())
518 .ok();
519 if let (Some(current), Some(last_known)) = (current_mtime, self.last_file_mtime) {
520 if current > last_known {
521 tracing::info!("Detected external modification, merging with disk state");
522 self.merge_with_disk()?;
523 }
524 }
525 }
526
527 let writer = AmemWriter::new(self.graph.dimension());
528 writer
529 .write_to_file(&self.graph, &self.file_path)
530 .map_err(|e| McpError::AgenticMemory(format!("Failed to write memory file: {e}")))?;
531
532 self.last_file_mtime = std::fs::metadata(&self.file_path)
534 .and_then(|m| m.modified())
535 .ok();
536
537 self.dirty = false;
538 self.last_save = Instant::now();
539 self.save_generation = self.save_generation.saturating_add(1);
540 tracing::debug!("Saved memory file: {}", self.file_path.display());
541 Ok(())
542 }
543
544 fn merge_with_disk(&mut self) -> McpResult<()> {
550 let disk_graph = AmemReader::read_from_file(&self.file_path)
551 .map_err(|e| McpError::AgenticMemory(format!("Failed to re-read for merge: {e}")))?;
552
553 let our_nodes: Vec<_> = self
555 .graph
556 .nodes()
557 .iter()
558 .filter(|n| n.session_id == self.current_session)
559 .cloned()
560 .collect();
561
562 let our_node_ids: std::collections::HashSet<u64> = our_nodes.iter().map(|n| n.id).collect();
564 let our_edges: Vec<_> = self
565 .graph
566 .edges()
567 .iter()
568 .filter(|e| our_node_ids.contains(&e.source_id) || our_node_ids.contains(&e.target_id))
569 .cloned()
570 .collect();
571
572 self.graph = disk_graph;
574
575 let mut id_map: HashMap<u64, u64> = HashMap::new();
577 for node in &our_nodes {
578 let event = CognitiveEventBuilder::new(node.event_type, node.content.clone())
579 .session_id(self.current_session)
580 .confidence(node.confidence)
581 .build();
582 let result = self
583 .write_engine
584 .ingest(&mut self.graph, vec![event], vec![])
585 .map_err(|e| McpError::AgenticMemory(format!("Merge node re-add failed: {e}")))?;
586 if let Some(&new_id) = result.new_node_ids.first() {
587 id_map.insert(node.id, new_id);
588 }
589 }
590
591 for edge in &our_edges {
593 let source = id_map
594 .get(&edge.source_id)
595 .copied()
596 .unwrap_or(edge.source_id);
597 let target = id_map
598 .get(&edge.target_id)
599 .copied()
600 .unwrap_or(edge.target_id);
601 let new_edge = Edge::new(source, target, edge.edge_type, edge.weight);
602 if let Err(e) = self.graph.add_edge(new_edge) {
603 tracing::warn!("Merge edge re-add skipped: {e}");
604 }
605 }
606
607 tracing::info!(
608 "Merged {} nodes and {} edges from session {} into disk state",
609 our_nodes.len(),
610 our_edges.len(),
611 self.current_session
612 );
613 Ok(())
614 }
615
616 pub fn maybe_auto_save(&mut self) -> McpResult<()> {
618 if self.dirty && self.last_save.elapsed() >= self.auto_save_interval {
619 self.save()?;
620 }
621 Ok(())
622 }
623
624 pub fn run_maintenance_tick(&mut self) -> McpResult<()> {
626 if self.should_throttle_maintenance() {
627 self.maintenance_throttle_count = self.maintenance_throttle_count.saturating_add(1);
628 self.maybe_auto_save()?;
629 self.emit_health_ledger("throttled")?;
630 tracing::debug!(
631 "Maintenance throttled by SLA guard: mutation_rate={} threshold={}",
632 self.mutation_rate_per_min(),
633 self.sla_max_mutations_per_min
634 );
635 return Ok(());
636 }
637
638 self.maybe_run_sleep_cycle()?;
639 self.maybe_auto_save()?;
640 self.maybe_enforce_storage_budget()?;
641 self.maybe_auto_backup()?;
642 self.emit_health_ledger("normal")?;
643 Ok(())
644 }
645
646 pub fn maybe_run_sleep_cycle(&mut self) -> McpResult<()> {
648 if self.last_sleep_cycle.elapsed() < self.sleep_cycle_interval {
649 return Ok(());
650 }
651 if self.last_activity.elapsed() < self.sleep_idle_min {
652 return Ok(());
653 }
654
655 let now = agentic_memory::now_micros();
656 let decay_report = self
657 .write_engine
658 .run_decay(&mut self.graph, now)
659 .map_err(|e| McpError::AgenticMemory(format!("Sleep-cycle decay failed: {e}")))?;
660 let archived_sessions = self.auto_archive_completed_sessions()?;
661
662 if decay_report.nodes_decayed > 0 || archived_sessions > 0 {
663 self.dirty = true;
664 self.save()?;
665 }
666
667 let (hot, warm, cold) = self.tier_counts();
668 self.last_sleep_cycle = Instant::now();
669 tracing::info!(
670 "Sleep-cycle complete: decayed={} archived_sessions={} tiers(h/w/c)={}/{}/{}",
671 decay_report.nodes_decayed,
672 archived_sessions,
673 hot,
674 warm,
675 cold
676 );
677 Ok(())
678 }
679
680 pub fn maybe_auto_backup(&mut self) -> McpResult<()> {
682 if self.last_backup.elapsed() < self.backup_interval {
683 return Ok(());
684 }
685 if self.save_generation <= self.last_backup_generation {
686 return Ok(());
687 }
688 if !self.file_path.exists() {
689 return Ok(());
690 }
691
692 std::fs::create_dir_all(&self.backups_dir).map_err(McpError::Io)?;
693 let backup_path = self.next_backup_path();
694 std::fs::copy(&self.file_path, &backup_path).map_err(McpError::Io)?;
695 self.last_backup_generation = self.save_generation;
696 self.last_backup = Instant::now();
697 self.prune_old_backups()?;
698 tracing::info!("Auto-backup written: {}", backup_path.display());
699 Ok(())
700 }
701
702 pub fn mark_dirty(&mut self) {
704 self.dirty = true;
705 self.last_activity = Instant::now();
706 self.record_mutation();
707 }
708
709 pub fn file_path(&self) -> &PathBuf {
711 &self.file_path
712 }
713
714 pub fn last_temporal_node_id(&self) -> Option<u64> {
716 self.last_temporal_node_id
717 }
718
719 pub fn advance_temporal_chain(&mut self, node_id: u64) {
721 self.last_temporal_node_id = Some(node_id);
722 }
723
724 pub fn link_temporal(&mut self, prev_id: u64, next_id: u64) -> McpResult<()> {
726 let edge = Edge::new(prev_id, next_id, EdgeType::TemporalNext, 1.0);
727 self.graph
728 .add_edge(edge)
729 .map_err(|e| McpError::AgenticMemory(format!("Failed to add temporal edge: {e}")))?;
730 self.dirty = true;
731 Ok(())
732 }
733
734 pub fn maintenance_interval(&self) -> Duration {
736 self.auto_save_interval
737 .min(self.backup_interval)
738 .min(self.sleep_cycle_interval)
739 }
740
741 pub fn capture_prompt_request(
743 &mut self,
744 prompt_name: &str,
745 arguments: Option<&Value>,
746 ) -> McpResult<Option<u64>> {
747 if self.auto_capture_mode == AutoCaptureMode::Off {
748 return Ok(None);
749 }
750 match extract_prompt_capture_text(prompt_name, arguments)? {
751 Some(text) => self.persist_auto_capture(EventType::Fact, &text, 0.90),
752 None => Ok(None),
753 }
754 }
755
756 pub fn capture_tool_call(
758 &mut self,
759 tool_name: &str,
760 arguments: Option<&Value>,
761 ) -> McpResult<Option<u64>> {
762 if self.auto_capture_mode == AutoCaptureMode::Off {
763 return Ok(None);
764 }
765
766 let text = match self.auto_capture_mode {
767 AutoCaptureMode::Safe => extract_safe_tool_capture_text(tool_name, arguments)?,
768 AutoCaptureMode::Full => extract_full_tool_capture_text(tool_name, arguments)?,
769 AutoCaptureMode::Off => None,
770 };
771 match text {
772 Some(v) => self.persist_auto_capture(EventType::Inference, &v, 0.82),
773 None => Ok(None),
774 }
775 }
776
777 pub fn add_event(
779 &mut self,
780 event_type: EventType,
781 content: &str,
782 confidence: f32,
783 edges: Vec<(u64, EdgeType, f32)>,
784 ) -> McpResult<(u64, usize)> {
785 let event = CognitiveEventBuilder::new(event_type, content.to_string())
786 .session_id(self.current_session)
787 .confidence(confidence)
788 .build();
789
790 let result = self
792 .write_engine
793 .ingest(&mut self.graph, vec![event], vec![])
794 .map_err(|e| McpError::AgenticMemory(format!("Failed to add event: {e}")))?;
795
796 let node_id = result.new_node_ids.first().copied().ok_or_else(|| {
797 McpError::InternalError("No node ID returned from ingest".to_string())
798 })?;
799
800 let mut edge_count = 0;
802 for (target_id, edge_type, weight) in &edges {
803 let edge = Edge::new(node_id, *target_id, *edge_type, *weight);
804 self.graph
805 .add_edge(edge)
806 .map_err(|e| McpError::AgenticMemory(format!("Failed to add edge: {e}")))?;
807 edge_count += 1;
808 }
809
810 self.dirty = true;
811 self.last_activity = Instant::now();
812 self.record_mutation();
813 self.maybe_auto_save()?;
814
815 Ok((node_id, edge_count))
816 }
817
818 pub fn correct_node(&mut self, old_node_id: u64, new_content: &str) -> McpResult<u64> {
820 let new_id = self
821 .write_engine
822 .correct(
823 &mut self.graph,
824 old_node_id,
825 new_content,
826 self.current_session,
827 )
828 .map_err(|e| McpError::AgenticMemory(format!("Failed to correct node: {e}")))?;
829
830 self.dirty = true;
831 self.last_activity = Instant::now();
832 self.record_mutation();
833 self.maybe_auto_save()?;
834
835 Ok(new_id)
836 }
837
838 fn record_mutation(&mut self) {
839 if self.mutation_window_started.elapsed() >= Duration::from_secs(60) {
840 self.mutation_window_started = Instant::now();
841 self.mutation_window_count = 0;
842 }
843 self.mutation_window_count = self.mutation_window_count.saturating_add(1);
844 }
845
846 fn mutation_rate_per_min(&self) -> u32 {
847 let elapsed = self.mutation_window_started.elapsed().as_secs().max(1);
848 let scaled = (self.mutation_window_count as u64)
849 .saturating_mul(60)
850 .saturating_div(elapsed);
851 scaled.min(u32::MAX as u64) as u32
852 }
853
854 fn should_throttle_maintenance(&self) -> bool {
855 self.mutation_rate_per_min() > self.sla_max_mutations_per_min
856 }
857
858 fn emit_health_ledger(&mut self, maintenance_mode: &str) -> McpResult<()> {
859 if self.last_health_ledger_emit.elapsed() < self.health_ledger_emit_interval {
860 return Ok(());
861 }
862
863 let dir = resolve_health_ledger_dir();
864 std::fs::create_dir_all(&dir).map_err(McpError::Io)?;
865 let path = dir.join("agentic-memory.json");
866 let tmp = dir.join("agentic-memory.json.tmp");
867 let (hot, warm, cold) = self.tier_counts();
868 let current_size_bytes = self.current_file_size_bytes();
869 let projected_size_bytes = self.projected_file_size_bytes(current_size_bytes);
870 let over_budget = current_size_bytes > self.storage_budget_max_bytes
871 || projected_size_bytes
872 .map(|v| v > self.storage_budget_max_bytes)
873 .unwrap_or(false);
874 let payload = serde_json::json!({
875 "project": "AgenticMemory",
876 "timestamp": chrono::Utc::now().to_rfc3339(),
877 "status": "ok",
878 "autonomic": {
879 "profile": self.profile.as_str(),
880 "migration_policy": self.migration_policy.as_str(),
881 "maintenance_mode": maintenance_mode,
882 "throttle_count": self.maintenance_throttle_count,
883 },
884 "sla": {
885 "mutation_rate_per_min": self.mutation_rate_per_min(),
886 "max_mutations_per_min": self.sla_max_mutations_per_min
887 },
888 "storage": {
889 "file": self.file_path.display().to_string(),
890 "dirty": self.dirty,
891 "save_generation": self.save_generation,
892 "backup_retention": self.backup_retention,
893 },
894 "storage_budget": {
895 "mode": self.storage_budget_mode.as_str(),
896 "max_bytes": self.storage_budget_max_bytes,
897 "horizon_years": self.storage_budget_horizon_years,
898 "target_fraction": self.storage_budget_target_fraction,
899 "current_size_bytes": current_size_bytes,
900 "projected_size_bytes": projected_size_bytes,
901 "over_budget": over_budget,
902 "rollup_count": self.storage_budget_rollup_count,
903 },
904 "auto_capture": {
905 "mode": self.auto_capture_mode.as_str(),
906 "redact": self.auto_capture_redact,
907 "max_chars": self.auto_capture_max_chars,
908 "captured_count": self.auto_capture_count
909 },
910 "graph": {
911 "nodes": self.graph.node_count(),
912 "edges": self.graph.edge_count(),
913 "tiers": {
914 "hot": hot,
915 "warm": warm,
916 "cold": cold,
917 },
918 },
919 });
920 let bytes = serde_json::to_vec_pretty(&payload).map_err(|e| {
921 McpError::AgenticMemory(format!("Failed to encode health ledger payload: {e}"))
922 })?;
923 std::fs::write(&tmp, bytes).map_err(McpError::Io)?;
924 std::fs::rename(&tmp, &path).map_err(McpError::Io)?;
925 self.last_health_ledger_emit = Instant::now();
926 Ok(())
927 }
928}
929
930impl Drop for SessionManager {
931 fn drop(&mut self) {
932 if self.dirty {
933 if let Err(e) = self.save() {
934 tracing::error!("Failed to save on drop: {e}");
935 }
936 }
937 if let Err(e) = self.maybe_auto_backup() {
938 tracing::error!("Failed auto-backup on drop: {e}");
939 }
940 }
941}
942
943impl SessionManager {
944 fn auto_archive_completed_sessions(&mut self) -> McpResult<usize> {
945 self.auto_archive_completed_sessions_with_min(self.archive_min_session_nodes)
946 }
947
948 fn auto_archive_completed_sessions_with_min(
949 &mut self,
950 min_session_nodes: usize,
951 ) -> McpResult<usize> {
952 let mut session_ids = self.graph.session_index().session_ids();
953 session_ids.sort_unstable();
954
955 let mut archived = 0usize;
956 for session_id in session_ids {
957 if session_id >= self.current_session {
958 continue;
959 }
960
961 let node_ids = self.graph.session_index().get_session(session_id).to_vec();
962 if node_ids.is_empty() {
963 continue;
964 }
965
966 let mut has_episode = false;
967 let mut event_nodes = 0usize;
968 let mut hot = 0usize;
969 let mut warm = 0usize;
970 let mut cold = 0usize;
971
972 for node_id in &node_ids {
973 if let Some(node) = self.graph.get_node(*node_id) {
974 if node.event_type == EventType::Episode {
975 has_episode = true;
976 continue;
977 }
978 event_nodes += 1;
979 if node.decay_score >= self.hot_min_decay {
980 hot += 1;
981 } else if node.decay_score >= self.warm_min_decay {
982 warm += 1;
983 } else {
984 cold += 1;
985 }
986 }
987 }
988
989 if has_episode || event_nodes < min_session_nodes {
990 continue;
991 }
992
993 let summary = format!(
994 "Auto-archive session {}: {} events ({} hot / {} warm / {} cold)",
995 session_id, event_nodes, hot, warm, cold
996 );
997 self.write_engine
998 .compress_session(&mut self.graph, session_id, &summary)
999 .map_err(|e| {
1000 McpError::AgenticMemory(format!(
1001 "Auto-archive failed for session {session_id}: {e}"
1002 ))
1003 })?;
1004 archived = archived.saturating_add(1);
1005 }
1006
1007 Ok(archived)
1008 }
1009
1010 fn maybe_enforce_storage_budget(&mut self) -> McpResult<()> {
1011 if self.storage_budget_mode == StorageBudgetMode::Off {
1012 return Ok(());
1013 }
1014
1015 let current_size = self.current_file_size_bytes();
1016 if current_size == 0 {
1017 return Ok(());
1018 }
1019 let projected = self.projected_file_size_bytes(current_size);
1020 let over_current = current_size > self.storage_budget_max_bytes;
1021 let over_projected = projected
1022 .map(|v| v > self.storage_budget_max_bytes)
1023 .unwrap_or(false);
1024
1025 if !over_current && !over_projected {
1026 return Ok(());
1027 }
1028
1029 if self.storage_budget_mode == StorageBudgetMode::Warn {
1030 tracing::warn!(
1031 "Storage budget warning: current={} projected={:?} budget={} (mode=warn)",
1032 current_size,
1033 projected,
1034 self.storage_budget_max_bytes
1035 );
1036 return Ok(());
1037 }
1038
1039 let target_bytes = ((self.storage_budget_max_bytes as f64
1040 * self.storage_budget_target_fraction as f64)
1041 .round() as u64)
1042 .max(1);
1043 let mut rollup_count = 0usize;
1044 let mut threshold = self.archive_min_session_nodes.saturating_div(2).max(1);
1045
1046 for _ in 0..3 {
1047 let archived = self.auto_archive_completed_sessions_with_min(threshold)?;
1048 if archived == 0 {
1049 if threshold > 1 {
1050 threshold = 1;
1051 continue;
1052 }
1053 break;
1054 }
1055 rollup_count += archived;
1056 self.dirty = true;
1057 self.save()?;
1058 let new_size = self.current_file_size_bytes();
1059 if new_size <= target_bytes {
1060 break;
1061 }
1062 threshold = 1;
1063 }
1064
1065 if rollup_count > 0 {
1066 self.storage_budget_rollup_count = self
1067 .storage_budget_rollup_count
1068 .saturating_add(rollup_count as u64);
1069 tracing::info!(
1070 "Storage budget rollup applied: archived_sessions={} budget={} target={} current={}",
1071 rollup_count,
1072 self.storage_budget_max_bytes,
1073 target_bytes,
1074 self.current_file_size_bytes()
1075 );
1076 } else {
1077 tracing::warn!(
1078 "Storage budget exceeded but no completed sessions eligible for rollup (current={} projected={:?} budget={})",
1079 current_size,
1080 projected,
1081 self.storage_budget_max_bytes
1082 );
1083 }
1084
1085 Ok(())
1086 }
1087
1088 fn current_file_size_bytes(&self) -> u64 {
1089 std::fs::metadata(&self.file_path)
1090 .map(|m| m.len())
1091 .unwrap_or(0)
1092 }
1093
1094 fn persist_auto_capture(
1095 &mut self,
1096 event_type: EventType,
1097 raw_text: &str,
1098 confidence: f32,
1099 ) -> McpResult<Option<u64>> {
1100 let mut text = raw_text.trim().to_string();
1101 if text.is_empty() {
1102 return Ok(None);
1103 }
1104
1105 if self.auto_capture_redact {
1106 text = redact_sensitive_tokens(&text);
1107 }
1108
1109 if text.len() > self.auto_capture_max_chars {
1110 text.truncate(self.auto_capture_max_chars);
1111 text.push_str(" …[truncated]");
1112 }
1113
1114 let prev_id = self.last_temporal_node_id;
1115 let (node_id, _) = self.add_event(event_type, &text, confidence, vec![])?;
1116
1117 if let Some(prev) = prev_id {
1119 if let Err(e) = self.link_temporal(prev, node_id) {
1120 tracing::warn!("Failed to link temporal chain: {e}");
1121 }
1122 }
1123 self.last_temporal_node_id = Some(node_id);
1124
1125 self.auto_capture_count = self.auto_capture_count.saturating_add(1);
1126 Ok(Some(node_id))
1127 }
1128
1129 fn projected_file_size_bytes(&self, current_size: u64) -> Option<u64> {
1130 if current_size == 0 || self.graph.node_count() < 2 {
1131 return None;
1132 }
1133 let mut min_ts = u64::MAX;
1134 let mut max_ts = 0u64;
1135 for node in self.graph.nodes() {
1136 min_ts = min_ts.min(node.created_at);
1137 max_ts = max_ts.max(node.created_at);
1138 }
1139 if min_ts == u64::MAX || max_ts <= min_ts {
1140 return None;
1141 }
1142
1143 let span_secs_raw = (max_ts - min_ts) / 1_000_000;
1144 let span_secs = span_secs_raw.max(7 * 24 * 3600) as f64;
1146 let per_sec = current_size as f64 / span_secs;
1147 let horizon_secs = (self.storage_budget_horizon_years as f64) * 365.25 * 24.0 * 3600.0;
1148 let projected = (per_sec * horizon_secs).round();
1149 Some(projected.max(0.0).min(u64::MAX as f64) as u64)
1150 }
1151
1152 fn tier_counts(&self) -> (usize, usize, usize) {
1153 let mut hot = 0usize;
1154 let mut warm = 0usize;
1155 let mut cold = 0usize;
1156
1157 for node in self.graph.nodes() {
1158 if node.event_type == EventType::Episode {
1159 continue;
1160 }
1161 if node.decay_score >= self.hot_min_decay {
1162 hot += 1;
1163 } else if node.decay_score >= self.warm_min_decay {
1164 warm += 1;
1165 } else {
1166 cold += 1;
1167 }
1168 }
1169
1170 (hot, warm, cold)
1171 }
1172
1173 fn create_migration_checkpoint(&self, from_version: u32) -> McpResult<Option<PathBuf>> {
1174 if !self.file_path.exists() {
1175 return Ok(None);
1176 }
1177
1178 let migration_dir = resolve_migration_dir(&self.file_path);
1179 std::fs::create_dir_all(&migration_dir).map_err(McpError::Io)?;
1180
1181 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1182 let stem = self
1183 .file_path
1184 .file_stem()
1185 .and_then(OsStr::to_str)
1186 .unwrap_or("brain");
1187 let checkpoint = migration_dir.join(format!("{stem}.v{from_version}.{ts}.amem.checkpoint"));
1188 std::fs::copy(&self.file_path, &checkpoint).map_err(McpError::Io)?;
1189 Ok(Some(checkpoint))
1190 }
1191
1192 fn next_backup_path(&self) -> PathBuf {
1193 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1194 let stem = self
1195 .file_path
1196 .file_stem()
1197 .and_then(OsStr::to_str)
1198 .unwrap_or("brain");
1199 self.backups_dir.join(format!("{stem}.{ts}.amem.bak"))
1200 }
1201
1202 fn prune_old_backups(&self) -> McpResult<()> {
1203 let mut entries = std::fs::read_dir(&self.backups_dir)
1204 .map_err(McpError::Io)?
1205 .filter_map(Result::ok)
1206 .filter(|entry| {
1207 entry
1208 .file_name()
1209 .to_str()
1210 .map(|name| name.ends_with(".amem.bak"))
1211 .unwrap_or(false)
1212 })
1213 .collect::<Vec<_>>();
1214
1215 if entries.len() <= self.backup_retention {
1216 return Ok(());
1217 }
1218
1219 entries.sort_by_key(|entry| {
1220 entry
1221 .metadata()
1222 .and_then(|m| m.modified())
1223 .ok()
1224 .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
1225 });
1226 let to_remove = entries.len().saturating_sub(self.backup_retention);
1227 for entry in entries.into_iter().take(to_remove) {
1228 let _ = std::fs::remove_file(entry.path());
1229 }
1230 Ok(())
1231 }
1232}
1233
1234fn resolve_backups_dir(memory_path: &std::path::Path) -> PathBuf {
1235 if let Ok(custom) = std::env::var("AMEM_AUTO_BACKUP_DIR") {
1236 let trimmed = custom.trim();
1237 if !trimmed.is_empty() {
1238 return PathBuf::from(trimmed);
1239 }
1240 }
1241
1242 let parent = memory_path.parent().unwrap_or(std::path::Path::new("."));
1243 parent.join(".amem-backups")
1244}
1245
1246fn resolve_migration_dir(memory_path: &Path) -> PathBuf {
1247 let parent = memory_path.parent().unwrap_or(std::path::Path::new("."));
1248 parent.join(".amem-migrations")
1249}
1250
1251fn read_storage_version(path: &Path) -> Option<u32> {
1252 let mut file = std::fs::File::open(path).ok()?;
1253 let mut header = [0u8; 8];
1254 file.read_exact(&mut header).ok()?;
1255 if &header[0..4] != b"AMEM" {
1256 return None;
1257 }
1258 Some(u32::from_le_bytes([
1259 header[4], header[5], header[6], header[7],
1260 ]))
1261}
1262
1263fn read_env_u64(name: &str, default_value: u64) -> u64 {
1264 std::env::var(name)
1265 .ok()
1266 .and_then(|v| v.parse::<u64>().ok())
1267 .unwrap_or(default_value)
1268}
1269
1270fn read_env_u32(name: &str, default_value: u32) -> u32 {
1271 std::env::var(name)
1272 .ok()
1273 .and_then(|v| v.parse::<u32>().ok())
1274 .unwrap_or(default_value)
1275}
1276
1277fn read_env_usize(name: &str, default_value: usize) -> usize {
1278 std::env::var(name)
1279 .ok()
1280 .and_then(|v| v.parse::<usize>().ok())
1281 .unwrap_or(default_value)
1282}
1283
1284fn read_env_f32(name: &str, default_value: f32) -> f32 {
1285 std::env::var(name)
1286 .ok()
1287 .and_then(|v| v.parse::<f32>().ok())
1288 .unwrap_or(default_value)
1289}
1290
1291fn read_env_bool(name: &str, default_value: bool) -> bool {
1292 std::env::var(name)
1293 .ok()
1294 .map(|v| {
1295 matches!(
1296 v.trim().to_ascii_lowercase().as_str(),
1297 "1" | "true" | "yes" | "on"
1298 )
1299 })
1300 .unwrap_or(default_value)
1301}
1302
1303fn read_env_string(name: &str) -> Option<String> {
1304 std::env::var(name).ok().map(|v| v.trim().to_string())
1305}
1306
1307fn resolve_health_ledger_dir() -> PathBuf {
1308 if let Some(custom) = read_env_string("AMEM_HEALTH_LEDGER_DIR") {
1309 if !custom.is_empty() {
1310 return PathBuf::from(custom);
1311 }
1312 }
1313 if let Some(custom) = read_env_string("AGENTRA_HEALTH_LEDGER_DIR") {
1314 if !custom.is_empty() {
1315 return PathBuf::from(custom);
1316 }
1317 }
1318
1319 let home = std::env::var("HOME")
1320 .ok()
1321 .map(PathBuf::from)
1322 .unwrap_or_else(|| PathBuf::from("."));
1323 home.join(".agentra").join("health-ledger")
1324}
1325
1326fn extract_prompt_capture_text(
1327 prompt_name: &str,
1328 arguments: Option<&Value>,
1329) -> McpResult<Option<String>> {
1330 let args = arguments.unwrap_or(&Value::Null);
1331 let fields = collect_text_fields_by_keys(
1332 args,
1333 &[
1334 "information",
1335 "context",
1336 "topic",
1337 "old_belief",
1338 "new_information",
1339 "reason",
1340 "summary",
1341 "instruction",
1342 "prompt",
1343 "query",
1344 ],
1345 8,
1346 );
1347 if fields.is_empty() {
1348 return Ok(None);
1349 }
1350 let joined = fields.join(" | ");
1351 Ok(Some(format!(
1352 "[auto-capture][prompt] template={prompt_name} input={joined}"
1353 )))
1354}
1355
1356fn extract_safe_tool_capture_text(
1357 tool_name: &str,
1358 arguments: Option<&Value>,
1359) -> McpResult<Option<String>> {
1360 let args = arguments.unwrap_or(&Value::Null);
1361 let keys = ["feedback", "summary", "note"];
1362 if tool_name != "session_end" {
1363 let explicit_feedback = collect_text_fields_by_keys(args, &["feedback", "note"], 4);
1365 if explicit_feedback.is_empty() {
1366 return Ok(None);
1367 }
1368 }
1369 let fields = collect_text_fields_by_keys(args, &keys, 6);
1370 if fields.is_empty() {
1371 return Ok(None);
1372 }
1373 Ok(Some(format!(
1374 "[auto-capture][feedback] tool={tool_name} context={}",
1375 fields.join(" | ")
1376 )))
1377}
1378
1379fn extract_full_tool_capture_text(
1380 tool_name: &str,
1381 arguments: Option<&Value>,
1382) -> McpResult<Option<String>> {
1383 if tool_name == "memory_add" {
1384 return Ok(None);
1385 }
1386 let args = arguments.unwrap_or(&Value::Null);
1387 let preferred = collect_text_fields_by_keys(
1388 args,
1389 &[
1390 "query",
1391 "content",
1392 "prompt",
1393 "new_content",
1394 "reason",
1395 "summary",
1396 "topic",
1397 "instruction",
1398 "information",
1399 "context",
1400 "feedback",
1401 ],
1402 10,
1403 );
1404
1405 let fields = if preferred.is_empty() {
1406 collect_all_string_like_fields(args, 8)
1407 } else {
1408 preferred
1409 };
1410
1411 if fields.is_empty() {
1412 return Ok(None);
1413 }
1414 Ok(Some(format!(
1415 "[auto-capture][tool] tool={tool_name} input={}",
1416 fields.join(" | ")
1417 )))
1418}
1419
1420fn collect_text_fields_by_keys(value: &Value, keys: &[&str], limit: usize) -> Vec<String> {
1421 let mut out = Vec::new();
1422 let mut seen = std::collections::BTreeSet::<String>::new();
1423
1424 fn walk(
1425 value: &Value,
1426 path: String,
1427 keys: &[&str],
1428 out: &mut Vec<String>,
1429 seen: &mut std::collections::BTreeSet<String>,
1430 limit: usize,
1431 ) {
1432 if out.len() >= limit {
1433 return;
1434 }
1435 match value {
1436 Value::Object(map) => {
1437 for (k, v) in map {
1438 if out.len() >= limit {
1439 break;
1440 }
1441 let next = if path.is_empty() {
1442 k.to_string()
1443 } else {
1444 format!("{path}.{k}")
1445 };
1446 let key_match = keys
1447 .iter()
1448 .any(|needle| k.eq_ignore_ascii_case(needle) || next.ends_with(needle));
1449 if key_match {
1450 if let Some(s) = value_to_compact_string(v) {
1451 let entry = format!("{next}={s}");
1452 if seen.insert(entry.clone()) {
1453 out.push(entry);
1454 }
1455 }
1456 }
1457 walk(v, next, keys, out, seen, limit);
1458 }
1459 }
1460 Value::Array(items) => {
1461 for (idx, item) in items.iter().enumerate() {
1462 if out.len() >= limit {
1463 break;
1464 }
1465 let next = format!("{path}[{idx}]");
1466 walk(item, next, keys, out, seen, limit);
1467 }
1468 }
1469 _ => {}
1470 }
1471 }
1472
1473 walk(value, String::new(), keys, &mut out, &mut seen, limit);
1474 out
1475}
1476
1477fn collect_all_string_like_fields(value: &Value, limit: usize) -> Vec<String> {
1478 let mut out = Vec::new();
1479 fn walk(value: &Value, path: String, out: &mut Vec<String>, limit: usize) {
1480 if out.len() >= limit {
1481 return;
1482 }
1483 match value {
1484 Value::Object(map) => {
1485 for (k, v) in map {
1486 if out.len() >= limit {
1487 break;
1488 }
1489 let next = if path.is_empty() {
1490 k.to_string()
1491 } else {
1492 format!("{path}.{k}")
1493 };
1494 walk(v, next, out, limit);
1495 }
1496 }
1497 Value::Array(items) => {
1498 for (idx, item) in items.iter().enumerate() {
1499 if out.len() >= limit {
1500 break;
1501 }
1502 walk(item, format!("{path}[{idx}]"), out, limit);
1503 }
1504 }
1505 _ => {
1506 if let Some(s) = value_to_compact_string(value) {
1507 out.push(format!("{path}={s}"));
1508 }
1509 }
1510 }
1511 }
1512 walk(value, String::new(), &mut out, limit);
1513 out
1514}
1515
1516fn value_to_compact_string(value: &Value) -> Option<String> {
1517 match value {
1518 Value::String(s) => {
1519 let trimmed = s.trim();
1520 if trimmed.is_empty() {
1521 None
1522 } else {
1523 Some(trimmed.to_string())
1524 }
1525 }
1526 Value::Number(n) => Some(n.to_string()),
1527 Value::Bool(b) => Some(b.to_string()),
1528 Value::Null => None,
1529 Value::Array(arr) => {
1530 if arr.is_empty() {
1531 None
1532 } else {
1533 Some(format!("<array:{}>", arr.len()))
1534 }
1535 }
1536 Value::Object(map) => {
1537 if map.is_empty() {
1538 None
1539 } else {
1540 Some(format!("<object:{}>", map.len()))
1541 }
1542 }
1543 }
1544}
1545
1546fn redact_sensitive_tokens(text: &str) -> String {
1547 text.split_whitespace()
1548 .map(redact_token)
1549 .collect::<Vec<_>>()
1550 .join(" ")
1551}
1552
1553fn redact_token(token: &str) -> String {
1554 let trimmed = token.trim_matches(|c: char| c == '"' || c == '\'' || c == ',' || c == ';');
1555 let lower = trimmed.to_ascii_lowercase();
1556 if trimmed.starts_with("/Users/")
1557 || trimmed.starts_with("C:\\Users\\")
1558 || trimmed.contains("/Users/")
1559 || trimmed.contains("C:\\Users\\")
1560 {
1561 return "[REDACTED_PATH]".to_string();
1562 }
1563 if trimmed.contains('@') && trimmed.contains('.') {
1564 return "[REDACTED_EMAIL]".to_string();
1565 }
1566 if lower.starts_with("sk-")
1567 || lower.contains("api_key")
1568 || lower.contains("access_token")
1569 || lower.contains("bearer")
1570 || lower.contains("authorization")
1571 {
1572 return "[REDACTED_SECRET]".to_string();
1573 }
1574 if looks_like_long_secret(trimmed) {
1575 return "[REDACTED_SECRET]".to_string();
1576 }
1577 token.to_string()
1578}
1579
1580fn looks_like_long_secret(token: &str) -> bool {
1581 if token.len() < 24 {
1582 return false;
1583 }
1584 token
1585 .chars()
1586 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
1587}
1588
1589struct FileLock {
1595 lock_path: PathBuf,
1596}
1597
1598impl FileLock {
1599 fn acquire(data_path: &Path) -> McpResult<Self> {
1602 let lock_path = data_path.with_extension("amem.lock");
1603 let stale_threshold = Duration::from_secs(60);
1604 let max_attempts = 200; for attempt in 0..max_attempts {
1607 match OpenOptions::new()
1608 .write(true)
1609 .create_new(true)
1610 .open(&lock_path)
1611 {
1612 Ok(_file) => {
1613 if attempt > 0 {
1614 tracing::debug!(
1615 "Acquired file lock after {} attempts: {}",
1616 attempt + 1,
1617 lock_path.display()
1618 );
1619 }
1620 return Ok(FileLock { lock_path });
1621 }
1622 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
1623 if let Ok(meta) = std::fs::metadata(&lock_path) {
1625 let is_stale = meta
1626 .modified()
1627 .ok()
1628 .and_then(|m| m.elapsed().ok())
1629 .map(|age| age > stale_threshold)
1630 .unwrap_or(false);
1631 if is_stale {
1632 tracing::warn!("Removing stale lock file: {}", lock_path.display());
1633 let _ = std::fs::remove_file(&lock_path);
1634 continue;
1635 }
1636 }
1637 std::thread::sleep(Duration::from_millis(50));
1638 }
1639 Err(e) => {
1640 return Err(McpError::Io(e));
1641 }
1642 }
1643 }
1644
1645 Err(McpError::AgenticMemory(format!(
1646 "Timed out waiting for file lock: {}",
1647 lock_path.display()
1648 )))
1649 }
1650}
1651
1652impl Drop for FileLock {
1653 fn drop(&mut self) {
1654 let _ = std::fs::remove_file(&self.lock_path);
1655 }
1656}
1657
1658#[cfg(test)]
1659mod tests {
1660 use super::*;
1661 use serde_json::json;
1662
1663 #[test]
1664 fn budget_projection_available_with_timeline() {
1665 let dir = tempfile::tempdir().expect("tempdir");
1666 let brain = dir.path().join("projection.amem");
1667 let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1668
1669 let (id_a, _) = manager
1670 .add_event(EventType::Fact, "old fact", 0.9, vec![])
1671 .expect("add fact");
1672 let (_id_b, _) = manager
1673 .add_event(EventType::Fact, "new fact", 0.9, vec![])
1674 .expect("add fact");
1675
1676 {
1677 let graph = manager.graph_mut();
1678 let old = graph.get_node_mut(id_a).expect("node");
1679 old.created_at = old.created_at.saturating_sub(15 * 24 * 3600 * 1_000_000);
1680 }
1681 manager.save().expect("save");
1682 let size = manager.current_file_size_bytes();
1683 let projected = manager.projected_file_size_bytes(size);
1684 assert!(size > 0);
1685 assert!(projected.is_some());
1686 }
1687
1688 #[test]
1689 fn budget_auto_rollup_archives_completed_session() {
1690 let dir = tempfile::tempdir().expect("tempdir");
1691 let brain = dir.path().join("rollup.amem");
1692 let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1693
1694 let _ = manager
1696 .add_event(EventType::Fact, "alpha", 0.8, vec![])
1697 .expect("add");
1698 let _ = manager
1699 .add_event(EventType::Decision, "beta", 0.9, vec![])
1700 .expect("add");
1701 manager.start_session(None).expect("session");
1702 manager.save().expect("save");
1703
1704 manager.storage_budget_mode = StorageBudgetMode::AutoRollup;
1706 manager.storage_budget_max_bytes = 1;
1707 manager.storage_budget_target_fraction = 0.5;
1708
1709 manager
1710 .maybe_enforce_storage_budget()
1711 .expect("enforce budget");
1712
1713 let episode_count = manager
1714 .graph()
1715 .nodes()
1716 .iter()
1717 .filter(|n| n.event_type == EventType::Episode)
1718 .count();
1719 assert!(episode_count >= 1);
1720 assert!(manager.storage_budget_rollup_count >= 1);
1721 }
1722
1723 #[test]
1724 fn auto_capture_off_noop() {
1725 let dir = tempfile::tempdir().expect("tempdir");
1726 let brain = dir.path().join("capture-off.amem");
1727 let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1728 manager.auto_capture_mode = AutoCaptureMode::Off;
1729
1730 let captured = manager
1731 .capture_prompt_request(
1732 "remember",
1733 Some(&json!({"information":"hello world","context":"ctx"})),
1734 )
1735 .expect("capture");
1736 assert!(captured.is_none());
1737 assert_eq!(manager.graph().node_count(), 0);
1738 }
1739
1740 #[test]
1741 fn auto_capture_full_records_and_redacts() {
1742 let dir = tempfile::tempdir().expect("tempdir");
1743 let brain = dir.path().join("capture-full.amem");
1744 let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1745 manager.auto_capture_mode = AutoCaptureMode::Full;
1746 manager.auto_capture_redact = true;
1747
1748 manager
1749 .capture_tool_call(
1750 "memory_query",
1751 Some(&json!({
1752 "query":"Find anything about token sk-THISISALONGSECRET123456",
1753 "context":"/Users/omoshola/Documents/private.txt",
1754 "reason":"email me at test@example.com"
1755 })),
1756 )
1757 .expect("capture");
1758
1759 assert!(manager.graph().node_count() >= 1);
1760 let latest = manager
1761 .graph()
1762 .nodes()
1763 .iter()
1764 .max_by_key(|n| n.id)
1765 .expect("node");
1766 assert!(latest.content.contains("[auto-capture][tool]"));
1767 assert!(latest.content.contains("[REDACTED_SECRET]"));
1768 assert!(latest.content.contains("[REDACTED_PATH]"));
1769 assert!(latest.content.contains("[REDACTED_EMAIL]"));
1770 }
1771
1772 #[test]
1773 fn auto_capture_temporal_chain() {
1774 let dir = tempfile::tempdir().expect("tempdir");
1775 let brain = dir.path().join("chain.amem");
1776 let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1777 manager.auto_capture_mode = AutoCaptureMode::Full;
1778
1779 let id1 = manager
1781 .capture_tool_call("memory_query", Some(&json!({"query": "first question"})))
1782 .expect("capture")
1783 .expect("node_id");
1784
1785 assert_eq!(manager.last_temporal_node_id(), Some(id1));
1786
1787 let id2 = manager
1789 .capture_tool_call(
1790 "memory_similar",
1791 Some(&json!({"query_text": "second question"})),
1792 )
1793 .expect("capture")
1794 .expect("node_id");
1795
1796 assert_eq!(manager.last_temporal_node_id(), Some(id2));
1797
1798 let has_temporal = manager.graph().edges().iter().any(|e| {
1800 e.source_id == id1 && e.target_id == id2 && e.edge_type == EdgeType::TemporalNext
1801 });
1802 assert!(has_temporal, "Expected TemporalNext edge from id1 to id2");
1803 }
1804
1805 #[test]
1806 fn temporal_chain_resets_on_new_session() {
1807 let dir = tempfile::tempdir().expect("tempdir");
1808 let brain = dir.path().join("reset.amem");
1809 let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1810 manager.auto_capture_mode = AutoCaptureMode::Full;
1811
1812 let _id1 = manager
1813 .capture_tool_call("memory_query", Some(&json!({"query": "first"})))
1814 .expect("capture");
1815
1816 assert!(manager.last_temporal_node_id().is_some());
1817
1818 manager.start_session(None).expect("new session");
1820 assert!(manager.last_temporal_node_id().is_none());
1821 }
1822
1823 #[test]
1824 fn memory_add_joins_temporal_chain() {
1825 let dir = tempfile::tempdir().expect("tempdir");
1826 let brain = dir.path().join("splice.amem");
1827 let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1828 manager.auto_capture_mode = AutoCaptureMode::Full;
1829
1830 let id1 = manager
1832 .capture_tool_call("memory_query", Some(&json!({"query": "something"})))
1833 .expect("capture")
1834 .expect("node_id");
1835
1836 let (id2, _) = manager
1838 .add_event(EventType::Fact, "User prefers dark mode", 0.9, vec![])
1839 .expect("add_event");
1840 manager.link_temporal(id1, id2).expect("link");
1841 manager.advance_temporal_chain(id2);
1842
1843 assert_eq!(manager.last_temporal_node_id(), Some(id2));
1844
1845 let has_edge = manager.graph().edges().iter().any(|e| {
1846 e.source_id == id1 && e.target_id == id2 && e.edge_type == EdgeType::TemporalNext
1847 });
1848 assert!(has_edge, "memory_add node should be linked into chain");
1849 }
1850}