Skip to main content

hirn_engine/tools/
agent.rs

1//! [`MemoryAgent`] — autonomous consolidation and cleanup loop.
2//!
3//! Runs on a configurable interval, performing: consolidation → dead link
4//! cleanup → compaction trigger. Uses [`MemoryToolkit`] internally (same
5//! Cedar policies apply).
6
7use std::sync::Arc;
8
9use hirn_core::error::HirnResult;
10use hirn_core::types::AgentId;
11
12use super::MemoryToolkit;
13use crate::db::HirnDB;
14
15/// Autonomous memory maintenance agent.
16///
17/// Periodically consolidates, detects contradictions, cleans dead links, and
18/// triggers compaction. Shuts down gracefully on cancellation signal.
19pub struct MemoryAgent {
20    toolkit: MemoryToolkit,
21    agent_id: AgentId,
22    interval: std::time::Duration,
23    cancel: tokio::sync::watch::Receiver<bool>,
24}
25
26/// Metrics emitted after each agent loop iteration.
27#[derive(Debug, Clone, Default)]
28pub struct AgentLoopMetrics {
29    pub duration_ms: u64,
30    pub memories_consolidated: usize,
31    pub causal_edges_discovered: usize,
32    pub contradictions_found: usize,
33}
34
35impl MemoryAgent {
36    /// Create a new agent with the given database and configuration.
37    ///
38    /// # Arguments
39    /// - `db` — shared database handle
40    /// - `agent_id` — Cedar principal for policy enforcement
41    /// - `interval` — time between loop iterations
42    /// - `cancel` — watch receiver; when `true` is sent, agent shuts down
43    pub fn new(
44        db: Arc<HirnDB>,
45        agent_id: AgentId,
46        interval: std::time::Duration,
47        cancel: tokio::sync::watch::Receiver<bool>,
48    ) -> Self {
49        Self {
50            toolkit: MemoryToolkit::new(db),
51            agent_id,
52            interval,
53            cancel,
54        }
55    }
56
57    /// Run the autonomous loop until cancelled.
58    ///
59    /// Each iteration: consolidation → metrics emission.
60    /// Returns `Ok(())` on graceful shutdown.
61    pub async fn run(&mut self) -> HirnResult<()> {
62        tracing::info!(
63            agent_id = %self.agent_id.as_str(),
64            interval_secs = self.interval.as_secs(),
65            "MemoryAgent started"
66        );
67
68        loop {
69            tokio::select! {
70                result = self.cancel.changed() => {
71                    if result.is_err() || *self.cancel.borrow() {
72                        tracing::info!("MemoryAgent shutting down");
73                        return Ok(());
74                    }
75                }
76                _ = tokio::time::sleep(self.interval) => {
77                    let metrics = self.run_cycle().await;
78                    tracing::info!(
79                        duration_ms = metrics.duration_ms,
80                        consolidated = metrics.memories_consolidated,
81                        causal = metrics.causal_edges_discovered,
82                        contradictions = metrics.contradictions_found,
83                        "MemoryAgent cycle complete"
84                    );
85                }
86            }
87        }
88    }
89
90    /// Execute a single maintenance cycle.
91    ///
92    /// Enforces `Action::Consolidate` Cedar policy before proceeding.
93    /// If authorization fails, the entire cycle is skipped (logged, not fatal).
94    async fn run_cycle(&self) -> AgentLoopMetrics {
95        let start = std::time::Instant::now();
96        let mut metrics = AgentLoopMetrics::default();
97
98        // Cedar enforcement: system agent must have consolidate permission.
99        let db = self.toolkit.db();
100        let realm = &db.config().default_realm;
101        if let Err(e) = db
102            .enforce(
103                self.agent_id.as_str(),
104                crate::policy::Action::Consolidate,
105                realm,
106                "",
107            )
108            .await
109        {
110            tracing::warn!(
111                agent_id = %self.agent_id.as_str(),
112                error = %e,
113                "MemoryAgent cycle denied by Cedar policy"
114            );
115            metrics.duration_ms = start.elapsed().as_millis() as u64;
116            return metrics;
117        }
118
119        // Phase 1: Consolidation.
120        match db.consolidate().execute().await {
121            Ok(result) => {
122                metrics.memories_consolidated = result.concepts_extracted;
123                metrics.causal_edges_discovered = result.causal_edges_discovered;
124            }
125            Err(e) => {
126                tracing::warn!(error = %e, "MemoryAgent consolidation failed");
127            }
128        }
129
130        // Phase 2: Decay expired memories.
131        if let Err(e) = db.decay_memories().await {
132            tracing::warn!(error = %e, "MemoryAgent decay failed");
133        }
134
135        // Phase 3: Purge expired working memories.
136        if let Err(e) = db.purge_expired().await {
137            tracing::warn!(error = %e, "MemoryAgent purge failed");
138        }
139
140        metrics.duration_ms = start.elapsed().as_millis() as u64;
141        metrics
142    }
143
144    /// Run exactly one cycle (for testing).
145    pub async fn run_once(&self) -> AgentLoopMetrics {
146        self.run_cycle().await
147    }
148}