Skip to main content

zeph_memory/graph/
experience.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Experience memory store — records tool execution outcomes and evolution sweeps.
5//!
6//! [`ExperienceStore`] persists agent tool-call outcomes as `experience_nodes` and links
7//! them into a temporal chain (`experience_edges`). The [`EvolutionSweepStats`] type
8//! describes pruning results from [`ExperienceStore::evolution_sweep`].
9
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use zeph_db::{DbPool, sql};
13
14use crate::error::MemoryError;
15use crate::graph::store::GraphStore;
16use crate::types::{EntityId, ExperienceId};
17
/// Statistics from a single graph evolution sweep.
///
/// Both counters default to zero. The type is a plain pair of counters, so it is
/// cheap to copy and can be compared directly (for example in tests or when
/// checking whether a sweep removed anything at all).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct EvolutionSweepStats {
    /// Number of self-loop edges removed from the knowledge graph.
    pub pruned_self_loops: usize,
    /// Number of low-confidence zero-retrieval edges removed.
    pub pruned_low_confidence: usize,
}
26
27/// Persistent store for experience memory nodes and edges.
28///
29/// Wraps the `experience_nodes`, `experience_edges`, and `experience_entity_links`
30/// tables created by migration `*_experience_memory.sql`
31/// (`SQLite`: `076_experience_memory.sql`, `PostgreSQL`: `083_experience_memory.sql`).
32pub struct ExperienceStore {
33    pool: DbPool,
34}
35
36impl ExperienceStore {
37    /// Create a new experience store using the provided connection pool.
38    #[must_use]
39    pub fn new(pool: DbPool) -> Self {
40        Self { pool }
41    }
42
43    /// Record a tool execution outcome as an experience node.
44    ///
45    /// Returns the strongly typed row ID of the newly inserted experience node.
46    ///
47    /// # Errors
48    ///
49    /// Returns [`MemoryError`] if the database insert fails.
50    ///
51    /// # Examples
52    ///
53    /// ```no_run
54    /// # use zeph_memory::graph::experience::ExperienceStore;
55    /// # async fn demo(store: &ExperienceStore) -> Result<(), Box<dyn std::error::Error>> {
56    /// let id = store
57    ///     .record_tool_outcome("session-1", 3, "shell", "success", Some("exit 0"), None)
58    ///     .await?;
59    /// # Ok(())
60    /// # }
61    /// ```
62    #[tracing::instrument(
63        skip_all,
64        name = "memory.experience.record",
65        fields(tool_name, outcome)
66    )]
67    pub async fn record_tool_outcome(
68        &self,
69        session_id: &str,
70        turn: i64,
71        tool_name: &str,
72        outcome: &str,
73        detail: Option<&str>,
74        error_ctx: Option<&str>,
75    ) -> Result<ExperienceId, MemoryError> {
76        let now = SystemTime::now()
77            .duration_since(UNIX_EPOCH)
78            .map_or(0, |d| d.as_secs().cast_signed());
79        let id: i64 = zeph_db::query_scalar(sql!(
80            "INSERT INTO experience_nodes
81             (session_id, turn, tool_name, outcome, detail, error_ctx, created_at)
82             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
83             RETURNING id"
84        ))
85        .bind(session_id)
86        .bind(turn)
87        .bind(tool_name)
88        .bind(outcome)
89        .bind(detail)
90        .bind(error_ctx)
91        .bind(now)
92        .fetch_one(&self.pool)
93        .await
94        .map_err(MemoryError::from)?;
95        Ok(ExperienceId(id))
96    }
97
98    /// Link an experience node to one or more knowledge graph entities.
99    ///
100    /// Uses `INSERT OR IGNORE` to tolerate duplicate links gracefully.
101    ///
102    /// # Errors
103    ///
104    /// Returns [`MemoryError`] if any database insert fails.
105    #[tracing::instrument(skip_all, name = "memory.experience.link_entities", fields(exp = experience_id.0))]
106    pub async fn link_to_entities(
107        &self,
108        experience_id: ExperienceId,
109        entity_ids: &[EntityId],
110    ) -> Result<(), MemoryError> {
111        for &entity_id in entity_ids {
112            zeph_db::query(sql!(
113                "INSERT INTO experience_entity_links
114                 (experience_id, entity_id) VALUES (?1, ?2)
115                 ON CONFLICT (experience_id, entity_id) DO NOTHING"
116            ))
117            .bind(experience_id.0)
118            .bind(entity_id.0)
119            .execute(&self.pool)
120            .await
121            .map_err(MemoryError::from)?;
122        }
123        Ok(())
124    }
125
126    /// Record a sequential `followed_by` link between two experience nodes.
127    ///
128    /// # Errors
129    ///
130    /// Returns [`MemoryError`] if the database insert fails.
131    pub async fn link_sequential(
132        &self,
133        prev: ExperienceId,
134        next: ExperienceId,
135    ) -> Result<(), MemoryError> {
136        zeph_db::query(sql!(
137            "INSERT INTO experience_edges
138             (source_exp_id, target_exp_id, relation)
139             VALUES (?1, ?2, 'followed_by')"
140        ))
141        .bind(prev.0)
142        .bind(next.0)
143        .execute(&self.pool)
144        .await
145        .map_err(MemoryError::from)?;
146        Ok(())
147    }
148
149    /// Run a graph evolution sweep on the knowledge graph.
150    ///
151    /// Performs two pruning passes on the `graph_edges` table via `graph_store`:
152    /// 1. Removes self-loops (`source_entity_id = target_entity_id`).
153    /// 2. Removes low-confidence edges with zero retrievals that have no expiry set.
154    ///
155    /// This is a maintenance operation; it never blocks the agent loop.
156    ///
157    /// # Errors
158    ///
159    /// Returns [`MemoryError`] if any database operation fails.
160    #[tracing::instrument(skip_all, name = "memory.experience.sweep")]
161    pub async fn evolution_sweep(
162        &self,
163        graph_store: &GraphStore,
164        confidence_threshold: f32,
165    ) -> Result<EvolutionSweepStats, MemoryError> {
166        let self_loops = zeph_db::query(sql!(
167            "DELETE FROM graph_edges WHERE source_entity_id = target_entity_id"
168        ))
169        .execute(graph_store.pool())
170        .await
171        .map_err(MemoryError::from)?
172        .rows_affected();
173
174        let low_conf = zeph_db::query(sql!(
175            "DELETE FROM graph_edges
176             WHERE confidence < ?1 AND retrieval_count = 0 AND valid_to IS NULL"
177        ))
178        .bind(confidence_threshold)
179        .execute(graph_store.pool())
180        .await
181        .map_err(MemoryError::from)?
182        .rows_affected();
183
184        Ok(EvolutionSweepStats {
185            pruned_self_loops: usize::try_from(self_loops).unwrap_or(usize::MAX),
186            pruned_low_confidence: usize::try_from(low_conf).unwrap_or(usize::MAX),
187        })
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::store::GraphStore;
    use crate::graph::types::EntityType;
    use crate::store::SqliteStore;
    use zeph_db::sql;

    /// Build an in-memory `SQLite` store and hand back an experience store, a graph
    /// store, and the raw pool, all sharing the same connection pool.
    async fn setup() -> (ExperienceStore, GraphStore, DbPool) {
        let store = SqliteStore::new(":memory:").await.unwrap();
        let pool = store.pool().clone();
        let exp = ExperienceStore::new(pool.clone());
        let gs = GraphStore::new(pool.clone());
        (exp, gs, pool)
    }

    /// Count the rows in `experience_entity_links` that belong to `experience_id`.
    async fn count_entity_links(pool: &DbPool, experience_id: i64) -> i64 {
        sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM experience_entity_links WHERE experience_id = ?1"
        ))
        .bind(experience_id)
        .fetch_one(pool)
        .await
        .unwrap()
    }

    #[tokio::test]
    async fn record_tool_outcome_inserts_experience_node() {
        let (exp, _gs, pool) = setup().await;
        let id = exp
            .record_tool_outcome("sess1", 1, "shell", "success", Some("exit 0"), None)
            .await
            .unwrap();
        assert!(id.0 > 0);

        // Read the row back and check every column the call was given.
        let (sid, turn, tool, outcome): (String, i64, String, String) = sqlx::query_as(sql!(
            "SELECT session_id, turn, tool_name, outcome
                 FROM experience_nodes WHERE id = ?1"
        ))
        .bind(id.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(sid, "sess1");
        assert_eq!(turn, 1);
        assert_eq!(tool, "shell");
        assert_eq!(outcome, "success");
    }

    #[tokio::test]
    async fn link_to_entities_populates_link_table() {
        let (exp, gs, pool) = setup().await;
        let exp_id = exp
            .record_tool_outcome("sess2", 1, "shell", "success", None, None)
            .await
            .unwrap();
        let e1 = gs
            .upsert_entity("Alice", "alice", EntityType::Person, None)
            .await
            .unwrap();
        let e2 = gs
            .upsert_entity("Bob", "bob", EntityType::Person, None)
            .await
            .unwrap();

        exp.link_to_entities(exp_id, &[e1, e2]).await.unwrap();

        let count = count_entity_links(&pool, exp_id.0).await;
        assert_eq!(count, 2, "both entity links must be inserted");

        // Idempotency: second call must not duplicate rows.
        exp.link_to_entities(exp_id, &[e1]).await.unwrap();
        let count2 = count_entity_links(&pool, exp_id.0).await;
        assert_eq!(
            count2, 2,
            "ON CONFLICT DO NOTHING must prevent duplicate links"
        );
    }

    #[tokio::test]
    async fn link_sequential_creates_experience_edge() {
        let (exp, _gs, pool) = setup().await;
        let id1 = exp
            .record_tool_outcome("sess1", 1, "shell", "success", None, None)
            .await
            .unwrap();
        let id2 = exp
            .record_tool_outcome("sess1", 2, "web_scrape", "success", None, None)
            .await
            .unwrap();

        exp.link_sequential(id1, id2).await.unwrap();

        let (src, tgt, rel): (i64, i64, String) = sqlx::query_as(sql!(
            "SELECT source_exp_id, target_exp_id, relation
             FROM experience_edges WHERE source_exp_id = ?1"
        ))
        .bind(id1.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(src, id1.0);
        assert_eq!(tgt, id2.0);
        assert_eq!(rel, "followed_by");
    }

    #[tokio::test]
    async fn evolution_sweep_prunes_self_loops() {
        let (exp, gs, pool) = setup().await;

        let e1 = gs
            .upsert_entity("Alice", "alice", EntityType::Person, Some("person"))
            .await
            .unwrap();
        let e2 = gs
            .upsert_entity("Bob", "bob", EntityType::Person, Some("person"))
            .await
            .unwrap();

        // Drop the self-loop trigger so we can insert a test self-loop.
        sqlx::query(sql!("DROP TRIGGER IF EXISTS graph_edges_no_self_loops"))
            .execute(&pool)
            .await
            .unwrap();

        sqlx::query(sql!(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, edge_type)
             VALUES (?1, ?1, 'knows', 'self', 0.5, 'semantic')"
        ))
        .bind(e1.0)
        .execute(&pool)
        .await
        .unwrap();

        // Restore the trigger with the original SQL from migration 044.
        sqlx::query(sql!(
            "CREATE TRIGGER IF NOT EXISTS graph_edges_no_self_loops
             BEFORE INSERT ON graph_edges
             BEGIN
                 SELECT RAISE(ABORT, 'self-loop edge rejected: source and target entity must differ')
                 WHERE NEW.source_entity_id = NEW.target_entity_id;
             END"
        ))
        .execute(&pool)
        .await
        .unwrap();

        // Insert a normal edge that must survive the sweep.
        gs.insert_edge(e1.0, e2.0, "knows", "Alice knows Bob", 0.9, None)
            .await
            .unwrap();

        let stats = exp.evolution_sweep(&gs, 0.3).await.unwrap();
        assert_eq!(stats.pruned_self_loops, 1);
        assert_eq!(stats.pruned_low_confidence, 0);

        // The self-loop row itself must be gone, not merely counted.
        let remaining_loops: i64 = sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges WHERE source_entity_id = target_entity_id"
        ))
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(remaining_loops, 0, "sweep must delete every self-loop edge");

        // The ordinary high-confidence edge must still be present.
        let count: i64 = sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges
             WHERE source_entity_id = ?1 AND target_entity_id = ?2"
        ))
        .bind(e1.0)
        .bind(e2.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(count, 1);
    }
}