Skip to main content

agentic_memory_mcp/session/
manager.rs

1//! Graph lifecycle management, file I/O, and session tracking.
2
3use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs::OpenOptions;
6use std::io::Read as _;
7use std::path::{Path, PathBuf};
8use std::time::{Duration, Instant, SystemTime};
9
10use agentic_memory::{
11    AmemReader, AmemWriter, CognitiveEventBuilder, Edge, EdgeType, EventType, MemoryGraph,
12    QueryEngine, WriteEngine,
13};
14use serde_json::Value;
15
16use crate::types::{McpError, McpResult};
17
/// Default auto-save interval, in seconds.
const DEFAULT_AUTO_SAVE_SECS: u64 = 30;
/// Default backup interval, in seconds (15 minutes).
const DEFAULT_BACKUP_INTERVAL_SECS: u64 = 15 * 60;
/// Default number of backups kept per brain file.
const DEFAULT_BACKUP_RETENTION: usize = 24;
/// Default maintenance sleep-cycle interval, in seconds (30 minutes).
const DEFAULT_SLEEP_CYCLE_SECS: u64 = 30 * 60;
/// Minimum size (in nodes) of a completed session before it is auto-archived.
const DEFAULT_ARCHIVE_MIN_SESSION_NODES: usize = 25;
/// Default decay-score threshold for the hot tier.
const DEFAULT_HOT_MIN_DECAY: f32 = 0.7;
/// Default decay-score threshold for the warm tier.
const DEFAULT_WARM_MIN_DECAY: f32 = 0.3;
/// Default sustained mutation rate above which heavy maintenance is throttled.
const DEFAULT_SLA_MAX_MUTATIONS_PER_MIN: u32 = 240;
/// Default interval between health-ledger snapshots, in seconds.
const DEFAULT_HEALTH_LEDGER_EMIT_SECS: u64 = 30;
/// Default long-horizon storage budget target (2 GiB over 20 years).
const DEFAULT_STORAGE_BUDGET_BYTES: u64 = 2 << 30;
/// Default projection horizon for the storage budget, in years.
const DEFAULT_STORAGE_BUDGET_HORIZON_YEARS: u32 = 20;
/// Default maximum number of chars persisted for one auto-captured prompt/feedback item.
const DEFAULT_AUTO_CAPTURE_MAX_CHARS: usize = 2 * 1024;
/// `.amem` storage version currently written by this server.
const CURRENT_AMEM_VERSION: u32 = 1;
44
/// Named bundle of maintenance tunables; each profile supplies its own defaults.
#[derive(Clone, Copy, Debug)]
enum AutonomicProfile {
    /// Baseline profile built from the `DEFAULT_*` constants.
    Desktop,
    /// Profile with shorter save/backup intervals and a higher mutation-rate ceiling.
    Cloud,
    /// Profile with the shortest intervals and the highest mutation-rate ceiling.
    Aggressive,
}
51
/// What to do when an opened `.amem` file carries an older storage version.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StorageMigrationPolicy {
    /// Checkpoint the legacy file, then rewrite it in the current version.
    AutoSafe,
    /// Refuse to open legacy files.
    Strict,
    /// Open legacy files as-is and only log a warning.
    Off,
}
58
/// Storage-budget handling mode, selected via `AMEM_STORAGE_BUDGET_MODE`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StorageBudgetMode {
    /// Default mode (`"auto-rollup"`).
    AutoRollup,
    /// Warn-only mode (`"warn"`).
    Warn,
    /// Budget handling disabled (`"off"`).
    Off,
}
65
/// How much request context is automatically captured into memory.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum AutoCaptureMode {
    /// Capture prompt-focused events and feedback context.
    Safe,
    /// Capture broader tool input text (except explicit memory_add payload duplication).
    Full,
    /// Disable automatic capture.
    Off,
}
75
/// Per-profile default values for the maintenance tunables.
///
/// Each value is only a default: `SessionManager::open` lets an `AMEM_*`
/// environment variable override it.
#[derive(Clone, Copy, Debug)]
struct ProfileDefaults {
    /// Seconds between automatic saves.
    auto_save_secs: u64,
    /// Seconds between automatic backups.
    backup_secs: u64,
    /// Number of backups to retain.
    backup_retention: usize,
    /// Seconds between maintenance sleep-cycles.
    sleep_cycle_secs: u64,
    /// Seconds of inactivity required before a sleep-cycle may run.
    sleep_idle_secs: u64,
    /// Minimum completed-session size (nodes) before auto-archive.
    archive_min_session_nodes: usize,
    /// Hot-tier decay-score threshold.
    hot_min_decay: f32,
    /// Warm-tier decay-score threshold.
    warm_min_decay: f32,
    /// Mutation rate per minute above which maintenance is throttled.
    sla_max_mutations_per_min: u32,
}
88
89impl AutonomicProfile {
90    fn from_env(name: &str) -> Self {
91        let raw = read_env_string(name).unwrap_or_else(|| "desktop".to_string());
92        match raw.trim().to_ascii_lowercase().as_str() {
93            "cloud" => Self::Cloud,
94            "aggressive" => Self::Aggressive,
95            _ => Self::Desktop,
96        }
97    }
98
99    fn defaults(self) -> ProfileDefaults {
100        match self {
101            Self::Desktop => ProfileDefaults {
102                auto_save_secs: DEFAULT_AUTO_SAVE_SECS,
103                backup_secs: DEFAULT_BACKUP_INTERVAL_SECS,
104                backup_retention: DEFAULT_BACKUP_RETENTION,
105                sleep_cycle_secs: DEFAULT_SLEEP_CYCLE_SECS,
106                sleep_idle_secs: 180,
107                archive_min_session_nodes: DEFAULT_ARCHIVE_MIN_SESSION_NODES,
108                hot_min_decay: DEFAULT_HOT_MIN_DECAY,
109                warm_min_decay: DEFAULT_WARM_MIN_DECAY,
110                sla_max_mutations_per_min: DEFAULT_SLA_MAX_MUTATIONS_PER_MIN,
111            },
112            Self::Cloud => ProfileDefaults {
113                auto_save_secs: 15,
114                backup_secs: 600,
115                backup_retention: 48,
116                sleep_cycle_secs: 900,
117                sleep_idle_secs: 90,
118                archive_min_session_nodes: 50,
119                hot_min_decay: 0.75,
120                warm_min_decay: 0.4,
121                sla_max_mutations_per_min: 600,
122            },
123            Self::Aggressive => ProfileDefaults {
124                auto_save_secs: 10,
125                backup_secs: 300,
126                backup_retention: 16,
127                sleep_cycle_secs: 300,
128                sleep_idle_secs: 45,
129                archive_min_session_nodes: 15,
130                hot_min_decay: 0.8,
131                warm_min_decay: 0.5,
132                sla_max_mutations_per_min: 900,
133            },
134        }
135    }
136
137    fn as_str(self) -> &'static str {
138        match self {
139            Self::Desktop => "desktop",
140            Self::Cloud => "cloud",
141            Self::Aggressive => "aggressive",
142        }
143    }
144}
145
146impl StorageMigrationPolicy {
147    fn from_env(name: &str) -> Self {
148        let raw = read_env_string(name).unwrap_or_else(|| "auto-safe".to_string());
149        match raw.trim().to_ascii_lowercase().as_str() {
150            "strict" => Self::Strict,
151            "off" | "disabled" | "none" => Self::Off,
152            _ => Self::AutoSafe,
153        }
154    }
155
156    fn as_str(self) -> &'static str {
157        match self {
158            Self::AutoSafe => "auto-safe",
159            Self::Strict => "strict",
160            Self::Off => "off",
161        }
162    }
163}
164
165impl StorageBudgetMode {
166    fn from_env(name: &str) -> Self {
167        let raw = read_env_string(name).unwrap_or_else(|| "auto-rollup".to_string());
168        match raw.trim().to_ascii_lowercase().as_str() {
169            "warn" => Self::Warn,
170            "off" | "disabled" | "none" => Self::Off,
171            _ => Self::AutoRollup,
172        }
173    }
174
175    fn as_str(self) -> &'static str {
176        match self {
177            Self::AutoRollup => "auto-rollup",
178            Self::Warn => "warn",
179            Self::Off => "off",
180        }
181    }
182}
183
184impl AutoCaptureMode {
185    fn from_env(name: &str) -> Self {
186        let raw = read_env_string(name).unwrap_or_else(|| "safe".to_string());
187        match raw.trim().to_ascii_lowercase().as_str() {
188            "full" => Self::Full,
189            "off" | "disabled" | "none" => Self::Off,
190            _ => Self::Safe,
191        }
192    }
193
194    fn as_str(self) -> &'static str {
195        match self {
196            Self::Safe => "safe",
197            Self::Full => "full",
198            Self::Off => "off",
199        }
200    }
201}
202
203/// Manages the memory graph lifecycle, file I/O, and session state.
204pub struct SessionManager {
205    graph: MemoryGraph,
206    query_engine: QueryEngine,
207    write_engine: WriteEngine,
208    file_path: PathBuf,
209    current_session: u32,
210    profile: AutonomicProfile,
211    migration_policy: StorageMigrationPolicy,
212    dirty: bool,
213    last_save: Instant,
214    auto_save_interval: Duration,
215    backup_interval: Duration,
216    backup_retention: usize,
217    backups_dir: PathBuf,
218    save_generation: u64,
219    last_backup_generation: u64,
220    last_backup: Instant,
221    sleep_cycle_interval: Duration,
222    archive_min_session_nodes: usize,
223    hot_min_decay: f32,
224    warm_min_decay: f32,
225    sla_max_mutations_per_min: u32,
226    last_sleep_cycle: Instant,
227    sleep_idle_min: Duration,
228    last_activity: Instant,
229    mutation_window_started: Instant,
230    mutation_window_count: u32,
231    maintenance_throttle_count: u64,
232    last_health_ledger_emit: Instant,
233    health_ledger_emit_interval: Duration,
234    storage_budget_mode: StorageBudgetMode,
235    storage_budget_max_bytes: u64,
236    storage_budget_horizon_years: u32,
237    storage_budget_target_fraction: f32,
238    storage_budget_rollup_count: u64,
239    auto_capture_mode: AutoCaptureMode,
240    auto_capture_redact: bool,
241    auto_capture_max_chars: usize,
242    auto_capture_count: u64,
243    /// ID of the last node added to the temporal chain in this session.
244    /// Used to create TemporalNext edges between consecutive captures.
245    last_temporal_node_id: Option<u64>,
246    /// Last known file modification time (for detecting external writes).
247    last_file_mtime: Option<SystemTime>,
248}
249
250impl SessionManager {
251    /// Open or create a memory file at the given path.
252    pub fn open(path: &str) -> McpResult<Self> {
253        let file_path = PathBuf::from(path);
254        let dimension = agentic_memory::DEFAULT_DIMENSION;
255        let file_existed = file_path.exists();
256        let profile = AutonomicProfile::from_env("AMEM_AUTONOMIC_PROFILE");
257        let defaults = profile.defaults();
258        let migration_policy = StorageMigrationPolicy::from_env("AMEM_STORAGE_MIGRATION_POLICY");
259        let detected_version = if file_existed {
260            read_storage_version(&file_path)
261        } else {
262            None
263        };
264        let legacy_version = detected_version.filter(|v| *v < CURRENT_AMEM_VERSION);
265
266        let graph = if file_existed {
267            tracing::info!("Opening existing memory file: {}", file_path.display());
268            AmemReader::read_from_file(&file_path)
269                .map_err(|e| McpError::AgenticMemory(format!("Failed to read memory file: {e}")))?
270        } else {
271            tracing::info!("Creating new memory file: {}", file_path.display());
272            // Ensure parent directory exists
273            if let Some(parent) = file_path.parent() {
274                std::fs::create_dir_all(parent).map_err(|e| {
275                    McpError::Io(std::io::Error::other(format!(
276                        "Failed to create directory {}: {e}",
277                        parent.display()
278                    )))
279                })?;
280            }
281            MemoryGraph::new(dimension)
282        };
283
284        // Determine the next session ID from existing sessions.
285        // Incorporate PID to avoid collisions when multiple MCP instances share
286        // the same .amem file (e.g. two Claude Code windows on different projects).
287        let session_ids = graph.session_index().session_ids();
288        let max_existing = session_ids.iter().copied().max().unwrap_or(0);
289        let pid_component = (std::process::id() % 1000) as u32;
290        let current_session = max_existing.saturating_add(1).saturating_add(pid_component);
291
292        tracing::info!(
293            "Session {} started. Graph has {} nodes, {} edges.",
294            current_session,
295            graph.node_count(),
296            graph.edge_count()
297        );
298        tracing::info!(
299            "Autonomic profile={} migration_policy={}",
300            profile.as_str(),
301            migration_policy.as_str()
302        );
303
304        let auto_save_secs = read_env_u64("AMEM_AUTOSAVE_SECS", defaults.auto_save_secs);
305        let backup_secs = read_env_u64("AMEM_AUTO_BACKUP_SECS", defaults.backup_secs).max(30);
306        let backup_retention =
307            read_env_usize("AMEM_AUTO_BACKUP_RETENTION", defaults.backup_retention).max(1);
308        let backups_dir = resolve_backups_dir(&file_path);
309        let sleep_cycle_secs =
310            read_env_u64("AMEM_SLEEP_CYCLE_SECS", defaults.sleep_cycle_secs).max(60);
311        let sleep_idle_secs =
312            read_env_u64("AMEM_SLEEP_IDLE_SECS", defaults.sleep_idle_secs).max(30);
313        let archive_min_session_nodes = read_env_usize(
314            "AMEM_ARCHIVE_MIN_SESSION_NODES",
315            defaults.archive_min_session_nodes,
316        )
317        .max(1);
318        let hot_min_decay =
319            read_env_f32("AMEM_TIER_HOT_MIN_DECAY", defaults.hot_min_decay).clamp(0.0, 1.0);
320        let warm_min_decay = read_env_f32("AMEM_TIER_WARM_MIN_DECAY", defaults.warm_min_decay)
321            .clamp(0.0, 1.0)
322            .min(hot_min_decay);
323        let sla_max_mutations_per_min = read_env_u32(
324            "AMEM_SLA_MAX_MUTATIONS_PER_MIN",
325            defaults.sla_max_mutations_per_min,
326        )
327        .max(1);
328        let health_ledger_emit_interval = Duration::from_secs(
329            read_env_u64(
330                "AMEM_HEALTH_LEDGER_EMIT_SECS",
331                DEFAULT_HEALTH_LEDGER_EMIT_SECS,
332            )
333            .max(5),
334        );
335        let storage_budget_mode = StorageBudgetMode::from_env("AMEM_STORAGE_BUDGET_MODE");
336        let storage_budget_max_bytes =
337            read_env_u64("AMEM_STORAGE_BUDGET_BYTES", DEFAULT_STORAGE_BUDGET_BYTES).max(1);
338        let storage_budget_horizon_years = read_env_u32(
339            "AMEM_STORAGE_BUDGET_HORIZON_YEARS",
340            DEFAULT_STORAGE_BUDGET_HORIZON_YEARS,
341        )
342        .max(1);
343        let storage_budget_target_fraction =
344            read_env_f32("AMEM_STORAGE_BUDGET_TARGET_FRACTION", 0.85).clamp(0.50, 0.99);
345        let auto_capture_mode = AutoCaptureMode::from_env("AMEM_AUTO_CAPTURE_MODE");
346        let auto_capture_redact = read_env_bool("AMEM_AUTO_CAPTURE_REDACT", true);
347        let auto_capture_max_chars = read_env_usize(
348            "AMEM_AUTO_CAPTURE_MAX_CHARS",
349            DEFAULT_AUTO_CAPTURE_MAX_CHARS,
350        )
351        .clamp(256, 16384);
352
353        let mut manager = Self {
354            graph,
355            query_engine: QueryEngine::new(),
356            write_engine: WriteEngine::new(dimension),
357            file_path,
358            current_session,
359            profile,
360            migration_policy,
361            dirty: false,
362            last_save: Instant::now(),
363            auto_save_interval: Duration::from_secs(auto_save_secs),
364            backup_interval: Duration::from_secs(backup_secs),
365            backup_retention,
366            backups_dir,
367            save_generation: if file_existed { 1 } else { 0 },
368            last_backup_generation: 0,
369            last_backup: Instant::now(),
370            sleep_cycle_interval: Duration::from_secs(sleep_cycle_secs),
371            archive_min_session_nodes,
372            hot_min_decay,
373            warm_min_decay,
374            sla_max_mutations_per_min,
375            last_sleep_cycle: Instant::now(),
376            sleep_idle_min: Duration::from_secs(sleep_idle_secs),
377            last_activity: Instant::now(),
378            mutation_window_started: Instant::now(),
379            mutation_window_count: 0,
380            maintenance_throttle_count: 0,
381            last_health_ledger_emit: Instant::now()
382                .checked_sub(health_ledger_emit_interval)
383                .unwrap_or_else(Instant::now),
384            health_ledger_emit_interval,
385            storage_budget_mode,
386            storage_budget_max_bytes,
387            storage_budget_horizon_years,
388            storage_budget_target_fraction,
389            storage_budget_rollup_count: 0,
390            auto_capture_mode,
391            auto_capture_redact,
392            auto_capture_max_chars,
393            auto_capture_count: 0,
394            last_temporal_node_id: None,
395            last_file_mtime: if file_existed {
396                std::fs::metadata(path).and_then(|m| m.modified()).ok()
397            } else {
398                None
399            },
400        };
401
402        if let Some(version) = legacy_version {
403            match migration_policy {
404                StorageMigrationPolicy::Strict => {
405                    return Err(McpError::AgenticMemory(format!(
406                        "Legacy .amem version {} blocked by strict migration policy",
407                        version
408                    )));
409                }
410                StorageMigrationPolicy::Off => {
411                    tracing::warn!(
412                        "Legacy storage version detected (v{}), auto-migration disabled by policy",
413                        version
414                    );
415                }
416                StorageMigrationPolicy::AutoSafe => {
417                    if let Some(checkpoint) = manager.create_migration_checkpoint(version)? {
418                        tracing::info!(
419                            "Legacy storage version detected (v{}), checkpoint created at {}",
420                            version,
421                            checkpoint.display()
422                        );
423                    }
424                    manager.dirty = true;
425                    manager.save()?;
426                    tracing::info!(
427                        "Auto-migrated memory storage from v{} to v{} at {}",
428                        version,
429                        CURRENT_AMEM_VERSION,
430                        manager.file_path.display()
431                    );
432                }
433            }
434        }
435
436        Ok(manager)
437    }
438
439    /// Get an immutable reference to the graph.
440    pub fn graph(&self) -> &MemoryGraph {
441        &self.graph
442    }
443
444    /// Get a mutable reference to the graph and mark as dirty.
445    pub fn graph_mut(&mut self) -> &mut MemoryGraph {
446        self.dirty = true;
447        self.last_activity = Instant::now();
448        self.record_mutation();
449        &mut self.graph
450    }
451
452    /// Get the query engine.
453    pub fn query_engine(&self) -> &QueryEngine {
454        &self.query_engine
455    }
456
457    /// Get the write engine.
458    pub fn write_engine(&self) -> &WriteEngine {
459        &self.write_engine
460    }
461
462    /// Current session ID.
463    pub fn current_session_id(&self) -> u32 {
464        self.current_session
465    }
466
467    /// Start a new session, optionally with an explicit ID.
468    pub fn start_session(&mut self, explicit_id: Option<u32>) -> McpResult<u32> {
469        let session_id = explicit_id.unwrap_or_else(|| {
470            let ids = self.graph.session_index().session_ids();
471            let max_indexed = ids.iter().copied().max().unwrap_or(0);
472            // Ensure monotonic: new session must be > current session.
473            max_indexed.max(self.current_session).saturating_add(1)
474        });
475
476        self.current_session = session_id;
477        self.last_temporal_node_id = None;
478        self.last_activity = Instant::now();
479        tracing::info!("Started session {session_id}");
480        Ok(session_id)
481    }
482
483    /// End a session and optionally create an episode summary.
484    pub fn end_session_with_episode(&mut self, session_id: u32, summary: &str) -> McpResult<u64> {
485        let episode_id = self
486            .write_engine
487            .compress_session(&mut self.graph, session_id, summary)
488            .map_err(|e| McpError::AgenticMemory(format!("Failed to compress session: {e}")))?;
489
490        self.dirty = true;
491        self.last_activity = Instant::now();
492        self.record_mutation();
493        self.save()?;
494
495        tracing::info!("Ended session {session_id}, created episode node {episode_id}");
496
497        Ok(episode_id)
498    }
499
500    /// Save the graph to file with file-locking for concurrent session safety.
501    ///
502    /// When multiple MCP instances share the same `.amem` file, this method:
503    /// 1. Acquires an exclusive file lock (sidecar `.amem.lock`)
504    /// 2. Checks if the file was modified externally (by another instance)
505    /// 3. If so, re-reads the disk graph and merges our session's new nodes
506    /// 4. Writes the merged graph and releases the lock
507    pub fn save(&mut self) -> McpResult<()> {
508        if !self.dirty {
509            return Ok(());
510        }
511
512        let _lock = FileLock::acquire(&self.file_path)?;
513
514        // Detect external modifications from concurrent sessions.
515        if self.file_path.exists() {
516            let current_mtime = std::fs::metadata(&self.file_path)
517                .and_then(|m| m.modified())
518                .ok();
519            if let (Some(current), Some(last_known)) = (current_mtime, self.last_file_mtime) {
520                if current > last_known {
521                    tracing::info!("Detected external modification, merging with disk state");
522                    self.merge_with_disk()?;
523                }
524            }
525        }
526
527        let writer = AmemWriter::new(self.graph.dimension());
528        writer
529            .write_to_file(&self.graph, &self.file_path)
530            .map_err(|e| McpError::AgenticMemory(format!("Failed to write memory file: {e}")))?;
531
532        // Update our mtime tracking after successful write.
533        self.last_file_mtime = std::fs::metadata(&self.file_path)
534            .and_then(|m| m.modified())
535            .ok();
536
537        self.dirty = false;
538        self.last_save = Instant::now();
539        self.save_generation = self.save_generation.saturating_add(1);
540        tracing::debug!("Saved memory file: {}", self.file_path.display());
541        Ok(())
542    }
543
544    /// Merge our session's nodes/edges with the latest disk state.
545    ///
546    /// This handles the case where another MCP instance wrote to the same file
547    /// since we last read it. We re-read the disk, then re-add our session's
548    /// nodes on top of the latest state.
549    fn merge_with_disk(&mut self) -> McpResult<()> {
550        let disk_graph = AmemReader::read_from_file(&self.file_path)
551            .map_err(|e| McpError::AgenticMemory(format!("Failed to re-read for merge: {e}")))?;
552
553        // Collect our session's nodes (those we created in this process).
554        let our_nodes: Vec<_> = self
555            .graph
556            .nodes()
557            .iter()
558            .filter(|n| n.session_id == self.current_session)
559            .cloned()
560            .collect();
561
562        // Collect edges where source belongs to our session.
563        let our_node_ids: std::collections::HashSet<u64> = our_nodes.iter().map(|n| n.id).collect();
564        let our_edges: Vec<_> = self
565            .graph
566            .edges()
567            .iter()
568            .filter(|e| our_node_ids.contains(&e.source_id) || our_node_ids.contains(&e.target_id))
569            .cloned()
570            .collect();
571
572        // Replace our graph with the latest disk state.
573        self.graph = disk_graph;
574
575        // Re-add our session's nodes with fresh IDs from the merged graph.
576        let mut id_map: HashMap<u64, u64> = HashMap::new();
577        for node in &our_nodes {
578            let event = CognitiveEventBuilder::new(node.event_type, node.content.clone())
579                .session_id(self.current_session)
580                .confidence(node.confidence)
581                .build();
582            let result = self
583                .write_engine
584                .ingest(&mut self.graph, vec![event], vec![])
585                .map_err(|e| McpError::AgenticMemory(format!("Merge node re-add failed: {e}")))?;
586            if let Some(&new_id) = result.new_node_ids.first() {
587                id_map.insert(node.id, new_id);
588            }
589        }
590
591        // Re-add our session's edges with remapped IDs.
592        for edge in &our_edges {
593            let source = id_map
594                .get(&edge.source_id)
595                .copied()
596                .unwrap_or(edge.source_id);
597            let target = id_map
598                .get(&edge.target_id)
599                .copied()
600                .unwrap_or(edge.target_id);
601            let new_edge = Edge::new(source, target, edge.edge_type, edge.weight);
602            if let Err(e) = self.graph.add_edge(new_edge) {
603                tracing::warn!("Merge edge re-add skipped: {e}");
604            }
605        }
606
607        tracing::info!(
608            "Merged {} nodes and {} edges from session {} into disk state",
609            our_nodes.len(),
610            our_edges.len(),
611            self.current_session
612        );
613        Ok(())
614    }
615
616    /// Check if auto-save is needed and save if so.
617    pub fn maybe_auto_save(&mut self) -> McpResult<()> {
618        if self.dirty && self.last_save.elapsed() >= self.auto_save_interval {
619            self.save()?;
620        }
621        Ok(())
622    }
623
624    /// Runs autonomous maintenance: sleep-cycle, auto-save, and periodic backup.
625    pub fn run_maintenance_tick(&mut self) -> McpResult<()> {
626        if self.should_throttle_maintenance() {
627            self.maintenance_throttle_count = self.maintenance_throttle_count.saturating_add(1);
628            self.maybe_auto_save()?;
629            self.emit_health_ledger("throttled")?;
630            tracing::debug!(
631                "Maintenance throttled by SLA guard: mutation_rate={} threshold={}",
632                self.mutation_rate_per_min(),
633                self.sla_max_mutations_per_min
634            );
635            return Ok(());
636        }
637
638        self.maybe_run_sleep_cycle()?;
639        self.maybe_auto_save()?;
640        self.maybe_enforce_storage_budget()?;
641        self.maybe_auto_backup()?;
642        self.emit_health_ledger("normal")?;
643        Ok(())
644    }
645
646    /// Run a periodic sleep-cycle: decay refresh + tier balancing + auto-archive.
647    pub fn maybe_run_sleep_cycle(&mut self) -> McpResult<()> {
648        if self.last_sleep_cycle.elapsed() < self.sleep_cycle_interval {
649            return Ok(());
650        }
651        if self.last_activity.elapsed() < self.sleep_idle_min {
652            return Ok(());
653        }
654
655        let now = agentic_memory::now_micros();
656        let decay_report = self
657            .write_engine
658            .run_decay(&mut self.graph, now)
659            .map_err(|e| McpError::AgenticMemory(format!("Sleep-cycle decay failed: {e}")))?;
660        let archived_sessions = self.auto_archive_completed_sessions()?;
661
662        if decay_report.nodes_decayed > 0 || archived_sessions > 0 {
663            self.dirty = true;
664            self.save()?;
665        }
666
667        let (hot, warm, cold) = self.tier_counts();
668        self.last_sleep_cycle = Instant::now();
669        tracing::info!(
670            "Sleep-cycle complete: decayed={} archived_sessions={} tiers(h/w/c)={}/{}/{}",
671            decay_report.nodes_decayed,
672            archived_sessions,
673            hot,
674            warm,
675            cold
676        );
677        Ok(())
678    }
679
680    /// Periodic backup of persisted state with retention pruning.
681    pub fn maybe_auto_backup(&mut self) -> McpResult<()> {
682        if self.last_backup.elapsed() < self.backup_interval {
683            return Ok(());
684        }
685        if self.save_generation <= self.last_backup_generation {
686            return Ok(());
687        }
688        if !self.file_path.exists() {
689            return Ok(());
690        }
691
692        std::fs::create_dir_all(&self.backups_dir).map_err(McpError::Io)?;
693        let backup_path = self.next_backup_path();
694        std::fs::copy(&self.file_path, &backup_path).map_err(McpError::Io)?;
695        self.last_backup_generation = self.save_generation;
696        self.last_backup = Instant::now();
697        self.prune_old_backups()?;
698        tracing::info!("Auto-backup written: {}", backup_path.display());
699        Ok(())
700    }
701
702    /// Mark the graph as dirty (needs saving).
703    pub fn mark_dirty(&mut self) {
704        self.dirty = true;
705        self.last_activity = Instant::now();
706        self.record_mutation();
707    }
708
709    /// Get the file path.
710    pub fn file_path(&self) -> &PathBuf {
711        &self.file_path
712    }
713
714    /// The ID of the most recent node in the temporal chain for this session.
715    pub fn last_temporal_node_id(&self) -> Option<u64> {
716        self.last_temporal_node_id
717    }
718
719    /// Advance the temporal chain pointer to the given node ID.
720    pub fn advance_temporal_chain(&mut self, node_id: u64) {
721        self.last_temporal_node_id = Some(node_id);
722    }
723
724    /// Create a TemporalNext edge from `prev_id` to `next_id` (forward in time).
725    pub fn link_temporal(&mut self, prev_id: u64, next_id: u64) -> McpResult<()> {
726        let edge = Edge::new(prev_id, next_id, EdgeType::TemporalNext, 1.0);
727        self.graph
728            .add_edge(edge)
729            .map_err(|e| McpError::AgenticMemory(format!("Failed to add temporal edge: {e}")))?;
730        self.dirty = true;
731        Ok(())
732    }
733
734    /// Background maintenance loop interval.
735    pub fn maintenance_interval(&self) -> Duration {
736        self.auto_save_interval
737            .min(self.backup_interval)
738            .min(self.sleep_cycle_interval)
739    }
740
741    /// Capture a prompt template invocation (`prompts/get`) into memory.
742    pub fn capture_prompt_request(
743        &mut self,
744        prompt_name: &str,
745        arguments: Option<&Value>,
746    ) -> McpResult<Option<u64>> {
747        if self.auto_capture_mode == AutoCaptureMode::Off {
748            return Ok(None);
749        }
750        match extract_prompt_capture_text(prompt_name, arguments)? {
751            Some(text) => self.persist_auto_capture(EventType::Fact, &text, 0.90),
752            None => Ok(None),
753        }
754    }
755
756    /// Capture a tool call input context into memory based on capture mode.
757    pub fn capture_tool_call(
758        &mut self,
759        tool_name: &str,
760        arguments: Option<&Value>,
761    ) -> McpResult<Option<u64>> {
762        if self.auto_capture_mode == AutoCaptureMode::Off {
763            return Ok(None);
764        }
765
766        let text = match self.auto_capture_mode {
767            AutoCaptureMode::Safe => extract_safe_tool_capture_text(tool_name, arguments)?,
768            AutoCaptureMode::Full => extract_full_tool_capture_text(tool_name, arguments)?,
769            AutoCaptureMode::Off => None,
770        };
771        match text {
772            Some(v) => self.persist_auto_capture(EventType::Inference, &v, 0.82),
773            None => Ok(None),
774        }
775    }
776
777    /// Add a cognitive event to the graph.
778    pub fn add_event(
779        &mut self,
780        event_type: EventType,
781        content: &str,
782        confidence: f32,
783        edges: Vec<(u64, EdgeType, f32)>,
784    ) -> McpResult<(u64, usize)> {
785        let event = CognitiveEventBuilder::new(event_type, content.to_string())
786            .session_id(self.current_session)
787            .confidence(confidence)
788            .build();
789
790        // First, add the node to get its assigned ID
791        let result = self
792            .write_engine
793            .ingest(&mut self.graph, vec![event], vec![])
794            .map_err(|e| McpError::AgenticMemory(format!("Failed to add event: {e}")))?;
795
796        let node_id = result.new_node_ids.first().copied().ok_or_else(|| {
797            McpError::InternalError("No node ID returned from ingest".to_string())
798        })?;
799
800        // Then add edges with the correct source_id
801        let mut edge_count = 0;
802        for (target_id, edge_type, weight) in &edges {
803            let edge = Edge::new(node_id, *target_id, *edge_type, *weight);
804            self.graph
805                .add_edge(edge)
806                .map_err(|e| McpError::AgenticMemory(format!("Failed to add edge: {e}")))?;
807            edge_count += 1;
808        }
809
810        self.dirty = true;
811        self.last_activity = Instant::now();
812        self.record_mutation();
813        self.maybe_auto_save()?;
814
815        Ok((node_id, edge_count))
816    }
817
818    /// Correct a previous belief.
819    pub fn correct_node(&mut self, old_node_id: u64, new_content: &str) -> McpResult<u64> {
820        let new_id = self
821            .write_engine
822            .correct(
823                &mut self.graph,
824                old_node_id,
825                new_content,
826                self.current_session,
827            )
828            .map_err(|e| McpError::AgenticMemory(format!("Failed to correct node: {e}")))?;
829
830        self.dirty = true;
831        self.last_activity = Instant::now();
832        self.record_mutation();
833        self.maybe_auto_save()?;
834
835        Ok(new_id)
836    }
837
838    fn record_mutation(&mut self) {
839        if self.mutation_window_started.elapsed() >= Duration::from_secs(60) {
840            self.mutation_window_started = Instant::now();
841            self.mutation_window_count = 0;
842        }
843        self.mutation_window_count = self.mutation_window_count.saturating_add(1);
844    }
845
846    fn mutation_rate_per_min(&self) -> u32 {
847        let elapsed = self.mutation_window_started.elapsed().as_secs().max(1);
848        let scaled = (self.mutation_window_count as u64)
849            .saturating_mul(60)
850            .saturating_div(elapsed);
851        scaled.min(u32::MAX as u64) as u32
852    }
853
854    fn should_throttle_maintenance(&self) -> bool {
855        self.mutation_rate_per_min() > self.sla_max_mutations_per_min
856    }
857
858    fn emit_health_ledger(&mut self, maintenance_mode: &str) -> McpResult<()> {
859        if self.last_health_ledger_emit.elapsed() < self.health_ledger_emit_interval {
860            return Ok(());
861        }
862
863        let dir = resolve_health_ledger_dir();
864        std::fs::create_dir_all(&dir).map_err(McpError::Io)?;
865        let path = dir.join("agentic-memory.json");
866        let tmp = dir.join("agentic-memory.json.tmp");
867        let (hot, warm, cold) = self.tier_counts();
868        let current_size_bytes = self.current_file_size_bytes();
869        let projected_size_bytes = self.projected_file_size_bytes(current_size_bytes);
870        let over_budget = current_size_bytes > self.storage_budget_max_bytes
871            || projected_size_bytes
872                .map(|v| v > self.storage_budget_max_bytes)
873                .unwrap_or(false);
874        let payload = serde_json::json!({
875            "project": "AgenticMemory",
876            "timestamp": chrono::Utc::now().to_rfc3339(),
877            "status": "ok",
878            "autonomic": {
879                "profile": self.profile.as_str(),
880                "migration_policy": self.migration_policy.as_str(),
881                "maintenance_mode": maintenance_mode,
882                "throttle_count": self.maintenance_throttle_count,
883            },
884            "sla": {
885                "mutation_rate_per_min": self.mutation_rate_per_min(),
886                "max_mutations_per_min": self.sla_max_mutations_per_min
887            },
888            "storage": {
889                "file": self.file_path.display().to_string(),
890                "dirty": self.dirty,
891                "save_generation": self.save_generation,
892                "backup_retention": self.backup_retention,
893            },
894            "storage_budget": {
895                "mode": self.storage_budget_mode.as_str(),
896                "max_bytes": self.storage_budget_max_bytes,
897                "horizon_years": self.storage_budget_horizon_years,
898                "target_fraction": self.storage_budget_target_fraction,
899                "current_size_bytes": current_size_bytes,
900                "projected_size_bytes": projected_size_bytes,
901                "over_budget": over_budget,
902                "rollup_count": self.storage_budget_rollup_count,
903            },
904            "auto_capture": {
905                "mode": self.auto_capture_mode.as_str(),
906                "redact": self.auto_capture_redact,
907                "max_chars": self.auto_capture_max_chars,
908                "captured_count": self.auto_capture_count
909            },
910            "graph": {
911                "nodes": self.graph.node_count(),
912                "edges": self.graph.edge_count(),
913                "tiers": {
914                    "hot": hot,
915                    "warm": warm,
916                    "cold": cold,
917                },
918            },
919        });
920        let bytes = serde_json::to_vec_pretty(&payload).map_err(|e| {
921            McpError::AgenticMemory(format!("Failed to encode health ledger payload: {e}"))
922        })?;
923        std::fs::write(&tmp, bytes).map_err(McpError::Io)?;
924        std::fs::rename(&tmp, &path).map_err(McpError::Io)?;
925        self.last_health_ledger_emit = Instant::now();
926        Ok(())
927    }
928}
929
930impl Drop for SessionManager {
931    fn drop(&mut self) {
932        if self.dirty {
933            if let Err(e) = self.save() {
934                tracing::error!("Failed to save on drop: {e}");
935            }
936        }
937        if let Err(e) = self.maybe_auto_backup() {
938            tracing::error!("Failed auto-backup on drop: {e}");
939        }
940    }
941}
942
943impl SessionManager {
944    fn auto_archive_completed_sessions(&mut self) -> McpResult<usize> {
945        self.auto_archive_completed_sessions_with_min(self.archive_min_session_nodes)
946    }
947
948    fn auto_archive_completed_sessions_with_min(
949        &mut self,
950        min_session_nodes: usize,
951    ) -> McpResult<usize> {
952        let mut session_ids = self.graph.session_index().session_ids();
953        session_ids.sort_unstable();
954
955        let mut archived = 0usize;
956        for session_id in session_ids {
957            if session_id >= self.current_session {
958                continue;
959            }
960
961            let node_ids = self.graph.session_index().get_session(session_id).to_vec();
962            if node_ids.is_empty() {
963                continue;
964            }
965
966            let mut has_episode = false;
967            let mut event_nodes = 0usize;
968            let mut hot = 0usize;
969            let mut warm = 0usize;
970            let mut cold = 0usize;
971
972            for node_id in &node_ids {
973                if let Some(node) = self.graph.get_node(*node_id) {
974                    if node.event_type == EventType::Episode {
975                        has_episode = true;
976                        continue;
977                    }
978                    event_nodes += 1;
979                    if node.decay_score >= self.hot_min_decay {
980                        hot += 1;
981                    } else if node.decay_score >= self.warm_min_decay {
982                        warm += 1;
983                    } else {
984                        cold += 1;
985                    }
986                }
987            }
988
989            if has_episode || event_nodes < min_session_nodes {
990                continue;
991            }
992
993            let summary = format!(
994                "Auto-archive session {}: {} events ({} hot / {} warm / {} cold)",
995                session_id, event_nodes, hot, warm, cold
996            );
997            self.write_engine
998                .compress_session(&mut self.graph, session_id, &summary)
999                .map_err(|e| {
1000                    McpError::AgenticMemory(format!(
1001                        "Auto-archive failed for session {session_id}: {e}"
1002                    ))
1003                })?;
1004            archived = archived.saturating_add(1);
1005        }
1006
1007        Ok(archived)
1008    }
1009
1010    fn maybe_enforce_storage_budget(&mut self) -> McpResult<()> {
1011        if self.storage_budget_mode == StorageBudgetMode::Off {
1012            return Ok(());
1013        }
1014
1015        let current_size = self.current_file_size_bytes();
1016        if current_size == 0 {
1017            return Ok(());
1018        }
1019        let projected = self.projected_file_size_bytes(current_size);
1020        let over_current = current_size > self.storage_budget_max_bytes;
1021        let over_projected = projected
1022            .map(|v| v > self.storage_budget_max_bytes)
1023            .unwrap_or(false);
1024
1025        if !over_current && !over_projected {
1026            return Ok(());
1027        }
1028
1029        if self.storage_budget_mode == StorageBudgetMode::Warn {
1030            tracing::warn!(
1031                "Storage budget warning: current={} projected={:?} budget={} (mode=warn)",
1032                current_size,
1033                projected,
1034                self.storage_budget_max_bytes
1035            );
1036            return Ok(());
1037        }
1038
1039        let target_bytes = ((self.storage_budget_max_bytes as f64
1040            * self.storage_budget_target_fraction as f64)
1041            .round() as u64)
1042            .max(1);
1043        let mut rollup_count = 0usize;
1044        let mut threshold = self.archive_min_session_nodes.saturating_div(2).max(1);
1045
1046        for _ in 0..3 {
1047            let archived = self.auto_archive_completed_sessions_with_min(threshold)?;
1048            if archived == 0 {
1049                if threshold > 1 {
1050                    threshold = 1;
1051                    continue;
1052                }
1053                break;
1054            }
1055            rollup_count += archived;
1056            self.dirty = true;
1057            self.save()?;
1058            let new_size = self.current_file_size_bytes();
1059            if new_size <= target_bytes {
1060                break;
1061            }
1062            threshold = 1;
1063        }
1064
1065        if rollup_count > 0 {
1066            self.storage_budget_rollup_count = self
1067                .storage_budget_rollup_count
1068                .saturating_add(rollup_count as u64);
1069            tracing::info!(
1070                "Storage budget rollup applied: archived_sessions={} budget={} target={} current={}",
1071                rollup_count,
1072                self.storage_budget_max_bytes,
1073                target_bytes,
1074                self.current_file_size_bytes()
1075            );
1076        } else {
1077            tracing::warn!(
1078                "Storage budget exceeded but no completed sessions eligible for rollup (current={} projected={:?} budget={})",
1079                current_size,
1080                projected,
1081                self.storage_budget_max_bytes
1082            );
1083        }
1084
1085        Ok(())
1086    }
1087
1088    fn current_file_size_bytes(&self) -> u64 {
1089        std::fs::metadata(&self.file_path)
1090            .map(|m| m.len())
1091            .unwrap_or(0)
1092    }
1093
1094    fn persist_auto_capture(
1095        &mut self,
1096        event_type: EventType,
1097        raw_text: &str,
1098        confidence: f32,
1099    ) -> McpResult<Option<u64>> {
1100        let mut text = raw_text.trim().to_string();
1101        if text.is_empty() {
1102            return Ok(None);
1103        }
1104
1105        if self.auto_capture_redact {
1106            text = redact_sensitive_tokens(&text);
1107        }
1108
1109        if text.len() > self.auto_capture_max_chars {
1110            text.truncate(self.auto_capture_max_chars);
1111            text.push_str(" …[truncated]");
1112        }
1113
1114        let prev_id = self.last_temporal_node_id;
1115        let (node_id, _) = self.add_event(event_type, &text, confidence, vec![])?;
1116
1117        // Chain this capture to the previous node in the session's temporal thread.
1118        if let Some(prev) = prev_id {
1119            if let Err(e) = self.link_temporal(prev, node_id) {
1120                tracing::warn!("Failed to link temporal chain: {e}");
1121            }
1122        }
1123        self.last_temporal_node_id = Some(node_id);
1124
1125        self.auto_capture_count = self.auto_capture_count.saturating_add(1);
1126        Ok(Some(node_id))
1127    }
1128
1129    fn projected_file_size_bytes(&self, current_size: u64) -> Option<u64> {
1130        if current_size == 0 || self.graph.node_count() < 2 {
1131            return None;
1132        }
1133        let mut min_ts = u64::MAX;
1134        let mut max_ts = 0u64;
1135        for node in self.graph.nodes() {
1136            min_ts = min_ts.min(node.created_at);
1137            max_ts = max_ts.max(node.created_at);
1138        }
1139        if min_ts == u64::MAX || max_ts <= min_ts {
1140            return None;
1141        }
1142
1143        let span_secs_raw = (max_ts - min_ts) / 1_000_000;
1144        // Clamp to at least one week to avoid unstable extrapolation on tiny windows.
1145        let span_secs = span_secs_raw.max(7 * 24 * 3600) as f64;
1146        let per_sec = current_size as f64 / span_secs;
1147        let horizon_secs = (self.storage_budget_horizon_years as f64) * 365.25 * 24.0 * 3600.0;
1148        let projected = (per_sec * horizon_secs).round();
1149        Some(projected.max(0.0).min(u64::MAX as f64) as u64)
1150    }
1151
1152    fn tier_counts(&self) -> (usize, usize, usize) {
1153        let mut hot = 0usize;
1154        let mut warm = 0usize;
1155        let mut cold = 0usize;
1156
1157        for node in self.graph.nodes() {
1158            if node.event_type == EventType::Episode {
1159                continue;
1160            }
1161            if node.decay_score >= self.hot_min_decay {
1162                hot += 1;
1163            } else if node.decay_score >= self.warm_min_decay {
1164                warm += 1;
1165            } else {
1166                cold += 1;
1167            }
1168        }
1169
1170        (hot, warm, cold)
1171    }
1172
1173    fn create_migration_checkpoint(&self, from_version: u32) -> McpResult<Option<PathBuf>> {
1174        if !self.file_path.exists() {
1175            return Ok(None);
1176        }
1177
1178        let migration_dir = resolve_migration_dir(&self.file_path);
1179        std::fs::create_dir_all(&migration_dir).map_err(McpError::Io)?;
1180
1181        let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1182        let stem = self
1183            .file_path
1184            .file_stem()
1185            .and_then(OsStr::to_str)
1186            .unwrap_or("brain");
1187        let checkpoint = migration_dir.join(format!("{stem}.v{from_version}.{ts}.amem.checkpoint"));
1188        std::fs::copy(&self.file_path, &checkpoint).map_err(McpError::Io)?;
1189        Ok(Some(checkpoint))
1190    }
1191
1192    fn next_backup_path(&self) -> PathBuf {
1193        let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1194        let stem = self
1195            .file_path
1196            .file_stem()
1197            .and_then(OsStr::to_str)
1198            .unwrap_or("brain");
1199        self.backups_dir.join(format!("{stem}.{ts}.amem.bak"))
1200    }
1201
1202    fn prune_old_backups(&self) -> McpResult<()> {
1203        let mut entries = std::fs::read_dir(&self.backups_dir)
1204            .map_err(McpError::Io)?
1205            .filter_map(Result::ok)
1206            .filter(|entry| {
1207                entry
1208                    .file_name()
1209                    .to_str()
1210                    .map(|name| name.ends_with(".amem.bak"))
1211                    .unwrap_or(false)
1212            })
1213            .collect::<Vec<_>>();
1214
1215        if entries.len() <= self.backup_retention {
1216            return Ok(());
1217        }
1218
1219        entries.sort_by_key(|entry| {
1220            entry
1221                .metadata()
1222                .and_then(|m| m.modified())
1223                .ok()
1224                .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
1225        });
1226        let to_remove = entries.len().saturating_sub(self.backup_retention);
1227        for entry in entries.into_iter().take(to_remove) {
1228            let _ = std::fs::remove_file(entry.path());
1229        }
1230        Ok(())
1231    }
1232}
1233
/// Choose the directory that holds backups for `memory_path`.
///
/// A non-blank `AMEM_AUTO_BACKUP_DIR` environment variable (trimmed) wins;
/// otherwise the directory is `.amem-backups` next to the memory file, or
/// under `.` when the path has no parent.
fn resolve_backups_dir(memory_path: &std::path::Path) -> PathBuf {
    let override_dir = std::env::var("AMEM_AUTO_BACKUP_DIR")
        .ok()
        .map(|raw| raw.trim().to_string())
        .filter(|dir| !dir.is_empty());

    match override_dir {
        Some(dir) => PathBuf::from(dir),
        None => memory_path
            .parent()
            .unwrap_or(std::path::Path::new("."))
            .join(".amem-backups"),
    }
}
1245
/// Directory for migration checkpoints: `.amem-migrations` next to the memory
/// file, or under `.` when the path has no parent.
fn resolve_migration_dir(memory_path: &Path) -> PathBuf {
    let base = match memory_path.parent() {
        Some(dir) => dir,
        None => std::path::Path::new("."),
    };
    base.join(".amem-migrations")
}
1250
/// Read the storage version from the first eight bytes of a file.
///
/// The header is the 4-byte magic `AMEM` followed by a little-endian `u32`
/// version. Returns `None` when the file cannot be opened, is shorter than
/// eight bytes, or does not start with the magic.
fn read_storage_version(path: &Path) -> Option<u32> {
    let mut header = [0u8; 8];
    std::fs::File::open(path)
        .ok()?
        .read_exact(&mut header)
        .ok()?;

    let (magic, version) = header.split_at(4);
    if magic != b"AMEM" {
        return None;
    }

    let mut raw = [0u8; 4];
    raw.copy_from_slice(version);
    Some(u32::from_le_bytes(raw))
}
1262
/// Read a `u64` from the environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default_value` when the
/// variable is unset, not valid Unicode, or does not parse as a `u64`.
fn read_env_u64(name: &str, default_value: u64) -> u64 {
    std::env::var(name)
        .ok()
        // Trim first so values such as "30 " or "30\n" are not silently dropped.
        .and_then(|v| v.trim().parse::<u64>().ok())
        .unwrap_or(default_value)
}
1269
/// Read a `u32` from the environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default_value` when the
/// variable is unset, not valid Unicode, or does not parse as a `u32`.
fn read_env_u32(name: &str, default_value: u32) -> u32 {
    std::env::var(name)
        .ok()
        // Trim first so values such as "20 " or "20\n" are not silently dropped.
        .and_then(|v| v.trim().parse::<u32>().ok())
        .unwrap_or(default_value)
}
1276
/// Read a `usize` from the environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default_value` when the
/// variable is unset, not valid Unicode, or does not parse as a `usize`.
fn read_env_usize(name: &str, default_value: usize) -> usize {
    std::env::var(name)
        .ok()
        // Trim first so values such as "24 " or "24\n" are not silently dropped.
        .and_then(|v| v.trim().parse::<usize>().ok())
        .unwrap_or(default_value)
}
1283
/// Read an `f32` from the environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default_value` when the
/// variable is unset, not valid Unicode, or does not parse as an `f32`.
fn read_env_f32(name: &str, default_value: f32) -> f32 {
    std::env::var(name)
        .ok()
        // Trim first so values such as "0.5 " or "0.5\n" are not silently dropped.
        .and_then(|v| v.trim().parse::<f32>().ok())
        .unwrap_or(default_value)
}
1290
/// Read a boolean flag from the environment variable `name`.
///
/// After trimming and ASCII lower-casing, `1`, `true`, `yes`, and `on` mean
/// `true`; `0`, `false`, `no`, and `off` mean `false`. Returns `default_value`
/// when the variable is unset, not valid Unicode, or holds anything else, so
/// a typo cannot silently switch off a flag that defaults to on.
fn read_env_bool(name: &str, default_value: bool) -> bool {
    let raw = match std::env::var(name) {
        Ok(value) => value,
        Err(_) => return default_value,
    };
    match raw.trim().to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        // Unrecognized input is treated like a parse failure in the numeric
        // readers: fall back to the default.
        _ => default_value,
    }
}
1302
/// Read the environment variable `name` with surrounding whitespace removed.
///
/// Returns `None` when the variable is unset or not valid Unicode. A variable
/// that is set but blank yields `Some` of an empty string.
fn read_env_string(name: &str) -> Option<String> {
    match std::env::var(name) {
        Ok(raw) => Some(raw.trim().to_string()),
        Err(_) => None,
    }
}
1306
1307fn resolve_health_ledger_dir() -> PathBuf {
1308    if let Some(custom) = read_env_string("AMEM_HEALTH_LEDGER_DIR") {
1309        if !custom.is_empty() {
1310            return PathBuf::from(custom);
1311        }
1312    }
1313    if let Some(custom) = read_env_string("AGENTRA_HEALTH_LEDGER_DIR") {
1314        if !custom.is_empty() {
1315            return PathBuf::from(custom);
1316        }
1317    }
1318
1319    let home = std::env::var("HOME")
1320        .ok()
1321        .map(PathBuf::from)
1322        .unwrap_or_else(|| PathBuf::from("."));
1323    home.join(".agentra").join("health-ledger")
1324}
1325
1326fn extract_prompt_capture_text(
1327    prompt_name: &str,
1328    arguments: Option<&Value>,
1329) -> McpResult<Option<String>> {
1330    let args = arguments.unwrap_or(&Value::Null);
1331    let fields = collect_text_fields_by_keys(
1332        args,
1333        &[
1334            "information",
1335            "context",
1336            "topic",
1337            "old_belief",
1338            "new_information",
1339            "reason",
1340            "summary",
1341            "instruction",
1342            "prompt",
1343            "query",
1344        ],
1345        8,
1346    );
1347    if fields.is_empty() {
1348        return Ok(None);
1349    }
1350    let joined = fields.join(" | ");
1351    Ok(Some(format!(
1352        "[auto-capture][prompt] template={prompt_name} input={joined}"
1353    )))
1354}
1355
1356fn extract_safe_tool_capture_text(
1357    tool_name: &str,
1358    arguments: Option<&Value>,
1359) -> McpResult<Option<String>> {
1360    let args = arguments.unwrap_or(&Value::Null);
1361    let keys = ["feedback", "summary", "note"];
1362    if tool_name != "session_end" {
1363        // Keep safe mode low-noise and non-invasive: only capture explicit feedback fields.
1364        let explicit_feedback = collect_text_fields_by_keys(args, &["feedback", "note"], 4);
1365        if explicit_feedback.is_empty() {
1366            return Ok(None);
1367        }
1368    }
1369    let fields = collect_text_fields_by_keys(args, &keys, 6);
1370    if fields.is_empty() {
1371        return Ok(None);
1372    }
1373    Ok(Some(format!(
1374        "[auto-capture][feedback] tool={tool_name} context={}",
1375        fields.join(" | ")
1376    )))
1377}
1378
1379fn extract_full_tool_capture_text(
1380    tool_name: &str,
1381    arguments: Option<&Value>,
1382) -> McpResult<Option<String>> {
1383    if tool_name == "memory_add" {
1384        return Ok(None);
1385    }
1386    let args = arguments.unwrap_or(&Value::Null);
1387    let preferred = collect_text_fields_by_keys(
1388        args,
1389        &[
1390            "query",
1391            "content",
1392            "prompt",
1393            "new_content",
1394            "reason",
1395            "summary",
1396            "topic",
1397            "instruction",
1398            "information",
1399            "context",
1400            "feedback",
1401        ],
1402        10,
1403    );
1404
1405    let fields = if preferred.is_empty() {
1406        collect_all_string_like_fields(args, 8)
1407    } else {
1408        preferred
1409    };
1410
1411    if fields.is_empty() {
1412        return Ok(None);
1413    }
1414    Ok(Some(format!(
1415        "[auto-capture][tool] tool={tool_name} input={}",
1416        fields.join(" | ")
1417    )))
1418}
1419
1420fn collect_text_fields_by_keys(value: &Value, keys: &[&str], limit: usize) -> Vec<String> {
1421    let mut out = Vec::new();
1422    let mut seen = std::collections::BTreeSet::<String>::new();
1423
1424    fn walk(
1425        value: &Value,
1426        path: String,
1427        keys: &[&str],
1428        out: &mut Vec<String>,
1429        seen: &mut std::collections::BTreeSet<String>,
1430        limit: usize,
1431    ) {
1432        if out.len() >= limit {
1433            return;
1434        }
1435        match value {
1436            Value::Object(map) => {
1437                for (k, v) in map {
1438                    if out.len() >= limit {
1439                        break;
1440                    }
1441                    let next = if path.is_empty() {
1442                        k.to_string()
1443                    } else {
1444                        format!("{path}.{k}")
1445                    };
1446                    let key_match = keys
1447                        .iter()
1448                        .any(|needle| k.eq_ignore_ascii_case(needle) || next.ends_with(needle));
1449                    if key_match {
1450                        if let Some(s) = value_to_compact_string(v) {
1451                            let entry = format!("{next}={s}");
1452                            if seen.insert(entry.clone()) {
1453                                out.push(entry);
1454                            }
1455                        }
1456                    }
1457                    walk(v, next, keys, out, seen, limit);
1458                }
1459            }
1460            Value::Array(items) => {
1461                for (idx, item) in items.iter().enumerate() {
1462                    if out.len() >= limit {
1463                        break;
1464                    }
1465                    let next = format!("{path}[{idx}]");
1466                    walk(item, next, keys, out, seen, limit);
1467                }
1468            }
1469            _ => {}
1470        }
1471    }
1472
1473    walk(value, String::new(), keys, &mut out, &mut seen, limit);
1474    out
1475}
1476
1477fn collect_all_string_like_fields(value: &Value, limit: usize) -> Vec<String> {
1478    let mut out = Vec::new();
1479    fn walk(value: &Value, path: String, out: &mut Vec<String>, limit: usize) {
1480        if out.len() >= limit {
1481            return;
1482        }
1483        match value {
1484            Value::Object(map) => {
1485                for (k, v) in map {
1486                    if out.len() >= limit {
1487                        break;
1488                    }
1489                    let next = if path.is_empty() {
1490                        k.to_string()
1491                    } else {
1492                        format!("{path}.{k}")
1493                    };
1494                    walk(v, next, out, limit);
1495                }
1496            }
1497            Value::Array(items) => {
1498                for (idx, item) in items.iter().enumerate() {
1499                    if out.len() >= limit {
1500                        break;
1501                    }
1502                    walk(item, format!("{path}[{idx}]"), out, limit);
1503                }
1504            }
1505            _ => {
1506                if let Some(s) = value_to_compact_string(value) {
1507                    out.push(format!("{path}={s}"));
1508                }
1509            }
1510        }
1511    }
1512    walk(value, String::new(), &mut out, limit);
1513    out
1514}
1515
1516fn value_to_compact_string(value: &Value) -> Option<String> {
1517    match value {
1518        Value::String(s) => {
1519            let trimmed = s.trim();
1520            if trimmed.is_empty() {
1521                None
1522            } else {
1523                Some(trimmed.to_string())
1524            }
1525        }
1526        Value::Number(n) => Some(n.to_string()),
1527        Value::Bool(b) => Some(b.to_string()),
1528        Value::Null => None,
1529        Value::Array(arr) => {
1530            if arr.is_empty() {
1531                None
1532            } else {
1533                Some(format!("<array:{}>", arr.len()))
1534            }
1535        }
1536        Value::Object(map) => {
1537            if map.is_empty() {
1538                None
1539            } else {
1540                Some(format!("<object:{}>", map.len()))
1541            }
1542        }
1543    }
1544}
1545
1546fn redact_sensitive_tokens(text: &str) -> String {
1547    text.split_whitespace()
1548        .map(redact_token)
1549        .collect::<Vec<_>>()
1550        .join(" ")
1551}
1552
1553fn redact_token(token: &str) -> String {
1554    let trimmed = token.trim_matches(|c: char| c == '"' || c == '\'' || c == ',' || c == ';');
1555    let lower = trimmed.to_ascii_lowercase();
1556    if trimmed.starts_with("/Users/")
1557        || trimmed.starts_with("C:\\Users\\")
1558        || trimmed.contains("/Users/")
1559        || trimmed.contains("C:\\Users\\")
1560    {
1561        return "[REDACTED_PATH]".to_string();
1562    }
1563    if trimmed.contains('@') && trimmed.contains('.') {
1564        return "[REDACTED_EMAIL]".to_string();
1565    }
1566    if lower.starts_with("sk-")
1567        || lower.contains("api_key")
1568        || lower.contains("access_token")
1569        || lower.contains("bearer")
1570        || lower.contains("authorization")
1571    {
1572        return "[REDACTED_SECRET]".to_string();
1573    }
1574    if looks_like_long_secret(trimmed) {
1575        return "[REDACTED_SECRET]".to_string();
1576    }
1577    token.to_string()
1578}
1579
/// Heuristic for opaque credentials: at least 24 bytes long and made up only
/// of ASCII letters, ASCII digits, `_`, `-`, and `.`.
fn looks_like_long_secret(token: &str) -> bool {
    token.len() >= 24
        && token
            .chars()
            .all(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' | '.'))
}
1588
/// File-based exclusive lock for concurrent `.amem` access.
///
/// Uses a sidecar `.amem.lock` file with `create_new` (O_EXCL) for atomic
/// creation. Stale locks older than 60 seconds are auto-cleaned; staleness is
/// judged from the lock file's modification time. The lock is released on
/// drop.
struct FileLock {
    /// Path of the sidecar lock file; removed again when the guard is dropped.
    lock_path: PathBuf,
}
1597
1598impl FileLock {
1599    /// Acquire an exclusive lock for the given data file.
1600    /// Spins with a 50ms backoff until the lock is available.
1601    fn acquire(data_path: &Path) -> McpResult<Self> {
1602        let lock_path = data_path.with_extension("amem.lock");
1603        let stale_threshold = Duration::from_secs(60);
1604        let max_attempts = 200; // 200 * 50ms = 10 seconds max wait
1605
1606        for attempt in 0..max_attempts {
1607            match OpenOptions::new()
1608                .write(true)
1609                .create_new(true)
1610                .open(&lock_path)
1611            {
1612                Ok(_file) => {
1613                    if attempt > 0 {
1614                        tracing::debug!(
1615                            "Acquired file lock after {} attempts: {}",
1616                            attempt + 1,
1617                            lock_path.display()
1618                        );
1619                    }
1620                    return Ok(FileLock { lock_path });
1621                }
1622                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
1623                    // Check if the lock is stale (owner crashed).
1624                    if let Ok(meta) = std::fs::metadata(&lock_path) {
1625                        let is_stale = meta
1626                            .modified()
1627                            .ok()
1628                            .and_then(|m| m.elapsed().ok())
1629                            .map(|age| age > stale_threshold)
1630                            .unwrap_or(false);
1631                        if is_stale {
1632                            tracing::warn!("Removing stale lock file: {}", lock_path.display());
1633                            let _ = std::fs::remove_file(&lock_path);
1634                            continue;
1635                        }
1636                    }
1637                    std::thread::sleep(Duration::from_millis(50));
1638                }
1639                Err(e) => {
1640                    return Err(McpError::Io(e));
1641                }
1642            }
1643        }
1644
1645        Err(McpError::AgenticMemory(format!(
1646            "Timed out waiting for file lock: {}",
1647            lock_path.display()
1648        )))
1649    }
1650}
1651
1652impl Drop for FileLock {
1653    fn drop(&mut self) {
1654        let _ = std::fs::remove_file(&self.lock_path);
1655    }
1656}
1657
1658#[cfg(test)]
1659mod tests {
1660    use super::*;
1661    use serde_json::json;
1662
    /// A size projection is produced once the graph's nodes span a positive
    /// amount of time and the brain file has been saved.
    #[test]
    fn budget_projection_available_with_timeline() {
        let dir = tempfile::tempdir().expect("tempdir");
        let brain = dir.path().join("projection.amem");
        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");

        let (id_a, _) = manager
            .add_event(EventType::Fact, "old fact", 0.9, vec![])
            .expect("add fact");
        let (_id_b, _) = manager
            .add_event(EventType::Fact, "new fact", 0.9, vec![])
            .expect("add fact");

        {
            // Backdate the first node so the two nodes no longer share a
            // timestamp; the projection returns `None` for a zero-length span.
            let graph = manager.graph_mut();
            let old = graph.get_node_mut(id_a).expect("node");
            old.created_at = old.created_at.saturating_sub(15 * 24 * 3600 * 1_000_000);
        }
        // The projection is based on the on-disk size, so the file must exist.
        manager.save().expect("save");
        let size = manager.current_file_size_bytes();
        let projected = manager.projected_file_size_bytes(size);
        assert!(size > 0);
        assert!(projected.is_some());
    }
1687
1688    #[test]
1689    fn budget_auto_rollup_archives_completed_session() {
1690        let dir = tempfile::tempdir().expect("tempdir");
1691        let brain = dir.path().join("rollup.amem");
1692        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1693
1694        // Build current session with enough content, then advance so it becomes completed.
1695        let _ = manager
1696            .add_event(EventType::Fact, "alpha", 0.8, vec![])
1697            .expect("add");
1698        let _ = manager
1699            .add_event(EventType::Decision, "beta", 0.9, vec![])
1700            .expect("add");
1701        manager.start_session(None).expect("session");
1702        manager.save().expect("save");
1703
1704        // Force tiny budget to trigger rollup.
1705        manager.storage_budget_mode = StorageBudgetMode::AutoRollup;
1706        manager.storage_budget_max_bytes = 1;
1707        manager.storage_budget_target_fraction = 0.5;
1708
1709        manager
1710            .maybe_enforce_storage_budget()
1711            .expect("enforce budget");
1712
1713        let episode_count = manager
1714            .graph()
1715            .nodes()
1716            .iter()
1717            .filter(|n| n.event_type == EventType::Episode)
1718            .count();
1719        assert!(episode_count >= 1);
1720        assert!(manager.storage_budget_rollup_count >= 1);
1721    }
1722
1723    #[test]
1724    fn auto_capture_off_noop() {
1725        let dir = tempfile::tempdir().expect("tempdir");
1726        let brain = dir.path().join("capture-off.amem");
1727        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1728        manager.auto_capture_mode = AutoCaptureMode::Off;
1729
1730        let captured = manager
1731            .capture_prompt_request(
1732                "remember",
1733                Some(&json!({"information":"hello world","context":"ctx"})),
1734            )
1735            .expect("capture");
1736        assert!(captured.is_none());
1737        assert_eq!(manager.graph().node_count(), 0);
1738    }
1739
1740    #[test]
1741    fn auto_capture_full_records_and_redacts() {
1742        let dir = tempfile::tempdir().expect("tempdir");
1743        let brain = dir.path().join("capture-full.amem");
1744        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1745        manager.auto_capture_mode = AutoCaptureMode::Full;
1746        manager.auto_capture_redact = true;
1747
1748        manager
1749            .capture_tool_call(
1750                "memory_query",
1751                Some(&json!({
1752                    "query":"Find anything about token sk-THISISALONGSECRET123456",
1753                    "context":"/Users/omoshola/Documents/private.txt",
1754                    "reason":"email me at test@example.com"
1755                })),
1756            )
1757            .expect("capture");
1758
1759        assert!(manager.graph().node_count() >= 1);
1760        let latest = manager
1761            .graph()
1762            .nodes()
1763            .iter()
1764            .max_by_key(|n| n.id)
1765            .expect("node");
1766        assert!(latest.content.contains("[auto-capture][tool]"));
1767        assert!(latest.content.contains("[REDACTED_SECRET]"));
1768        assert!(latest.content.contains("[REDACTED_PATH]"));
1769        assert!(latest.content.contains("[REDACTED_EMAIL]"));
1770    }
1771
1772    #[test]
1773    fn auto_capture_temporal_chain() {
1774        let dir = tempfile::tempdir().expect("tempdir");
1775        let brain = dir.path().join("chain.amem");
1776        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1777        manager.auto_capture_mode = AutoCaptureMode::Full;
1778
1779        // First capture: no predecessor.
1780        let id1 = manager
1781            .capture_tool_call("memory_query", Some(&json!({"query": "first question"})))
1782            .expect("capture")
1783            .expect("node_id");
1784
1785        assert_eq!(manager.last_temporal_node_id(), Some(id1));
1786
1787        // Second capture: should link to first.
1788        let id2 = manager
1789            .capture_tool_call(
1790                "memory_similar",
1791                Some(&json!({"query_text": "second question"})),
1792            )
1793            .expect("capture")
1794            .expect("node_id");
1795
1796        assert_eq!(manager.last_temporal_node_id(), Some(id2));
1797
1798        // Verify the TemporalNext edge exists: id1 -> id2.
1799        let has_temporal = manager.graph().edges().iter().any(|e| {
1800            e.source_id == id1 && e.target_id == id2 && e.edge_type == EdgeType::TemporalNext
1801        });
1802        assert!(has_temporal, "Expected TemporalNext edge from id1 to id2");
1803    }
1804
1805    #[test]
1806    fn temporal_chain_resets_on_new_session() {
1807        let dir = tempfile::tempdir().expect("tempdir");
1808        let brain = dir.path().join("reset.amem");
1809        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1810        manager.auto_capture_mode = AutoCaptureMode::Full;
1811
1812        let _id1 = manager
1813            .capture_tool_call("memory_query", Some(&json!({"query": "first"})))
1814            .expect("capture");
1815
1816        assert!(manager.last_temporal_node_id().is_some());
1817
1818        // Starting a new session should reset the chain.
1819        manager.start_session(None).expect("new session");
1820        assert!(manager.last_temporal_node_id().is_none());
1821    }
1822
1823    #[test]
1824    fn memory_add_joins_temporal_chain() {
1825        let dir = tempfile::tempdir().expect("tempdir");
1826        let brain = dir.path().join("splice.amem");
1827        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1828        manager.auto_capture_mode = AutoCaptureMode::Full;
1829
1830        // Create a chain head via auto-capture.
1831        let id1 = manager
1832            .capture_tool_call("memory_query", Some(&json!({"query": "something"})))
1833            .expect("capture")
1834            .expect("node_id");
1835
1836        // Simulate what memory_add tool does: add_event + link_temporal + advance.
1837        let (id2, _) = manager
1838            .add_event(EventType::Fact, "User prefers dark mode", 0.9, vec![])
1839            .expect("add_event");
1840        manager.link_temporal(id1, id2).expect("link");
1841        manager.advance_temporal_chain(id2);
1842
1843        assert_eq!(manager.last_temporal_node_id(), Some(id2));
1844
1845        let has_edge = manager.graph().edges().iter().any(|e| {
1846            e.source_id == id1 && e.target_id == id2 && e.edge_type == EdgeType::TemporalNext
1847        });
1848        assert!(has_edge, "memory_add node should be linked into chain");
1849    }
1850}