1use std::time::{SystemTime, UNIX_EPOCH};
11
12use zeph_db::{DbPool, sql};
13
14use crate::error::MemoryError;
15use crate::graph::store::GraphStore;
16use crate::types::{EntityId, ExperienceId};
17
/// Row counts reported by [`ExperienceStore::evolution_sweep`].
///
/// `Default` yields all-zero counters. The type is cheap to clone and can be
/// compared directly, which keeps assertions on sweep results short.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct EvolutionSweepStats {
    /// Number of `graph_edges` rows deleted because source and target entity were equal.
    pub pruned_self_loops: usize,
    /// Number of `graph_edges` rows deleted by the low-confidence rule.
    pub pruned_low_confidence: usize,
}
26
27pub struct ExperienceStore {
33 pool: DbPool,
34}
35
36impl ExperienceStore {
37 #[must_use]
39 pub fn new(pool: DbPool) -> Self {
40 Self { pool }
41 }
42
43 #[tracing::instrument(
63 skip_all,
64 name = "memory.experience.record",
65 fields(tool_name, outcome)
66 )]
67 pub async fn record_tool_outcome(
68 &self,
69 session_id: &str,
70 turn: i64,
71 tool_name: &str,
72 outcome: &str,
73 detail: Option<&str>,
74 error_ctx: Option<&str>,
75 ) -> Result<ExperienceId, MemoryError> {
76 let now = SystemTime::now()
77 .duration_since(UNIX_EPOCH)
78 .map_or(0, |d| d.as_secs().cast_signed());
79 let id: i64 = zeph_db::query_scalar(sql!(
80 "INSERT INTO experience_nodes
81 (session_id, turn, tool_name, outcome, detail, error_ctx, created_at)
82 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
83 RETURNING id"
84 ))
85 .bind(session_id)
86 .bind(turn)
87 .bind(tool_name)
88 .bind(outcome)
89 .bind(detail)
90 .bind(error_ctx)
91 .bind(now)
92 .fetch_one(&self.pool)
93 .await
94 .map_err(MemoryError::from)?;
95 Ok(ExperienceId(id))
96 }
97
98 pub async fn link_to_entities(
106 &self,
107 experience_id: ExperienceId,
108 entity_ids: &[EntityId],
109 ) -> Result<(), MemoryError> {
110 let _span =
111 tracing::info_span!("memory.experience.link_entities", exp = experience_id.0).entered();
112 for &entity_id in entity_ids {
113 zeph_db::query(sql!(
114 "INSERT INTO experience_entity_links
115 (experience_id, entity_id) VALUES (?1, ?2)
116 ON CONFLICT (experience_id, entity_id) DO NOTHING"
117 ))
118 .bind(experience_id.0)
119 .bind(entity_id.0)
120 .execute(&self.pool)
121 .await
122 .map_err(MemoryError::from)?;
123 }
124 Ok(())
125 }
126
127 pub async fn link_sequential(
133 &self,
134 prev: ExperienceId,
135 next: ExperienceId,
136 ) -> Result<(), MemoryError> {
137 zeph_db::query(sql!(
138 "INSERT INTO experience_edges
139 (source_exp_id, target_exp_id, relation)
140 VALUES (?1, ?2, 'followed_by')"
141 ))
142 .bind(prev.0)
143 .bind(next.0)
144 .execute(&self.pool)
145 .await
146 .map_err(MemoryError::from)?;
147 Ok(())
148 }
149
150 #[tracing::instrument(skip_all, name = "memory.experience.sweep")]
162 pub async fn evolution_sweep(
163 &self,
164 graph_store: &GraphStore,
165 confidence_threshold: f32,
166 ) -> Result<EvolutionSweepStats, MemoryError> {
167 let self_loops = zeph_db::query(sql!(
168 "DELETE FROM graph_edges WHERE source_entity_id = target_entity_id"
169 ))
170 .execute(graph_store.pool())
171 .await
172 .map_err(MemoryError::from)?
173 .rows_affected();
174
175 let low_conf = zeph_db::query(sql!(
176 "DELETE FROM graph_edges
177 WHERE confidence < ?1 AND retrieval_count = 0 AND valid_to IS NULL"
178 ))
179 .bind(confidence_threshold)
180 .execute(graph_store.pool())
181 .await
182 .map_err(MemoryError::from)?
183 .rows_affected();
184
185 Ok(EvolutionSweepStats {
186 pruned_self_loops: usize::try_from(self_loops).unwrap_or(usize::MAX),
187 pruned_low_confidence: usize::try_from(low_conf).unwrap_or(usize::MAX),
188 })
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::store::GraphStore;
    use crate::graph::types::EntityType;
    use crate::store::SqliteStore;
    use zeph_db::sql;

    /// Opens an in-memory SQLite store and returns both stores plus the pool they share.
    async fn setup() -> (ExperienceStore, GraphStore, DbPool) {
        let sqlite = SqliteStore::new(":memory:").await.unwrap();
        let pool = sqlite.pool().clone();
        let experiences = ExperienceStore::new(pool.clone());
        let graph = GraphStore::new(pool.clone());
        (experiences, graph, pool)
    }

    /// Counts the `experience_entity_links` rows that belong to `experience_id`.
    async fn link_count(pool: &DbPool, experience_id: i64) -> i64 {
        sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM experience_entity_links WHERE experience_id = ?1"
        ))
        .bind(experience_id)
        .fetch_one(pool)
        .await
        .unwrap()
    }

    #[tokio::test]
    async fn record_tool_outcome_inserts_experience_node() {
        let (experiences, _graph, pool) = setup().await;

        let recorded = experiences
            .record_tool_outcome("sess1", 1, "shell", "success", Some("exit 0"), None)
            .await
            .unwrap();
        assert!(recorded.0 > 0);

        // Read the row back and compare the stored columns with what was passed in.
        let row: (String, i64, String, String) = sqlx::query_as(sql!(
            "SELECT session_id, turn, tool_name, outcome
             FROM experience_nodes WHERE id = ?1"
        ))
        .bind(recorded.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(row.0, "sess1");
        assert_eq!(row.1, 1);
        assert_eq!(row.2, "shell");
        assert_eq!(row.3, "success");
    }

    #[tokio::test]
    async fn link_to_entities_populates_link_table() {
        let (experiences, graph, pool) = setup().await;

        let experience = experiences
            .record_tool_outcome("sess2", 1, "shell", "success", None, None)
            .await
            .unwrap();
        let alice = graph
            .upsert_entity("Alice", "alice", EntityType::Person, None)
            .await
            .unwrap();
        let bob = graph
            .upsert_entity("Bob", "bob", EntityType::Person, None)
            .await
            .unwrap();

        experiences
            .link_to_entities(experience, &[alice, bob])
            .await
            .unwrap();
        assert_eq!(
            link_count(&pool, experience.0).await,
            2,
            "both entity links must be inserted"
        );

        // Linking an already-linked entity again must leave the row count unchanged.
        experiences
            .link_to_entities(experience, &[alice])
            .await
            .unwrap();
        assert_eq!(
            link_count(&pool, experience.0).await,
            2,
            "INSERT OR IGNORE must prevent duplicate links"
        );
    }

    #[tokio::test]
    async fn link_sequential_creates_experience_edge() {
        let (experiences, _graph, pool) = setup().await;

        let first = experiences
            .record_tool_outcome("sess1", 1, "shell", "success", None, None)
            .await
            .unwrap();
        let second = experiences
            .record_tool_outcome("sess1", 2, "web_scrape", "success", None, None)
            .await
            .unwrap();

        experiences.link_sequential(first, second).await.unwrap();

        let edge: (i64, i64, String) = sqlx::query_as(sql!(
            "SELECT source_exp_id, target_exp_id, relation
             FROM experience_edges WHERE source_exp_id = ?1"
        ))
        .bind(first.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(edge.0, first.0);
        assert_eq!(edge.1, second.0);
        assert_eq!(edge.2, "followed_by");
    }

    #[tokio::test]
    async fn evolution_sweep_prunes_self_loops() {
        let (experiences, graph, pool) = setup().await;

        let alice = graph
            .upsert_entity("Alice", "alice", EntityType::Person, Some("person"))
            .await
            .unwrap();
        let bob = graph
            .upsert_entity("Bob", "bob", EntityType::Person, Some("person"))
            .await
            .unwrap();

        // The insert trigger rejects self-loops, so drop it while the bad row is seeded.
        sqlx::query(sql!("DROP TRIGGER IF EXISTS graph_edges_no_self_loops"))
            .execute(&pool)
            .await
            .unwrap();

        sqlx::query(sql!(
            "INSERT INTO graph_edges
                (source_entity_id, target_entity_id, relation, fact, confidence, edge_type)
             VALUES (?1, ?1, 'knows', 'self', 0.5, 'semantic')"
        ))
        .bind(alice.0)
        .execute(&pool)
        .await
        .unwrap();

        // Put the trigger back before any further edges are inserted.
        sqlx::query(sql!(
            "CREATE TRIGGER IF NOT EXISTS graph_edges_no_self_loops
             BEFORE INSERT ON graph_edges
             BEGIN
                 SELECT RAISE(ABORT, 'self-loop edge rejected: source and target entity must differ')
                 WHERE NEW.source_entity_id = NEW.target_entity_id;
             END"
        ))
        .execute(&pool)
        .await
        .unwrap();

        // A regular, high-confidence edge that the sweep must leave in place.
        graph
            .insert_edge(alice.0, bob.0, "knows", "Alice knows Bob", 0.9, None)
            .await
            .unwrap();

        let stats = experiences.evolution_sweep(&graph, 0.3).await.unwrap();
        assert_eq!(stats.pruned_self_loops, 1);
        assert_eq!(stats.pruned_low_confidence, 0);

        let surviving: i64 = sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges
             WHERE source_entity_id = ?1 AND target_entity_id = ?2"
        ))
        .bind(alice.0)
        .bind(bob.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(surviving, 1);
    }
}