Skip to main content

zeph_memory/graph/
experience.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Experience memory store — records tool execution outcomes and evolution sweeps.
5//!
6//! [`ExperienceStore`] persists agent tool-call outcomes as `experience_nodes` and links
7//! them into a temporal chain (`experience_edges`). The [`EvolutionSweepStats`] type
8//! describes pruning results from [`ExperienceStore::evolution_sweep`].
9
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use zeph_db::{DbPool, sql};
13
14use crate::error::MemoryError;
15use crate::graph::store::GraphStore;
16use crate::types::{EntityId, ExperienceId};
17
18/// Statistics from a single graph evolution sweep.
19#[derive(Debug, Default)]
20pub struct EvolutionSweepStats {
21    /// Number of self-loop edges removed from the knowledge graph.
22    pub pruned_self_loops: usize,
23    /// Number of low-confidence zero-retrieval edges removed.
24    pub pruned_low_confidence: usize,
25}
26
27/// Persistent store for experience memory nodes and edges.
28///
29/// Wraps the `experience_nodes`, `experience_edges`, and `experience_entity_links`
30/// tables created by migration `*_experience_memory.sql`
31/// (`SQLite`: `076_experience_memory.sql`, `PostgreSQL`: `083_experience_memory.sql`).
32pub struct ExperienceStore {
33    pool: DbPool,
34}
35
36impl ExperienceStore {
37    /// Create a new experience store using the provided connection pool.
38    #[must_use]
39    pub fn new(pool: DbPool) -> Self {
40        Self { pool }
41    }
42
43    /// Record a tool execution outcome as an experience node.
44    ///
45    /// Returns the strongly typed row ID of the newly inserted experience node.
46    ///
47    /// # Errors
48    ///
49    /// Returns [`MemoryError`] if the database insert fails.
50    ///
51    /// # Examples
52    ///
53    /// ```no_run
54    /// # use zeph_memory::graph::experience::ExperienceStore;
55    /// # async fn demo(store: &ExperienceStore) -> Result<(), Box<dyn std::error::Error>> {
56    /// let id = store
57    ///     .record_tool_outcome("session-1", 3, "shell", "success", Some("exit 0"), None)
58    ///     .await?;
59    /// # Ok(())
60    /// # }
61    /// ```
62    #[tracing::instrument(
63        skip_all,
64        name = "memory.experience.record",
65        fields(tool_name, outcome)
66    )]
67    pub async fn record_tool_outcome(
68        &self,
69        session_id: &str,
70        turn: i64,
71        tool_name: &str,
72        outcome: &str,
73        detail: Option<&str>,
74        error_ctx: Option<&str>,
75    ) -> Result<ExperienceId, MemoryError> {
76        let now = SystemTime::now()
77            .duration_since(UNIX_EPOCH)
78            .map_or(0, |d| d.as_secs().cast_signed());
79        let id: i64 = zeph_db::query_scalar(sql!(
80            "INSERT INTO experience_nodes
81             (session_id, turn, tool_name, outcome, detail, error_ctx, created_at)
82             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
83             RETURNING id"
84        ))
85        .bind(session_id)
86        .bind(turn)
87        .bind(tool_name)
88        .bind(outcome)
89        .bind(detail)
90        .bind(error_ctx)
91        .bind(now)
92        .fetch_one(&self.pool)
93        .await
94        .map_err(MemoryError::from)?;
95        Ok(ExperienceId(id))
96    }
97
98    /// Link an experience node to one or more knowledge graph entities.
99    ///
100    /// Uses `INSERT OR IGNORE` to tolerate duplicate links gracefully.
101    ///
102    /// # Errors
103    ///
104    /// Returns [`MemoryError`] if any database insert fails.
105    pub async fn link_to_entities(
106        &self,
107        experience_id: ExperienceId,
108        entity_ids: &[EntityId],
109    ) -> Result<(), MemoryError> {
110        let _span =
111            tracing::info_span!("memory.experience.link_entities", exp = experience_id.0).entered();
112        for &entity_id in entity_ids {
113            zeph_db::query(sql!(
114                "INSERT INTO experience_entity_links
115                 (experience_id, entity_id) VALUES (?1, ?2)
116                 ON CONFLICT (experience_id, entity_id) DO NOTHING"
117            ))
118            .bind(experience_id.0)
119            .bind(entity_id.0)
120            .execute(&self.pool)
121            .await
122            .map_err(MemoryError::from)?;
123        }
124        Ok(())
125    }
126
127    /// Record a sequential `followed_by` link between two experience nodes.
128    ///
129    /// # Errors
130    ///
131    /// Returns [`MemoryError`] if the database insert fails.
132    pub async fn link_sequential(
133        &self,
134        prev: ExperienceId,
135        next: ExperienceId,
136    ) -> Result<(), MemoryError> {
137        zeph_db::query(sql!(
138            "INSERT INTO experience_edges
139             (source_exp_id, target_exp_id, relation)
140             VALUES (?1, ?2, 'followed_by')"
141        ))
142        .bind(prev.0)
143        .bind(next.0)
144        .execute(&self.pool)
145        .await
146        .map_err(MemoryError::from)?;
147        Ok(())
148    }
149
150    /// Run a graph evolution sweep on the knowledge graph.
151    ///
152    /// Performs two pruning passes on the `graph_edges` table via `graph_store`:
153    /// 1. Removes self-loops (`source_entity_id = target_entity_id`).
154    /// 2. Removes low-confidence edges with zero retrievals that have no expiry set.
155    ///
156    /// This is a maintenance operation; it never blocks the agent loop.
157    ///
158    /// # Errors
159    ///
160    /// Returns [`MemoryError`] if any database operation fails.
161    #[tracing::instrument(skip_all, name = "memory.experience.sweep")]
162    pub async fn evolution_sweep(
163        &self,
164        graph_store: &GraphStore,
165        confidence_threshold: f32,
166    ) -> Result<EvolutionSweepStats, MemoryError> {
167        let self_loops = zeph_db::query(sql!(
168            "DELETE FROM graph_edges WHERE source_entity_id = target_entity_id"
169        ))
170        .execute(graph_store.pool())
171        .await
172        .map_err(MemoryError::from)?
173        .rows_affected();
174
175        let low_conf = zeph_db::query(sql!(
176            "DELETE FROM graph_edges
177             WHERE confidence < ?1 AND retrieval_count = 0 AND valid_to IS NULL"
178        ))
179        .bind(confidence_threshold)
180        .execute(graph_store.pool())
181        .await
182        .map_err(MemoryError::from)?
183        .rows_affected();
184
185        Ok(EvolutionSweepStats {
186            pruned_self_loops: usize::try_from(self_loops).unwrap_or(usize::MAX),
187            pruned_low_confidence: usize::try_from(low_conf).unwrap_or(usize::MAX),
188        })
189    }
190}
191
192#[cfg(test)]
193mod tests {
194    use super::*;
195    use crate::graph::store::GraphStore;
196    use crate::graph::types::EntityType;
197    use crate::store::SqliteStore;
198    use zeph_db::sql;
199
200    async fn setup() -> (ExperienceStore, GraphStore, DbPool) {
201        let store = SqliteStore::new(":memory:").await.unwrap();
202        let pool = store.pool().clone();
203        let exp = ExperienceStore::new(pool.clone());
204        let gs = GraphStore::new(pool.clone());
205        (exp, gs, pool)
206    }
207
208    #[tokio::test]
209    async fn record_tool_outcome_inserts_experience_node() {
210        let (exp, _gs, pool) = setup().await;
211        let id = exp
212            .record_tool_outcome("sess1", 1, "shell", "success", Some("exit 0"), None)
213            .await
214            .unwrap();
215        assert!(id.0 > 0);
216
217        let (sid, turn, tool, outcome): (String, i64, String, String) = sqlx::query_as(sql!(
218            "SELECT session_id, turn, tool_name, outcome
219                 FROM experience_nodes WHERE id = ?1"
220        ))
221        .bind(id.0)
222        .fetch_one(&pool)
223        .await
224        .unwrap();
225        assert_eq!(sid, "sess1");
226        assert_eq!(turn, 1);
227        assert_eq!(tool, "shell");
228        assert_eq!(outcome, "success");
229    }
230
231    #[tokio::test]
232    async fn link_to_entities_populates_link_table() {
233        let (exp, gs, pool) = setup().await;
234        let exp_id = exp
235            .record_tool_outcome("sess2", 1, "shell", "success", None, None)
236            .await
237            .unwrap();
238        let e1 = gs
239            .upsert_entity("Alice", "alice", EntityType::Person, None)
240            .await
241            .unwrap();
242        let e2 = gs
243            .upsert_entity("Bob", "bob", EntityType::Person, None)
244            .await
245            .unwrap();
246
247        exp.link_to_entities(exp_id, &[e1, e2]).await.unwrap();
248
249        let count: i64 = sqlx::query_scalar(sql!(
250            "SELECT COUNT(*) FROM experience_entity_links WHERE experience_id = ?1"
251        ))
252        .bind(exp_id.0)
253        .fetch_one(&pool)
254        .await
255        .unwrap();
256        assert_eq!(count, 2, "both entity links must be inserted");
257
258        // Idempotency: second call must not duplicate rows.
259        exp.link_to_entities(exp_id, &[e1]).await.unwrap();
260        let count2: i64 = sqlx::query_scalar(sql!(
261            "SELECT COUNT(*) FROM experience_entity_links WHERE experience_id = ?1"
262        ))
263        .bind(exp_id.0)
264        .fetch_one(&pool)
265        .await
266        .unwrap();
267        assert_eq!(count2, 2, "INSERT OR IGNORE must prevent duplicate links");
268    }
269
270    #[tokio::test]
271    async fn link_sequential_creates_experience_edge() {
272        let (exp, _gs, pool) = setup().await;
273        let id1 = exp
274            .record_tool_outcome("sess1", 1, "shell", "success", None, None)
275            .await
276            .unwrap();
277        let id2 = exp
278            .record_tool_outcome("sess1", 2, "web_scrape", "success", None, None)
279            .await
280            .unwrap();
281
282        exp.link_sequential(id1, id2).await.unwrap();
283
284        let (src, tgt, rel): (i64, i64, String) = sqlx::query_as(sql!(
285            "SELECT source_exp_id, target_exp_id, relation
286             FROM experience_edges WHERE source_exp_id = ?1"
287        ))
288        .bind(id1.0)
289        .fetch_one(&pool)
290        .await
291        .unwrap();
292        assert_eq!(src, id1.0);
293        assert_eq!(tgt, id2.0);
294        assert_eq!(rel, "followed_by");
295    }
296
297    #[tokio::test]
298    async fn evolution_sweep_prunes_self_loops() {
299        let (exp, gs, pool) = setup().await;
300
301        let e1 = gs
302            .upsert_entity("Alice", "alice", EntityType::Person, Some("person"))
303            .await
304            .unwrap();
305        let e2 = gs
306            .upsert_entity("Bob", "bob", EntityType::Person, Some("person"))
307            .await
308            .unwrap();
309
310        // Drop the self-loop trigger so we can insert a test self-loop.
311        sqlx::query(sql!("DROP TRIGGER IF EXISTS graph_edges_no_self_loops"))
312            .execute(&pool)
313            .await
314            .unwrap();
315
316        sqlx::query(sql!(
317            "INSERT INTO graph_edges
318             (source_entity_id, target_entity_id, relation, fact, confidence, edge_type)
319             VALUES (?1, ?1, 'knows', 'self', 0.5, 'semantic')"
320        ))
321        .bind(e1.0)
322        .execute(&pool)
323        .await
324        .unwrap();
325
326        // Restore the trigger with the original SQL from migration 044.
327        sqlx::query(sql!(
328            "CREATE TRIGGER IF NOT EXISTS graph_edges_no_self_loops
329             BEFORE INSERT ON graph_edges
330             BEGIN
331                 SELECT RAISE(ABORT, 'self-loop edge rejected: source and target entity must differ')
332                 WHERE NEW.source_entity_id = NEW.target_entity_id;
333             END"
334        ))
335        .execute(&pool)
336        .await
337        .unwrap();
338
339        // Insert a normal edge that must survive the sweep.
340        gs.insert_edge(e1.0, e2.0, "knows", "Alice knows Bob", 0.9, None)
341            .await
342            .unwrap();
343
344        let stats = exp.evolution_sweep(&gs, 0.3).await.unwrap();
345        assert_eq!(stats.pruned_self_loops, 1);
346        assert_eq!(stats.pruned_low_confidence, 0);
347
348        let count: i64 = sqlx::query_scalar(sql!(
349            "SELECT COUNT(*) FROM graph_edges
350             WHERE source_entity_id = ?1 AND target_entity_id = ?2"
351        ))
352        .bind(e1.0)
353        .bind(e2.0)
354        .fetch_one(&pool)
355        .await
356        .unwrap();
357        assert_eq!(count, 1);
358    }
359}