Skip to main content

agentic_memory_mcp/session/
manager.rs

1//! Graph lifecycle management, file I/O, and session tracking.
2
3use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs::OpenOptions;
6use std::io::Read as _;
7use std::path::{Path, PathBuf};
8use std::time::{Duration, Instant, SystemTime};
9
10use agentic_memory::{
11    AmemReader, AmemWriter, CognitiveEventBuilder, Edge, EdgeType, EventType, MemoryGraph,
12    QueryEngine, WriteEngine,
13};
14use serde_json::Value;
15
16use crate::types::{McpError, McpResult};
17
18/// Default auto-save interval.
19const DEFAULT_AUTO_SAVE_SECS: u64 = 30;
20/// Default backup interval.
21const DEFAULT_BACKUP_INTERVAL_SECS: u64 = 900;
22/// Default number of backups to retain per brain file.
23const DEFAULT_BACKUP_RETENTION: usize = 24;
24/// Default maintenance sleep-cycle interval.
25const DEFAULT_SLEEP_CYCLE_SECS: u64 = 1800;
26/// Minimum completed-session size before auto-archive.
27const DEFAULT_ARCHIVE_MIN_SESSION_NODES: usize = 25;
28/// Default hot-tier threshold (decay score).
29const DEFAULT_HOT_MIN_DECAY: f32 = 0.7;
30/// Default warm-tier threshold (decay score).
31const DEFAULT_WARM_MIN_DECAY: f32 = 0.3;
32/// Default sustained mutation rate threshold before throttling heavy maintenance.
33const DEFAULT_SLA_MAX_MUTATIONS_PER_MIN: u32 = 240;
34/// Default interval for writing health-ledger snapshots.
35const DEFAULT_HEALTH_LEDGER_EMIT_SECS: u64 = 30;
36/// Default long-horizon storage budget target (2 GiB over 20 years).
37const DEFAULT_STORAGE_BUDGET_BYTES: u64 = 2 * 1024 * 1024 * 1024;
38/// Default storage budget projection horizon.
39const DEFAULT_STORAGE_BUDGET_HORIZON_YEARS: u32 = 20;
40/// Default maximum chars persisted for one auto-captured prompt/feedback item.
41const DEFAULT_AUTO_CAPTURE_MAX_CHARS: usize = 2048;
42/// Current `.amem` storage version used by this server.
43const CURRENT_AMEM_VERSION: u32 = 1;
44
45#[derive(Debug, Clone, Copy)]
46enum AutonomicProfile {
47    Desktop,
48    Cloud,
49    Aggressive,
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53enum StorageMigrationPolicy {
54    AutoSafe,
55    Strict,
56    Off,
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60enum StorageBudgetMode {
61    AutoRollup,
62    Warn,
63    Off,
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67enum AutoCaptureMode {
68    /// Capture prompt-focused events and feedback context.
69    Safe,
70    /// Capture broader tool input text (except explicit memory_add payload duplication).
71    Full,
72    /// Disable automatic capture.
73    Off,
74}
75
76#[derive(Debug, Clone, Copy)]
77struct ProfileDefaults {
78    auto_save_secs: u64,
79    backup_secs: u64,
80    backup_retention: usize,
81    sleep_cycle_secs: u64,
82    sleep_idle_secs: u64,
83    archive_min_session_nodes: usize,
84    hot_min_decay: f32,
85    warm_min_decay: f32,
86    sla_max_mutations_per_min: u32,
87}
88
89impl AutonomicProfile {
90    fn from_env(name: &str) -> Self {
91        let raw = read_env_string(name).unwrap_or_else(|| "desktop".to_string());
92        match raw.trim().to_ascii_lowercase().as_str() {
93            "cloud" => Self::Cloud,
94            "aggressive" => Self::Aggressive,
95            _ => Self::Desktop,
96        }
97    }
98
99    fn defaults(self) -> ProfileDefaults {
100        match self {
101            Self::Desktop => ProfileDefaults {
102                auto_save_secs: DEFAULT_AUTO_SAVE_SECS,
103                backup_secs: DEFAULT_BACKUP_INTERVAL_SECS,
104                backup_retention: DEFAULT_BACKUP_RETENTION,
105                sleep_cycle_secs: DEFAULT_SLEEP_CYCLE_SECS,
106                sleep_idle_secs: 180,
107                archive_min_session_nodes: DEFAULT_ARCHIVE_MIN_SESSION_NODES,
108                hot_min_decay: DEFAULT_HOT_MIN_DECAY,
109                warm_min_decay: DEFAULT_WARM_MIN_DECAY,
110                sla_max_mutations_per_min: DEFAULT_SLA_MAX_MUTATIONS_PER_MIN,
111            },
112            Self::Cloud => ProfileDefaults {
113                auto_save_secs: 15,
114                backup_secs: 600,
115                backup_retention: 48,
116                sleep_cycle_secs: 900,
117                sleep_idle_secs: 90,
118                archive_min_session_nodes: 50,
119                hot_min_decay: 0.75,
120                warm_min_decay: 0.4,
121                sla_max_mutations_per_min: 600,
122            },
123            Self::Aggressive => ProfileDefaults {
124                auto_save_secs: 10,
125                backup_secs: 300,
126                backup_retention: 16,
127                sleep_cycle_secs: 300,
128                sleep_idle_secs: 45,
129                archive_min_session_nodes: 15,
130                hot_min_decay: 0.8,
131                warm_min_decay: 0.5,
132                sla_max_mutations_per_min: 900,
133            },
134        }
135    }
136
137    fn as_str(self) -> &'static str {
138        match self {
139            Self::Desktop => "desktop",
140            Self::Cloud => "cloud",
141            Self::Aggressive => "aggressive",
142        }
143    }
144}
145
146impl StorageMigrationPolicy {
147    fn from_env(name: &str) -> Self {
148        let raw = read_env_string(name).unwrap_or_else(|| "auto-safe".to_string());
149        match raw.trim().to_ascii_lowercase().as_str() {
150            "strict" => Self::Strict,
151            "off" | "disabled" | "none" => Self::Off,
152            _ => Self::AutoSafe,
153        }
154    }
155
156    fn as_str(self) -> &'static str {
157        match self {
158            Self::AutoSafe => "auto-safe",
159            Self::Strict => "strict",
160            Self::Off => "off",
161        }
162    }
163}
164
165impl StorageBudgetMode {
166    fn from_env(name: &str) -> Self {
167        let raw = read_env_string(name).unwrap_or_else(|| "auto-rollup".to_string());
168        match raw.trim().to_ascii_lowercase().as_str() {
169            "warn" => Self::Warn,
170            "off" | "disabled" | "none" => Self::Off,
171            _ => Self::AutoRollup,
172        }
173    }
174
175    fn as_str(self) -> &'static str {
176        match self {
177            Self::AutoRollup => "auto-rollup",
178            Self::Warn => "warn",
179            Self::Off => "off",
180        }
181    }
182}
183
184impl AutoCaptureMode {
185    fn from_env(name: &str) -> Self {
186        let raw = read_env_string(name).unwrap_or_else(|| "safe".to_string());
187        match raw.trim().to_ascii_lowercase().as_str() {
188            "full" => Self::Full,
189            "off" | "disabled" | "none" => Self::Off,
190            _ => Self::Safe,
191        }
192    }
193
194    fn as_str(self) -> &'static str {
195        match self {
196            Self::Safe => "safe",
197            Self::Full => "full",
198            Self::Off => "off",
199        }
200    }
201}
202
203/// Manages the memory graph lifecycle, file I/O, and session state.
204pub struct SessionManager {
205    graph: MemoryGraph,
206    query_engine: QueryEngine,
207    write_engine: WriteEngine,
208    file_path: PathBuf,
209    current_session: u32,
210    profile: AutonomicProfile,
211    migration_policy: StorageMigrationPolicy,
212    dirty: bool,
213    last_save: Instant,
214    auto_save_interval: Duration,
215    backup_interval: Duration,
216    backup_retention: usize,
217    backups_dir: PathBuf,
218    save_generation: u64,
219    last_backup_generation: u64,
220    last_backup: Instant,
221    sleep_cycle_interval: Duration,
222    archive_min_session_nodes: usize,
223    hot_min_decay: f32,
224    warm_min_decay: f32,
225    sla_max_mutations_per_min: u32,
226    last_sleep_cycle: Instant,
227    sleep_idle_min: Duration,
228    last_activity: Instant,
229    mutation_window_started: Instant,
230    mutation_window_count: u32,
231    maintenance_throttle_count: u64,
232    last_health_ledger_emit: Instant,
233    health_ledger_emit_interval: Duration,
234    storage_budget_mode: StorageBudgetMode,
235    storage_budget_max_bytes: u64,
236    storage_budget_horizon_years: u32,
237    storage_budget_target_fraction: f32,
238    storage_budget_rollup_count: u64,
239    auto_capture_mode: AutoCaptureMode,
240    auto_capture_redact: bool,
241    auto_capture_max_chars: usize,
242    auto_capture_count: u64,
243    /// ID of the last node added to the temporal chain in this session.
244    /// Used to create TemporalNext edges between consecutive captures.
245    last_temporal_node_id: Option<u64>,
246    /// Last known file modification time (for detecting external writes).
247    last_file_mtime: Option<SystemTime>,
248    /// Multi-context workspace manager for cross-memory queries.
249    workspace_manager: super::workspace::WorkspaceManager,
250}
251
252impl SessionManager {
253    /// Open or create a memory file at the given path.
254    pub fn open(path: &str) -> McpResult<Self> {
255        let file_path = PathBuf::from(path);
256        let dimension = agentic_memory::DEFAULT_DIMENSION;
257        let file_existed = file_path.exists();
258        let profile = AutonomicProfile::from_env("AMEM_AUTONOMIC_PROFILE");
259        let defaults = profile.defaults();
260        let migration_policy = StorageMigrationPolicy::from_env("AMEM_STORAGE_MIGRATION_POLICY");
261        let detected_version = if file_existed {
262            read_storage_version(&file_path)
263        } else {
264            None
265        };
266        let legacy_version = detected_version.filter(|v| *v < CURRENT_AMEM_VERSION);
267
268        let graph = if file_existed {
269            tracing::info!("Opening existing memory file: {}", file_path.display());
270            AmemReader::read_from_file(&file_path)
271                .map_err(|e| McpError::AgenticMemory(format!("Failed to read memory file: {e}")))?
272        } else {
273            tracing::info!("Creating new memory file: {}", file_path.display());
274            // Ensure parent directory exists
275            if let Some(parent) = file_path.parent() {
276                std::fs::create_dir_all(parent).map_err(|e| {
277                    McpError::Io(std::io::Error::other(format!(
278                        "Failed to create directory {}: {e}",
279                        parent.display()
280                    )))
281                })?;
282            }
283            MemoryGraph::new(dimension)
284        };
285
286        // Determine the next session ID from existing sessions.
287        // Incorporate PID to avoid collisions when multiple MCP instances share
288        // the same .amem file (e.g. two Claude Code windows on different projects).
289        let session_ids = graph.session_index().session_ids();
290        let max_existing = session_ids.iter().copied().max().unwrap_or(0);
291        let pid_component = std::process::id() % 1000;
292        let current_session = max_existing.saturating_add(1).saturating_add(pid_component);
293
294        tracing::info!(
295            "Session {} started. Graph has {} nodes, {} edges.",
296            current_session,
297            graph.node_count(),
298            graph.edge_count()
299        );
300        tracing::info!(
301            "Autonomic profile={} migration_policy={}",
302            profile.as_str(),
303            migration_policy.as_str()
304        );
305
306        let auto_save_secs = read_env_u64("AMEM_AUTOSAVE_SECS", defaults.auto_save_secs);
307        let backup_secs = read_env_u64("AMEM_AUTO_BACKUP_SECS", defaults.backup_secs).max(30);
308        let backup_retention =
309            read_env_usize("AMEM_AUTO_BACKUP_RETENTION", defaults.backup_retention).max(1);
310        let backups_dir = resolve_backups_dir(&file_path);
311        let sleep_cycle_secs =
312            read_env_u64("AMEM_SLEEP_CYCLE_SECS", defaults.sleep_cycle_secs).max(60);
313        let sleep_idle_secs =
314            read_env_u64("AMEM_SLEEP_IDLE_SECS", defaults.sleep_idle_secs).max(30);
315        let archive_min_session_nodes = read_env_usize(
316            "AMEM_ARCHIVE_MIN_SESSION_NODES",
317            defaults.archive_min_session_nodes,
318        )
319        .max(1);
320        let hot_min_decay =
321            read_env_f32("AMEM_TIER_HOT_MIN_DECAY", defaults.hot_min_decay).clamp(0.0, 1.0);
322        let warm_min_decay = read_env_f32("AMEM_TIER_WARM_MIN_DECAY", defaults.warm_min_decay)
323            .clamp(0.0, 1.0)
324            .min(hot_min_decay);
325        let sla_max_mutations_per_min = read_env_u32(
326            "AMEM_SLA_MAX_MUTATIONS_PER_MIN",
327            defaults.sla_max_mutations_per_min,
328        )
329        .max(1);
330        let health_ledger_emit_interval = Duration::from_secs(
331            read_env_u64(
332                "AMEM_HEALTH_LEDGER_EMIT_SECS",
333                DEFAULT_HEALTH_LEDGER_EMIT_SECS,
334            )
335            .max(5),
336        );
337        let storage_budget_mode = StorageBudgetMode::from_env("AMEM_STORAGE_BUDGET_MODE");
338        let storage_budget_max_bytes =
339            read_env_u64("AMEM_STORAGE_BUDGET_BYTES", DEFAULT_STORAGE_BUDGET_BYTES).max(1);
340        let storage_budget_horizon_years = read_env_u32(
341            "AMEM_STORAGE_BUDGET_HORIZON_YEARS",
342            DEFAULT_STORAGE_BUDGET_HORIZON_YEARS,
343        )
344        .max(1);
345        let storage_budget_target_fraction =
346            read_env_f32("AMEM_STORAGE_BUDGET_TARGET_FRACTION", 0.85).clamp(0.50, 0.99);
347        let auto_capture_mode = AutoCaptureMode::from_env("AMEM_AUTO_CAPTURE_MODE");
348        let auto_capture_redact = read_env_bool("AMEM_AUTO_CAPTURE_REDACT", true);
349        let auto_capture_max_chars = read_env_usize(
350            "AMEM_AUTO_CAPTURE_MAX_CHARS",
351            DEFAULT_AUTO_CAPTURE_MAX_CHARS,
352        )
353        .clamp(256, 16384);
354
355        let mut manager = Self {
356            graph,
357            query_engine: QueryEngine::new(),
358            write_engine: WriteEngine::new(dimension),
359            file_path,
360            current_session,
361            profile,
362            migration_policy,
363            dirty: false,
364            last_save: Instant::now(),
365            auto_save_interval: Duration::from_secs(auto_save_secs),
366            backup_interval: Duration::from_secs(backup_secs),
367            backup_retention,
368            backups_dir,
369            save_generation: if file_existed { 1 } else { 0 },
370            last_backup_generation: 0,
371            last_backup: Instant::now(),
372            sleep_cycle_interval: Duration::from_secs(sleep_cycle_secs),
373            archive_min_session_nodes,
374            hot_min_decay,
375            warm_min_decay,
376            sla_max_mutations_per_min,
377            last_sleep_cycle: Instant::now(),
378            sleep_idle_min: Duration::from_secs(sleep_idle_secs),
379            last_activity: Instant::now(),
380            mutation_window_started: Instant::now(),
381            mutation_window_count: 0,
382            maintenance_throttle_count: 0,
383            last_health_ledger_emit: Instant::now()
384                .checked_sub(health_ledger_emit_interval)
385                .unwrap_or_else(Instant::now),
386            health_ledger_emit_interval,
387            storage_budget_mode,
388            storage_budget_max_bytes,
389            storage_budget_horizon_years,
390            storage_budget_target_fraction,
391            storage_budget_rollup_count: 0,
392            auto_capture_mode,
393            auto_capture_redact,
394            auto_capture_max_chars,
395            auto_capture_count: 0,
396            last_temporal_node_id: None,
397            last_file_mtime: if file_existed {
398                std::fs::metadata(path).and_then(|m| m.modified()).ok()
399            } else {
400                None
401            },
402            workspace_manager: super::workspace::WorkspaceManager::new(),
403        };
404
405        if let Some(version) = legacy_version {
406            match migration_policy {
407                StorageMigrationPolicy::Strict => {
408                    return Err(McpError::AgenticMemory(format!(
409                        "Legacy .amem version {} blocked by strict migration policy",
410                        version
411                    )));
412                }
413                StorageMigrationPolicy::Off => {
414                    tracing::warn!(
415                        "Legacy storage version detected (v{}), auto-migration disabled by policy",
416                        version
417                    );
418                }
419                StorageMigrationPolicy::AutoSafe => {
420                    if let Some(checkpoint) = manager.create_migration_checkpoint(version)? {
421                        tracing::info!(
422                            "Legacy storage version detected (v{}), checkpoint created at {}",
423                            version,
424                            checkpoint.display()
425                        );
426                    }
427                    manager.dirty = true;
428                    manager.save()?;
429                    tracing::info!(
430                        "Auto-migrated memory storage from v{} to v{} at {}",
431                        version,
432                        CURRENT_AMEM_VERSION,
433                        manager.file_path.display()
434                    );
435                }
436            }
437        }
438
439        Ok(manager)
440    }
441
442    /// Get an immutable reference to the graph.
443    pub fn graph(&self) -> &MemoryGraph {
444        &self.graph
445    }
446
447    /// Get a mutable reference to the graph and mark as dirty.
448    pub fn graph_mut(&mut self) -> &mut MemoryGraph {
449        self.dirty = true;
450        self.last_activity = Instant::now();
451        self.record_mutation();
452        &mut self.graph
453    }
454
455    /// Get the query engine.
456    pub fn query_engine(&self) -> &QueryEngine {
457        &self.query_engine
458    }
459
460    /// Get the write engine.
461    pub fn write_engine(&self) -> &WriteEngine {
462        &self.write_engine
463    }
464
465    /// Get the workspace manager (immutable).
466    pub fn workspace_manager(&self) -> &super::workspace::WorkspaceManager {
467        &self.workspace_manager
468    }
469
470    /// Get the workspace manager (mutable).
471    pub fn workspace_manager_mut(&mut self) -> &mut super::workspace::WorkspaceManager {
472        &mut self.workspace_manager
473    }
474
475    /// Current session ID.
476    pub fn current_session_id(&self) -> u32 {
477        self.current_session
478    }
479
480    /// Start a new session, optionally with an explicit ID.
481    pub fn start_session(&mut self, explicit_id: Option<u32>) -> McpResult<u32> {
482        let session_id = explicit_id.unwrap_or_else(|| {
483            let ids = self.graph.session_index().session_ids();
484            let max_indexed = ids.iter().copied().max().unwrap_or(0);
485            // Ensure monotonic: new session must be > current session.
486            max_indexed.max(self.current_session).saturating_add(1)
487        });
488
489        self.current_session = session_id;
490        self.last_temporal_node_id = None;
491        self.last_activity = Instant::now();
492        tracing::info!("Started session {session_id}");
493        Ok(session_id)
494    }
495
496    /// End a session and optionally create an episode summary.
497    pub fn end_session_with_episode(&mut self, session_id: u32, summary: &str) -> McpResult<u64> {
498        let episode_id = self
499            .write_engine
500            .compress_session(&mut self.graph, session_id, summary)
501            .map_err(|e| McpError::AgenticMemory(format!("Failed to compress session: {e}")))?;
502
503        self.dirty = true;
504        self.last_activity = Instant::now();
505        self.record_mutation();
506        self.save()?;
507
508        tracing::info!("Ended session {session_id}, created episode node {episode_id}");
509
510        Ok(episode_id)
511    }
512
513    /// Save the graph to file with file-locking for concurrent session safety.
514    ///
515    /// When multiple MCP instances share the same `.amem` file, this method:
516    /// 1. Acquires an exclusive file lock (sidecar `.amem.lock`)
517    /// 2. Checks if the file was modified externally (by another instance)
518    /// 3. If so, re-reads the disk graph and merges our session's new nodes
519    /// 4. Writes the merged graph and releases the lock
520    pub fn save(&mut self) -> McpResult<()> {
521        if !self.dirty {
522            return Ok(());
523        }
524
525        let _lock = FileLock::acquire(&self.file_path)?;
526
527        // Detect external modifications from concurrent sessions.
528        if self.file_path.exists() {
529            let current_mtime = std::fs::metadata(&self.file_path)
530                .and_then(|m| m.modified())
531                .ok();
532            if let (Some(current), Some(last_known)) = (current_mtime, self.last_file_mtime) {
533                if current > last_known {
534                    tracing::info!("Detected external modification, merging with disk state");
535                    self.merge_with_disk()?;
536                }
537            }
538        }
539
540        let writer = AmemWriter::new(self.graph.dimension());
541        writer
542            .write_to_file(&self.graph, &self.file_path)
543            .map_err(|e| McpError::AgenticMemory(format!("Failed to write memory file: {e}")))?;
544
545        // Update our mtime tracking after successful write.
546        self.last_file_mtime = std::fs::metadata(&self.file_path)
547            .and_then(|m| m.modified())
548            .ok();
549
550        self.dirty = false;
551        self.last_save = Instant::now();
552        self.save_generation = self.save_generation.saturating_add(1);
553        tracing::debug!("Saved memory file: {}", self.file_path.display());
554        Ok(())
555    }
556
557    /// Merge our session's nodes/edges with the latest disk state.
558    ///
559    /// This handles the case where another MCP instance wrote to the same file
560    /// since we last read it. We re-read the disk, then re-add our session's
561    /// nodes on top of the latest state.
562    fn merge_with_disk(&mut self) -> McpResult<()> {
563        let disk_graph = AmemReader::read_from_file(&self.file_path)
564            .map_err(|e| McpError::AgenticMemory(format!("Failed to re-read for merge: {e}")))?;
565
566        // Collect our session's nodes (those we created in this process).
567        let our_nodes: Vec<_> = self
568            .graph
569            .nodes()
570            .iter()
571            .filter(|n| n.session_id == self.current_session)
572            .cloned()
573            .collect();
574
575        // Collect edges where source belongs to our session.
576        let our_node_ids: std::collections::HashSet<u64> = our_nodes.iter().map(|n| n.id).collect();
577        let our_edges: Vec<_> = self
578            .graph
579            .edges()
580            .iter()
581            .filter(|e| our_node_ids.contains(&e.source_id) || our_node_ids.contains(&e.target_id))
582            .cloned()
583            .collect();
584
585        // Replace our graph with the latest disk state.
586        self.graph = disk_graph;
587
588        // Re-add our session's nodes with fresh IDs from the merged graph.
589        let mut id_map: HashMap<u64, u64> = HashMap::new();
590        for node in &our_nodes {
591            let event = CognitiveEventBuilder::new(node.event_type, node.content.clone())
592                .session_id(self.current_session)
593                .confidence(node.confidence)
594                .build();
595            let result = self
596                .write_engine
597                .ingest(&mut self.graph, vec![event], vec![])
598                .map_err(|e| McpError::AgenticMemory(format!("Merge node re-add failed: {e}")))?;
599            if let Some(&new_id) = result.new_node_ids.first() {
600                id_map.insert(node.id, new_id);
601            }
602        }
603
604        // Re-add our session's edges with remapped IDs.
605        for edge in &our_edges {
606            let source = id_map
607                .get(&edge.source_id)
608                .copied()
609                .unwrap_or(edge.source_id);
610            let target = id_map
611                .get(&edge.target_id)
612                .copied()
613                .unwrap_or(edge.target_id);
614            let new_edge = Edge::new(source, target, edge.edge_type, edge.weight);
615            if let Err(e) = self.graph.add_edge(new_edge) {
616                tracing::warn!("Merge edge re-add skipped: {e}");
617            }
618        }
619
620        tracing::info!(
621            "Merged {} nodes and {} edges from session {} into disk state",
622            our_nodes.len(),
623            our_edges.len(),
624            self.current_session
625        );
626        Ok(())
627    }
628
629    /// Check if auto-save is needed and save if so.
630    pub fn maybe_auto_save(&mut self) -> McpResult<()> {
631        if self.dirty && self.last_save.elapsed() >= self.auto_save_interval {
632            self.save()?;
633        }
634        Ok(())
635    }
636
637    /// Runs autonomous maintenance: sleep-cycle, auto-save, and periodic backup.
638    pub fn run_maintenance_tick(&mut self) -> McpResult<()> {
639        if self.should_throttle_maintenance() {
640            self.maintenance_throttle_count = self.maintenance_throttle_count.saturating_add(1);
641            self.maybe_auto_save()?;
642            self.emit_health_ledger("throttled")?;
643            tracing::debug!(
644                "Maintenance throttled by SLA guard: mutation_rate={} threshold={}",
645                self.mutation_rate_per_min(),
646                self.sla_max_mutations_per_min
647            );
648            return Ok(());
649        }
650
651        self.maybe_run_sleep_cycle()?;
652        self.maybe_auto_save()?;
653        self.maybe_enforce_storage_budget()?;
654        self.maybe_auto_backup()?;
655        self.emit_health_ledger("normal")?;
656        Ok(())
657    }
658
659    /// Run a periodic sleep-cycle: decay refresh + tier balancing + auto-archive.
660    pub fn maybe_run_sleep_cycle(&mut self) -> McpResult<()> {
661        if self.last_sleep_cycle.elapsed() < self.sleep_cycle_interval {
662            return Ok(());
663        }
664        if self.last_activity.elapsed() < self.sleep_idle_min {
665            return Ok(());
666        }
667
668        let now = agentic_memory::now_micros();
669        let decay_report = self
670            .write_engine
671            .run_decay(&mut self.graph, now)
672            .map_err(|e| McpError::AgenticMemory(format!("Sleep-cycle decay failed: {e}")))?;
673        let archived_sessions = self.auto_archive_completed_sessions()?;
674
675        if decay_report.nodes_decayed > 0 || archived_sessions > 0 {
676            self.dirty = true;
677            self.save()?;
678        }
679
680        let (hot, warm, cold) = self.tier_counts();
681        self.last_sleep_cycle = Instant::now();
682        tracing::info!(
683            "Sleep-cycle complete: decayed={} archived_sessions={} tiers(h/w/c)={}/{}/{}",
684            decay_report.nodes_decayed,
685            archived_sessions,
686            hot,
687            warm,
688            cold
689        );
690        Ok(())
691    }
692
693    /// Periodic backup of persisted state with retention pruning.
694    pub fn maybe_auto_backup(&mut self) -> McpResult<()> {
695        if self.last_backup.elapsed() < self.backup_interval {
696            return Ok(());
697        }
698        if self.save_generation <= self.last_backup_generation {
699            return Ok(());
700        }
701        if !self.file_path.exists() {
702            return Ok(());
703        }
704
705        std::fs::create_dir_all(&self.backups_dir).map_err(McpError::Io)?;
706        let backup_path = self.next_backup_path();
707        std::fs::copy(&self.file_path, &backup_path).map_err(McpError::Io)?;
708        self.last_backup_generation = self.save_generation;
709        self.last_backup = Instant::now();
710        self.prune_old_backups()?;
711        tracing::info!("Auto-backup written: {}", backup_path.display());
712        Ok(())
713    }
714
715    /// Mark the graph as dirty (needs saving).
716    pub fn mark_dirty(&mut self) {
717        self.dirty = true;
718        self.last_activity = Instant::now();
719        self.record_mutation();
720    }
721
722    /// Get the file path.
723    pub fn file_path(&self) -> &PathBuf {
724        &self.file_path
725    }
726
727    /// The ID of the most recent node in the temporal chain for this session.
728    pub fn last_temporal_node_id(&self) -> Option<u64> {
729        self.last_temporal_node_id
730    }
731
732    /// Advance the temporal chain pointer to the given node ID.
733    pub fn advance_temporal_chain(&mut self, node_id: u64) {
734        self.last_temporal_node_id = Some(node_id);
735    }
736
737    /// Create a TemporalNext edge from `prev_id` to `next_id` (forward in time).
738    pub fn link_temporal(&mut self, prev_id: u64, next_id: u64) -> McpResult<()> {
739        let edge = Edge::new(prev_id, next_id, EdgeType::TemporalNext, 1.0);
740        self.graph
741            .add_edge(edge)
742            .map_err(|e| McpError::AgenticMemory(format!("Failed to add temporal edge: {e}")))?;
743        self.dirty = true;
744        Ok(())
745    }
746
747    /// Background maintenance loop interval.
748    pub fn maintenance_interval(&self) -> Duration {
749        self.auto_save_interval
750            .min(self.backup_interval)
751            .min(self.sleep_cycle_interval)
752    }
753
754    /// Capture a prompt template invocation (`prompts/get`) into memory.
755    pub fn capture_prompt_request(
756        &mut self,
757        prompt_name: &str,
758        arguments: Option<&Value>,
759    ) -> McpResult<Option<u64>> {
760        if self.auto_capture_mode == AutoCaptureMode::Off {
761            return Ok(None);
762        }
763        match extract_prompt_capture_text(prompt_name, arguments)? {
764            Some(text) => self.persist_auto_capture(EventType::Fact, &text, 0.90),
765            None => Ok(None),
766        }
767    }
768
769    /// Capture a tool call input context into memory based on capture mode.
770    pub fn capture_tool_call(
771        &mut self,
772        tool_name: &str,
773        arguments: Option<&Value>,
774    ) -> McpResult<Option<u64>> {
775        if self.auto_capture_mode == AutoCaptureMode::Off {
776            return Ok(None);
777        }
778
779        let text = match self.auto_capture_mode {
780            AutoCaptureMode::Safe => extract_safe_tool_capture_text(tool_name, arguments)?,
781            AutoCaptureMode::Full => extract_full_tool_capture_text(tool_name, arguments)?,
782            AutoCaptureMode::Off => None,
783        };
784        match text {
785            Some(v) => self.persist_auto_capture(EventType::Inference, &v, 0.82),
786            None => Ok(None),
787        }
788    }
789
790    /// Add a cognitive event to the graph.
791    pub fn add_event(
792        &mut self,
793        event_type: EventType,
794        content: &str,
795        confidence: f32,
796        edges: Vec<(u64, EdgeType, f32)>,
797    ) -> McpResult<(u64, usize)> {
798        let event = CognitiveEventBuilder::new(event_type, content.to_string())
799            .session_id(self.current_session)
800            .confidence(confidence)
801            .build();
802
803        // First, add the node to get its assigned ID
804        let result = self
805            .write_engine
806            .ingest(&mut self.graph, vec![event], vec![])
807            .map_err(|e| McpError::AgenticMemory(format!("Failed to add event: {e}")))?;
808
809        let node_id = result.new_node_ids.first().copied().ok_or_else(|| {
810            McpError::InternalError("No node ID returned from ingest".to_string())
811        })?;
812
813        // Then add edges with the correct source_id
814        let mut edge_count = 0;
815        for (target_id, edge_type, weight) in &edges {
816            let edge = Edge::new(node_id, *target_id, *edge_type, *weight);
817            self.graph
818                .add_edge(edge)
819                .map_err(|e| McpError::AgenticMemory(format!("Failed to add edge: {e}")))?;
820            edge_count += 1;
821        }
822
823        self.dirty = true;
824        self.last_activity = Instant::now();
825        self.record_mutation();
826        self.maybe_auto_save()?;
827
828        Ok((node_id, edge_count))
829    }
830
831    /// Correct a previous belief.
832    pub fn correct_node(&mut self, old_node_id: u64, new_content: &str) -> McpResult<u64> {
833        let new_id = self
834            .write_engine
835            .correct(
836                &mut self.graph,
837                old_node_id,
838                new_content,
839                self.current_session,
840            )
841            .map_err(|e| McpError::AgenticMemory(format!("Failed to correct node: {e}")))?;
842
843        self.dirty = true;
844        self.last_activity = Instant::now();
845        self.record_mutation();
846        self.maybe_auto_save()?;
847
848        Ok(new_id)
849    }
850
851    fn record_mutation(&mut self) {
852        if self.mutation_window_started.elapsed() >= Duration::from_secs(60) {
853            self.mutation_window_started = Instant::now();
854            self.mutation_window_count = 0;
855        }
856        self.mutation_window_count = self.mutation_window_count.saturating_add(1);
857    }
858
859    fn mutation_rate_per_min(&self) -> u32 {
860        let elapsed = self.mutation_window_started.elapsed().as_secs().max(1);
861        let scaled = (self.mutation_window_count as u64)
862            .saturating_mul(60)
863            .saturating_div(elapsed);
864        scaled.min(u32::MAX as u64) as u32
865    }
866
867    fn should_throttle_maintenance(&self) -> bool {
868        self.mutation_rate_per_min() > self.sla_max_mutations_per_min
869    }
870
871    fn emit_health_ledger(&mut self, maintenance_mode: &str) -> McpResult<()> {
872        if self.last_health_ledger_emit.elapsed() < self.health_ledger_emit_interval {
873            return Ok(());
874        }
875
876        let dir = resolve_health_ledger_dir();
877        std::fs::create_dir_all(&dir).map_err(McpError::Io)?;
878        let path = dir.join("agentic-memory.json");
879        let tmp = dir.join("agentic-memory.json.tmp");
880        let (hot, warm, cold) = self.tier_counts();
881        let current_size_bytes = self.current_file_size_bytes();
882        let projected_size_bytes = self.projected_file_size_bytes(current_size_bytes);
883        let over_budget = current_size_bytes > self.storage_budget_max_bytes
884            || projected_size_bytes
885                .map(|v| v > self.storage_budget_max_bytes)
886                .unwrap_or(false);
887        let payload = serde_json::json!({
888            "project": "AgenticMemory",
889            "timestamp": chrono::Utc::now().to_rfc3339(),
890            "status": "ok",
891            "autonomic": {
892                "profile": self.profile.as_str(),
893                "migration_policy": self.migration_policy.as_str(),
894                "maintenance_mode": maintenance_mode,
895                "throttle_count": self.maintenance_throttle_count,
896            },
897            "sla": {
898                "mutation_rate_per_min": self.mutation_rate_per_min(),
899                "max_mutations_per_min": self.sla_max_mutations_per_min
900            },
901            "storage": {
902                "file": self.file_path.display().to_string(),
903                "dirty": self.dirty,
904                "save_generation": self.save_generation,
905                "backup_retention": self.backup_retention,
906            },
907            "storage_budget": {
908                "mode": self.storage_budget_mode.as_str(),
909                "max_bytes": self.storage_budget_max_bytes,
910                "horizon_years": self.storage_budget_horizon_years,
911                "target_fraction": self.storage_budget_target_fraction,
912                "current_size_bytes": current_size_bytes,
913                "projected_size_bytes": projected_size_bytes,
914                "over_budget": over_budget,
915                "rollup_count": self.storage_budget_rollup_count,
916            },
917            "auto_capture": {
918                "mode": self.auto_capture_mode.as_str(),
919                "redact": self.auto_capture_redact,
920                "max_chars": self.auto_capture_max_chars,
921                "captured_count": self.auto_capture_count
922            },
923            "graph": {
924                "nodes": self.graph.node_count(),
925                "edges": self.graph.edge_count(),
926                "tiers": {
927                    "hot": hot,
928                    "warm": warm,
929                    "cold": cold,
930                },
931            },
932        });
933        let bytes = serde_json::to_vec_pretty(&payload).map_err(|e| {
934            McpError::AgenticMemory(format!("Failed to encode health ledger payload: {e}"))
935        })?;
936        std::fs::write(&tmp, bytes).map_err(McpError::Io)?;
937        std::fs::rename(&tmp, &path).map_err(McpError::Io)?;
938        self.last_health_ledger_emit = Instant::now();
939        Ok(())
940    }
941}
942
943impl Drop for SessionManager {
944    fn drop(&mut self) {
945        if self.dirty {
946            if let Err(e) = self.save() {
947                tracing::error!("Failed to save on drop: {e}");
948            }
949        }
950        if let Err(e) = self.maybe_auto_backup() {
951            tracing::error!("Failed auto-backup on drop: {e}");
952        }
953    }
954}
955
956impl SessionManager {
957    fn auto_archive_completed_sessions(&mut self) -> McpResult<usize> {
958        self.auto_archive_completed_sessions_with_min(self.archive_min_session_nodes)
959    }
960
961    fn auto_archive_completed_sessions_with_min(
962        &mut self,
963        min_session_nodes: usize,
964    ) -> McpResult<usize> {
965        let mut session_ids = self.graph.session_index().session_ids();
966        session_ids.sort_unstable();
967
968        let mut archived = 0usize;
969        for session_id in session_ids {
970            if session_id >= self.current_session {
971                continue;
972            }
973
974            let node_ids = self.graph.session_index().get_session(session_id).to_vec();
975            if node_ids.is_empty() {
976                continue;
977            }
978
979            let mut has_episode = false;
980            let mut event_nodes = 0usize;
981            let mut hot = 0usize;
982            let mut warm = 0usize;
983            let mut cold = 0usize;
984
985            for node_id in &node_ids {
986                if let Some(node) = self.graph.get_node(*node_id) {
987                    if node.event_type == EventType::Episode {
988                        has_episode = true;
989                        continue;
990                    }
991                    event_nodes += 1;
992                    if node.decay_score >= self.hot_min_decay {
993                        hot += 1;
994                    } else if node.decay_score >= self.warm_min_decay {
995                        warm += 1;
996                    } else {
997                        cold += 1;
998                    }
999                }
1000            }
1001
1002            if has_episode || event_nodes < min_session_nodes {
1003                continue;
1004            }
1005
1006            let summary = format!(
1007                "Auto-archive session {}: {} events ({} hot / {} warm / {} cold)",
1008                session_id, event_nodes, hot, warm, cold
1009            );
1010            self.write_engine
1011                .compress_session(&mut self.graph, session_id, &summary)
1012                .map_err(|e| {
1013                    McpError::AgenticMemory(format!(
1014                        "Auto-archive failed for session {session_id}: {e}"
1015                    ))
1016                })?;
1017            archived = archived.saturating_add(1);
1018        }
1019
1020        Ok(archived)
1021    }
1022
1023    fn maybe_enforce_storage_budget(&mut self) -> McpResult<()> {
1024        if self.storage_budget_mode == StorageBudgetMode::Off {
1025            return Ok(());
1026        }
1027
1028        let current_size = self.current_file_size_bytes();
1029        if current_size == 0 {
1030            return Ok(());
1031        }
1032        let projected = self.projected_file_size_bytes(current_size);
1033        let over_current = current_size > self.storage_budget_max_bytes;
1034        let over_projected = projected
1035            .map(|v| v > self.storage_budget_max_bytes)
1036            .unwrap_or(false);
1037
1038        if !over_current && !over_projected {
1039            return Ok(());
1040        }
1041
1042        if self.storage_budget_mode == StorageBudgetMode::Warn {
1043            tracing::warn!(
1044                "Storage budget warning: current={} projected={:?} budget={} (mode=warn)",
1045                current_size,
1046                projected,
1047                self.storage_budget_max_bytes
1048            );
1049            return Ok(());
1050        }
1051
1052        let target_bytes = ((self.storage_budget_max_bytes as f64
1053            * self.storage_budget_target_fraction as f64)
1054            .round() as u64)
1055            .max(1);
1056        let mut rollup_count = 0usize;
1057        let mut threshold = self.archive_min_session_nodes.saturating_div(2).max(1);
1058
1059        for _ in 0..3 {
1060            let archived = self.auto_archive_completed_sessions_with_min(threshold)?;
1061            if archived == 0 {
1062                if threshold > 1 {
1063                    threshold = 1;
1064                    continue;
1065                }
1066                break;
1067            }
1068            rollup_count += archived;
1069            self.dirty = true;
1070            self.save()?;
1071            let new_size = self.current_file_size_bytes();
1072            if new_size <= target_bytes {
1073                break;
1074            }
1075            threshold = 1;
1076        }
1077
1078        if rollup_count > 0 {
1079            self.storage_budget_rollup_count = self
1080                .storage_budget_rollup_count
1081                .saturating_add(rollup_count as u64);
1082            tracing::info!(
1083                "Storage budget rollup applied: archived_sessions={} budget={} target={} current={}",
1084                rollup_count,
1085                self.storage_budget_max_bytes,
1086                target_bytes,
1087                self.current_file_size_bytes()
1088            );
1089        } else {
1090            tracing::warn!(
1091                "Storage budget exceeded but no completed sessions eligible for rollup (current={} projected={:?} budget={})",
1092                current_size,
1093                projected,
1094                self.storage_budget_max_bytes
1095            );
1096        }
1097
1098        Ok(())
1099    }
1100
1101    fn current_file_size_bytes(&self) -> u64 {
1102        std::fs::metadata(&self.file_path)
1103            .map(|m| m.len())
1104            .unwrap_or(0)
1105    }
1106
1107    fn persist_auto_capture(
1108        &mut self,
1109        event_type: EventType,
1110        raw_text: &str,
1111        confidence: f32,
1112    ) -> McpResult<Option<u64>> {
1113        let mut text = raw_text.trim().to_string();
1114        if text.is_empty() {
1115            return Ok(None);
1116        }
1117
1118        if self.auto_capture_redact {
1119            text = redact_sensitive_tokens(&text);
1120        }
1121
1122        if text.len() > self.auto_capture_max_chars {
1123            text.truncate(self.auto_capture_max_chars);
1124            text.push_str(" …[truncated]");
1125        }
1126
1127        let prev_id = self.last_temporal_node_id;
1128        let (node_id, _) = self.add_event(event_type, &text, confidence, vec![])?;
1129
1130        // Chain this capture to the previous node in the session's temporal thread.
1131        if let Some(prev) = prev_id {
1132            if let Err(e) = self.link_temporal(prev, node_id) {
1133                tracing::warn!("Failed to link temporal chain: {e}");
1134            }
1135        }
1136        self.last_temporal_node_id = Some(node_id);
1137
1138        self.auto_capture_count = self.auto_capture_count.saturating_add(1);
1139        Ok(Some(node_id))
1140    }
1141
1142    fn projected_file_size_bytes(&self, current_size: u64) -> Option<u64> {
1143        if current_size == 0 || self.graph.node_count() < 2 {
1144            return None;
1145        }
1146        let mut min_ts = u64::MAX;
1147        let mut max_ts = 0u64;
1148        for node in self.graph.nodes() {
1149            min_ts = min_ts.min(node.created_at);
1150            max_ts = max_ts.max(node.created_at);
1151        }
1152        if min_ts == u64::MAX || max_ts <= min_ts {
1153            return None;
1154        }
1155
1156        let span_secs_raw = (max_ts - min_ts) / 1_000_000;
1157        // Clamp to at least one week to avoid unstable extrapolation on tiny windows.
1158        let span_secs = span_secs_raw.max(7 * 24 * 3600) as f64;
1159        let per_sec = current_size as f64 / span_secs;
1160        let horizon_secs = (self.storage_budget_horizon_years as f64) * 365.25 * 24.0 * 3600.0;
1161        let projected = (per_sec * horizon_secs).round();
1162        Some(projected.max(0.0).min(u64::MAX as f64) as u64)
1163    }
1164
1165    fn tier_counts(&self) -> (usize, usize, usize) {
1166        let mut hot = 0usize;
1167        let mut warm = 0usize;
1168        let mut cold = 0usize;
1169
1170        for node in self.graph.nodes() {
1171            if node.event_type == EventType::Episode {
1172                continue;
1173            }
1174            if node.decay_score >= self.hot_min_decay {
1175                hot += 1;
1176            } else if node.decay_score >= self.warm_min_decay {
1177                warm += 1;
1178            } else {
1179                cold += 1;
1180            }
1181        }
1182
1183        (hot, warm, cold)
1184    }
1185
1186    fn create_migration_checkpoint(&self, from_version: u32) -> McpResult<Option<PathBuf>> {
1187        if !self.file_path.exists() {
1188            return Ok(None);
1189        }
1190
1191        let migration_dir = resolve_migration_dir(&self.file_path);
1192        std::fs::create_dir_all(&migration_dir).map_err(McpError::Io)?;
1193
1194        let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1195        let stem = self
1196            .file_path
1197            .file_stem()
1198            .and_then(OsStr::to_str)
1199            .unwrap_or("brain");
1200        let checkpoint = migration_dir.join(format!("{stem}.v{from_version}.{ts}.amem.checkpoint"));
1201        std::fs::copy(&self.file_path, &checkpoint).map_err(McpError::Io)?;
1202        Ok(Some(checkpoint))
1203    }
1204
1205    fn next_backup_path(&self) -> PathBuf {
1206        let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1207        let stem = self
1208            .file_path
1209            .file_stem()
1210            .and_then(OsStr::to_str)
1211            .unwrap_or("brain");
1212        self.backups_dir.join(format!("{stem}.{ts}.amem.bak"))
1213    }
1214
1215    fn prune_old_backups(&self) -> McpResult<()> {
1216        let mut entries = std::fs::read_dir(&self.backups_dir)
1217            .map_err(McpError::Io)?
1218            .filter_map(Result::ok)
1219            .filter(|entry| {
1220                entry
1221                    .file_name()
1222                    .to_str()
1223                    .map(|name| name.ends_with(".amem.bak"))
1224                    .unwrap_or(false)
1225            })
1226            .collect::<Vec<_>>();
1227
1228        if entries.len() <= self.backup_retention {
1229            return Ok(());
1230        }
1231
1232        entries.sort_by_key(|entry| {
1233            entry
1234                .metadata()
1235                .and_then(|m| m.modified())
1236                .ok()
1237                .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
1238        });
1239        let to_remove = entries.len().saturating_sub(self.backup_retention);
1240        for entry in entries.into_iter().take(to_remove) {
1241            let _ = std::fs::remove_file(entry.path());
1242        }
1243        Ok(())
1244    }
1245}
1246
1247fn resolve_backups_dir(memory_path: &std::path::Path) -> PathBuf {
1248    if let Ok(custom) = std::env::var("AMEM_AUTO_BACKUP_DIR") {
1249        let trimmed = custom.trim();
1250        if !trimmed.is_empty() {
1251            return PathBuf::from(trimmed);
1252        }
1253    }
1254
1255    let parent = memory_path.parent().unwrap_or(std::path::Path::new("."));
1256    parent.join(".amem-backups")
1257}
1258
1259fn resolve_migration_dir(memory_path: &Path) -> PathBuf {
1260    let parent = memory_path.parent().unwrap_or(std::path::Path::new("."));
1261    parent.join(".amem-migrations")
1262}
1263
1264fn read_storage_version(path: &Path) -> Option<u32> {
1265    let mut file = std::fs::File::open(path).ok()?;
1266    let mut header = [0u8; 8];
1267    file.read_exact(&mut header).ok()?;
1268    if &header[0..4] != b"AMEM" {
1269        return None;
1270    }
1271    Some(u32::from_le_bytes([
1272        header[4], header[5], header[6], header[7],
1273    ]))
1274}
1275
1276fn read_env_u64(name: &str, default_value: u64) -> u64 {
1277    std::env::var(name)
1278        .ok()
1279        .and_then(|v| v.parse::<u64>().ok())
1280        .unwrap_or(default_value)
1281}
1282
1283fn read_env_u32(name: &str, default_value: u32) -> u32 {
1284    std::env::var(name)
1285        .ok()
1286        .and_then(|v| v.parse::<u32>().ok())
1287        .unwrap_or(default_value)
1288}
1289
1290fn read_env_usize(name: &str, default_value: usize) -> usize {
1291    std::env::var(name)
1292        .ok()
1293        .and_then(|v| v.parse::<usize>().ok())
1294        .unwrap_or(default_value)
1295}
1296
1297fn read_env_f32(name: &str, default_value: f32) -> f32 {
1298    std::env::var(name)
1299        .ok()
1300        .and_then(|v| v.parse::<f32>().ok())
1301        .unwrap_or(default_value)
1302}
1303
1304fn read_env_bool(name: &str, default_value: bool) -> bool {
1305    std::env::var(name)
1306        .ok()
1307        .map(|v| {
1308            matches!(
1309                v.trim().to_ascii_lowercase().as_str(),
1310                "1" | "true" | "yes" | "on"
1311            )
1312        })
1313        .unwrap_or(default_value)
1314}
1315
1316fn read_env_string(name: &str) -> Option<String> {
1317    std::env::var(name).ok().map(|v| v.trim().to_string())
1318}
1319
1320fn resolve_health_ledger_dir() -> PathBuf {
1321    if let Some(custom) = read_env_string("AMEM_HEALTH_LEDGER_DIR") {
1322        if !custom.is_empty() {
1323            return PathBuf::from(custom);
1324        }
1325    }
1326    if let Some(custom) = read_env_string("AGENTRA_HEALTH_LEDGER_DIR") {
1327        if !custom.is_empty() {
1328            return PathBuf::from(custom);
1329        }
1330    }
1331
1332    let home = std::env::var("HOME")
1333        .ok()
1334        .map(PathBuf::from)
1335        .unwrap_or_else(|| PathBuf::from("."));
1336    home.join(".agentra").join("health-ledger")
1337}
1338
1339fn extract_prompt_capture_text(
1340    prompt_name: &str,
1341    arguments: Option<&Value>,
1342) -> McpResult<Option<String>> {
1343    let args = arguments.unwrap_or(&Value::Null);
1344    let fields = collect_text_fields_by_keys(
1345        args,
1346        &[
1347            "information",
1348            "context",
1349            "topic",
1350            "old_belief",
1351            "new_information",
1352            "reason",
1353            "summary",
1354            "instruction",
1355            "prompt",
1356            "query",
1357        ],
1358        8,
1359    );
1360    if fields.is_empty() {
1361        return Ok(None);
1362    }
1363    let joined = fields.join(" | ");
1364    Ok(Some(format!(
1365        "[auto-capture][prompt] template={prompt_name} input={joined}"
1366    )))
1367}
1368
1369fn extract_safe_tool_capture_text(
1370    tool_name: &str,
1371    arguments: Option<&Value>,
1372) -> McpResult<Option<String>> {
1373    let args = arguments.unwrap_or(&Value::Null);
1374    let keys = ["feedback", "summary", "note"];
1375    if tool_name != "session_end" {
1376        // Keep safe mode low-noise and non-invasive: only capture explicit feedback fields.
1377        let explicit_feedback = collect_text_fields_by_keys(args, &["feedback", "note"], 4);
1378        if explicit_feedback.is_empty() {
1379            return Ok(None);
1380        }
1381    }
1382    let fields = collect_text_fields_by_keys(args, &keys, 6);
1383    if fields.is_empty() {
1384        return Ok(None);
1385    }
1386    Ok(Some(format!(
1387        "[auto-capture][feedback] tool={tool_name} context={}",
1388        fields.join(" | ")
1389    )))
1390}
1391
1392fn extract_full_tool_capture_text(
1393    tool_name: &str,
1394    arguments: Option<&Value>,
1395) -> McpResult<Option<String>> {
1396    if tool_name == "memory_add" {
1397        return Ok(None);
1398    }
1399    let args = arguments.unwrap_or(&Value::Null);
1400    let preferred = collect_text_fields_by_keys(
1401        args,
1402        &[
1403            "query",
1404            "content",
1405            "prompt",
1406            "new_content",
1407            "reason",
1408            "summary",
1409            "topic",
1410            "instruction",
1411            "information",
1412            "context",
1413            "feedback",
1414        ],
1415        10,
1416    );
1417
1418    let fields = if preferred.is_empty() {
1419        collect_all_string_like_fields(args, 8)
1420    } else {
1421        preferred
1422    };
1423
1424    if fields.is_empty() {
1425        return Ok(None);
1426    }
1427    Ok(Some(format!(
1428        "[auto-capture][tool] tool={tool_name} input={}",
1429        fields.join(" | ")
1430    )))
1431}
1432
1433fn collect_text_fields_by_keys(value: &Value, keys: &[&str], limit: usize) -> Vec<String> {
1434    let mut out = Vec::new();
1435    let mut seen = std::collections::BTreeSet::<String>::new();
1436
1437    fn walk(
1438        value: &Value,
1439        path: String,
1440        keys: &[&str],
1441        out: &mut Vec<String>,
1442        seen: &mut std::collections::BTreeSet<String>,
1443        limit: usize,
1444    ) {
1445        if out.len() >= limit {
1446            return;
1447        }
1448        match value {
1449            Value::Object(map) => {
1450                for (k, v) in map {
1451                    if out.len() >= limit {
1452                        break;
1453                    }
1454                    let next = if path.is_empty() {
1455                        k.to_string()
1456                    } else {
1457                        format!("{path}.{k}")
1458                    };
1459                    let key_match = keys
1460                        .iter()
1461                        .any(|needle| k.eq_ignore_ascii_case(needle) || next.ends_with(needle));
1462                    if key_match {
1463                        if let Some(s) = value_to_compact_string(v) {
1464                            let entry = format!("{next}={s}");
1465                            if seen.insert(entry.clone()) {
1466                                out.push(entry);
1467                            }
1468                        }
1469                    }
1470                    walk(v, next, keys, out, seen, limit);
1471                }
1472            }
1473            Value::Array(items) => {
1474                for (idx, item) in items.iter().enumerate() {
1475                    if out.len() >= limit {
1476                        break;
1477                    }
1478                    let next = format!("{path}[{idx}]");
1479                    walk(item, next, keys, out, seen, limit);
1480                }
1481            }
1482            _ => {}
1483        }
1484    }
1485
1486    walk(value, String::new(), keys, &mut out, &mut seen, limit);
1487    out
1488}
1489
1490fn collect_all_string_like_fields(value: &Value, limit: usize) -> Vec<String> {
1491    let mut out = Vec::new();
1492    fn walk(value: &Value, path: String, out: &mut Vec<String>, limit: usize) {
1493        if out.len() >= limit {
1494            return;
1495        }
1496        match value {
1497            Value::Object(map) => {
1498                for (k, v) in map {
1499                    if out.len() >= limit {
1500                        break;
1501                    }
1502                    let next = if path.is_empty() {
1503                        k.to_string()
1504                    } else {
1505                        format!("{path}.{k}")
1506                    };
1507                    walk(v, next, out, limit);
1508                }
1509            }
1510            Value::Array(items) => {
1511                for (idx, item) in items.iter().enumerate() {
1512                    if out.len() >= limit {
1513                        break;
1514                    }
1515                    walk(item, format!("{path}[{idx}]"), out, limit);
1516                }
1517            }
1518            _ => {
1519                if let Some(s) = value_to_compact_string(value) {
1520                    out.push(format!("{path}={s}"));
1521                }
1522            }
1523        }
1524    }
1525    walk(value, String::new(), &mut out, limit);
1526    out
1527}
1528
1529fn value_to_compact_string(value: &Value) -> Option<String> {
1530    match value {
1531        Value::String(s) => {
1532            let trimmed = s.trim();
1533            if trimmed.is_empty() {
1534                None
1535            } else {
1536                Some(trimmed.to_string())
1537            }
1538        }
1539        Value::Number(n) => Some(n.to_string()),
1540        Value::Bool(b) => Some(b.to_string()),
1541        Value::Null => None,
1542        Value::Array(arr) => {
1543            if arr.is_empty() {
1544                None
1545            } else {
1546                Some(format!("<array:{}>", arr.len()))
1547            }
1548        }
1549        Value::Object(map) => {
1550            if map.is_empty() {
1551                None
1552            } else {
1553                Some(format!("<object:{}>", map.len()))
1554            }
1555        }
1556    }
1557}
1558
1559fn redact_sensitive_tokens(text: &str) -> String {
1560    text.split_whitespace()
1561        .map(redact_token)
1562        .collect::<Vec<_>>()
1563        .join(" ")
1564}
1565
1566fn redact_token(token: &str) -> String {
1567    let trimmed = token.trim_matches(|c: char| c == '"' || c == '\'' || c == ',' || c == ';');
1568    let lower = trimmed.to_ascii_lowercase();
1569    if trimmed.starts_with("/Users/")
1570        || trimmed.starts_with("C:\\Users\\")
1571        || trimmed.contains("/Users/")
1572        || trimmed.contains("C:\\Users\\")
1573    {
1574        return "[REDACTED_PATH]".to_string();
1575    }
1576    if trimmed.contains('@') && trimmed.contains('.') {
1577        return "[REDACTED_EMAIL]".to_string();
1578    }
1579    if lower.starts_with("sk-")
1580        || lower.contains("api_key")
1581        || lower.contains("access_token")
1582        || lower.contains("bearer")
1583        || lower.contains("authorization")
1584    {
1585        return "[REDACTED_SECRET]".to_string();
1586    }
1587    if looks_like_long_secret(trimmed) {
1588        return "[REDACTED_SECRET]".to_string();
1589    }
1590    token.to_string()
1591}
1592
1593fn looks_like_long_secret(token: &str) -> bool {
1594    if token.len() < 24 {
1595        return false;
1596    }
1597    token
1598        .chars()
1599        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
1600}
1601
1602/// File-based exclusive lock for concurrent `.amem` access.
1603///
1604/// Uses a sidecar `.amem.lock` file with `create_new` (O_EXCL) for atomic
1605/// creation. Stale locks older than 60 seconds are auto-cleaned. The lock
1606/// is released on drop.
1607struct FileLock {
1608    lock_path: PathBuf,
1609}
1610
1611impl FileLock {
1612    /// Acquire an exclusive lock for the given data file.
1613    /// Spins with a 50ms backoff until the lock is available.
1614    fn acquire(data_path: &Path) -> McpResult<Self> {
1615        let lock_path = data_path.with_extension("amem.lock");
1616        let stale_threshold = Duration::from_secs(60);
1617        let max_attempts = 200; // 200 * 50ms = 10 seconds max wait
1618
1619        for attempt in 0..max_attempts {
1620            match OpenOptions::new()
1621                .write(true)
1622                .create_new(true)
1623                .open(&lock_path)
1624            {
1625                Ok(_file) => {
1626                    if attempt > 0 {
1627                        tracing::debug!(
1628                            "Acquired file lock after {} attempts: {}",
1629                            attempt + 1,
1630                            lock_path.display()
1631                        );
1632                    }
1633                    return Ok(FileLock { lock_path });
1634                }
1635                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
1636                    // Check if the lock is stale (owner crashed).
1637                    if let Ok(meta) = std::fs::metadata(&lock_path) {
1638                        let is_stale = meta
1639                            .modified()
1640                            .ok()
1641                            .and_then(|m| m.elapsed().ok())
1642                            .map(|age| age > stale_threshold)
1643                            .unwrap_or(false);
1644                        if is_stale {
1645                            tracing::warn!("Removing stale lock file: {}", lock_path.display());
1646                            let _ = std::fs::remove_file(&lock_path);
1647                            continue;
1648                        }
1649                    }
1650                    std::thread::sleep(Duration::from_millis(50));
1651                }
1652                Err(e) => {
1653                    return Err(McpError::Io(e));
1654                }
1655            }
1656        }
1657
1658        Err(McpError::AgenticMemory(format!(
1659            "Timed out waiting for file lock: {}",
1660            lock_path.display()
1661        )))
1662    }
1663}
1664
1665impl Drop for FileLock {
1666    fn drop(&mut self) {
1667        let _ = std::fs::remove_file(&self.lock_path);
1668    }
1669}
1670
1671#[cfg(test)]
1672mod tests {
1673    use super::*;
1674    use serde_json::json;
1675
1676    #[test]
1677    fn budget_projection_available_with_timeline() {
1678        let dir = tempfile::tempdir().expect("tempdir");
1679        let brain = dir.path().join("projection.amem");
1680        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1681
1682        let (id_a, _) = manager
1683            .add_event(EventType::Fact, "old fact", 0.9, vec![])
1684            .expect("add fact");
1685        let (_id_b, _) = manager
1686            .add_event(EventType::Fact, "new fact", 0.9, vec![])
1687            .expect("add fact");
1688
1689        {
1690            let graph = manager.graph_mut();
1691            let old = graph.get_node_mut(id_a).expect("node");
1692            old.created_at = old.created_at.saturating_sub(15 * 24 * 3600 * 1_000_000);
1693        }
1694        manager.save().expect("save");
1695        let size = manager.current_file_size_bytes();
1696        let projected = manager.projected_file_size_bytes(size);
1697        assert!(size > 0);
1698        assert!(projected.is_some());
1699    }
1700
1701    #[test]
1702    fn budget_auto_rollup_archives_completed_session() {
1703        let dir = tempfile::tempdir().expect("tempdir");
1704        let brain = dir.path().join("rollup.amem");
1705        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1706
1707        // Build current session with enough content, then advance so it becomes completed.
1708        let _ = manager
1709            .add_event(EventType::Fact, "alpha", 0.8, vec![])
1710            .expect("add");
1711        let _ = manager
1712            .add_event(EventType::Decision, "beta", 0.9, vec![])
1713            .expect("add");
1714        manager.start_session(None).expect("session");
1715        manager.save().expect("save");
1716
1717        // Force tiny budget to trigger rollup.
1718        manager.storage_budget_mode = StorageBudgetMode::AutoRollup;
1719        manager.storage_budget_max_bytes = 1;
1720        manager.storage_budget_target_fraction = 0.5;
1721
1722        manager
1723            .maybe_enforce_storage_budget()
1724            .expect("enforce budget");
1725
1726        let episode_count = manager
1727            .graph()
1728            .nodes()
1729            .iter()
1730            .filter(|n| n.event_type == EventType::Episode)
1731            .count();
1732        assert!(episode_count >= 1);
1733        assert!(manager.storage_budget_rollup_count >= 1);
1734    }
1735
1736    #[test]
1737    fn auto_capture_off_noop() {
1738        let dir = tempfile::tempdir().expect("tempdir");
1739        let brain = dir.path().join("capture-off.amem");
1740        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1741        manager.auto_capture_mode = AutoCaptureMode::Off;
1742
1743        let captured = manager
1744            .capture_prompt_request(
1745                "remember",
1746                Some(&json!({"information":"hello world","context":"ctx"})),
1747            )
1748            .expect("capture");
1749        assert!(captured.is_none());
1750        assert_eq!(manager.graph().node_count(), 0);
1751    }
1752
1753    #[test]
1754    fn auto_capture_full_records_and_redacts() {
1755        let dir = tempfile::tempdir().expect("tempdir");
1756        let brain = dir.path().join("capture-full.amem");
1757        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1758        manager.auto_capture_mode = AutoCaptureMode::Full;
1759        manager.auto_capture_redact = true;
1760
1761        manager
1762            .capture_tool_call(
1763                "memory_query",
1764                Some(&json!({
1765                    "query":"Find anything about token sk-THISISALONGSECRET123456",
1766                    "context":"/Users/omoshola/Documents/private.txt",
1767                    "reason":"email me at test@example.com"
1768                })),
1769            )
1770            .expect("capture");
1771
1772        assert!(manager.graph().node_count() >= 1);
1773        let latest = manager
1774            .graph()
1775            .nodes()
1776            .iter()
1777            .max_by_key(|n| n.id)
1778            .expect("node");
1779        assert!(latest.content.contains("[auto-capture][tool]"));
1780        assert!(latest.content.contains("[REDACTED_SECRET]"));
1781        assert!(latest.content.contains("[REDACTED_PATH]"));
1782        assert!(latest.content.contains("[REDACTED_EMAIL]"));
1783    }
1784
1785    #[test]
1786    fn auto_capture_temporal_chain() {
1787        let dir = tempfile::tempdir().expect("tempdir");
1788        let brain = dir.path().join("chain.amem");
1789        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1790        manager.auto_capture_mode = AutoCaptureMode::Full;
1791
1792        // First capture: no predecessor.
1793        let id1 = manager
1794            .capture_tool_call("memory_query", Some(&json!({"query": "first question"})))
1795            .expect("capture")
1796            .expect("node_id");
1797
1798        assert_eq!(manager.last_temporal_node_id(), Some(id1));
1799
1800        // Second capture: should link to first.
1801        let id2 = manager
1802            .capture_tool_call(
1803                "memory_similar",
1804                Some(&json!({"query_text": "second question"})),
1805            )
1806            .expect("capture")
1807            .expect("node_id");
1808
1809        assert_eq!(manager.last_temporal_node_id(), Some(id2));
1810
1811        // Verify the TemporalNext edge exists: id1 -> id2.
1812        let has_temporal = manager.graph().edges().iter().any(|e| {
1813            e.source_id == id1 && e.target_id == id2 && e.edge_type == EdgeType::TemporalNext
1814        });
1815        assert!(has_temporal, "Expected TemporalNext edge from id1 to id2");
1816    }
1817
1818    #[test]
1819    fn temporal_chain_resets_on_new_session() {
1820        let dir = tempfile::tempdir().expect("tempdir");
1821        let brain = dir.path().join("reset.amem");
1822        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1823        manager.auto_capture_mode = AutoCaptureMode::Full;
1824
1825        let _id1 = manager
1826            .capture_tool_call("memory_query", Some(&json!({"query": "first"})))
1827            .expect("capture");
1828
1829        assert!(manager.last_temporal_node_id().is_some());
1830
1831        // Starting a new session should reset the chain.
1832        manager.start_session(None).expect("new session");
1833        assert!(manager.last_temporal_node_id().is_none());
1834    }
1835
1836    #[test]
1837    fn memory_add_joins_temporal_chain() {
1838        let dir = tempfile::tempdir().expect("tempdir");
1839        let brain = dir.path().join("splice.amem");
1840        let mut manager = SessionManager::open(brain.to_str().expect("path")).expect("open");
1841        manager.auto_capture_mode = AutoCaptureMode::Full;
1842
1843        // Create a chain head via auto-capture.
1844        let id1 = manager
1845            .capture_tool_call("memory_query", Some(&json!({"query": "something"})))
1846            .expect("capture")
1847            .expect("node_id");
1848
1849        // Simulate what memory_add tool does: add_event + link_temporal + advance.
1850        let (id2, _) = manager
1851            .add_event(EventType::Fact, "User prefers dark mode", 0.9, vec![])
1852            .expect("add_event");
1853        manager.link_temporal(id1, id2).expect("link");
1854        manager.advance_temporal_chain(id2);
1855
1856        assert_eq!(manager.last_temporal_node_id(), Some(id2));
1857
1858        let has_edge = manager.graph().edges().iter().any(|e| {
1859            e.source_id == id1 && e.target_id == id2 && e.edge_type == EdgeType::TemporalNext
1860        });
1861        assert!(has_edge, "memory_add node should be linked into chain");
1862    }
1863}