1use std::time::{SystemTime, UNIX_EPOCH};
11
12use zeph_db::{DbPool, sql};
13
14use crate::error::MemoryError;
15use crate::graph::store::GraphStore;
16use crate::types::{EntityId, ExperienceId};
17
/// Counters reported by one run of [`ExperienceStore::evolution_sweep`].
///
/// Each field is the number of `graph_edges` rows deleted by the matching
/// pruning statement. The type is a plain pair of counters, so it is cheap to
/// copy and can be compared directly in assertions.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct EvolutionSweepStats {
    /// Rows deleted because `source_entity_id = target_entity_id`.
    pub pruned_self_loops: usize,
    /// Rows deleted because `confidence` was below the sweep threshold while
    /// `retrieval_count = 0` and `valid_to IS NULL`.
    pub pruned_low_confidence: usize,
}
26
27pub struct ExperienceStore {
33 pool: DbPool,
34}
35
36impl ExperienceStore {
37 #[must_use]
39 pub fn new(pool: DbPool) -> Self {
40 Self { pool }
41 }
42
43 #[tracing::instrument(
63 skip_all,
64 name = "memory.experience.record",
65 fields(tool_name, outcome)
66 )]
67 pub async fn record_tool_outcome(
68 &self,
69 session_id: &str,
70 turn: i64,
71 tool_name: &str,
72 outcome: &str,
73 detail: Option<&str>,
74 error_ctx: Option<&str>,
75 ) -> Result<ExperienceId, MemoryError> {
76 let now = SystemTime::now()
77 .duration_since(UNIX_EPOCH)
78 .map_or(0, |d| d.as_secs().cast_signed());
79 let id: i64 = zeph_db::query_scalar(sql!(
80 "INSERT INTO experience_nodes
81 (session_id, turn, tool_name, outcome, detail, error_ctx, created_at)
82 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
83 RETURNING id"
84 ))
85 .bind(session_id)
86 .bind(turn)
87 .bind(tool_name)
88 .bind(outcome)
89 .bind(detail)
90 .bind(error_ctx)
91 .bind(now)
92 .fetch_one(&self.pool)
93 .await
94 .map_err(MemoryError::from)?;
95 Ok(ExperienceId(id))
96 }
97
98 #[tracing::instrument(skip_all, name = "memory.experience.link_entities", fields(exp = experience_id.0))]
106 pub async fn link_to_entities(
107 &self,
108 experience_id: ExperienceId,
109 entity_ids: &[EntityId],
110 ) -> Result<(), MemoryError> {
111 for &entity_id in entity_ids {
112 zeph_db::query(sql!(
113 "INSERT INTO experience_entity_links
114 (experience_id, entity_id) VALUES (?1, ?2)
115 ON CONFLICT (experience_id, entity_id) DO NOTHING"
116 ))
117 .bind(experience_id.0)
118 .bind(entity_id.0)
119 .execute(&self.pool)
120 .await
121 .map_err(MemoryError::from)?;
122 }
123 Ok(())
124 }
125
126 pub async fn link_sequential(
132 &self,
133 prev: ExperienceId,
134 next: ExperienceId,
135 ) -> Result<(), MemoryError> {
136 zeph_db::query(sql!(
137 "INSERT INTO experience_edges
138 (source_exp_id, target_exp_id, relation)
139 VALUES (?1, ?2, 'followed_by')"
140 ))
141 .bind(prev.0)
142 .bind(next.0)
143 .execute(&self.pool)
144 .await
145 .map_err(MemoryError::from)?;
146 Ok(())
147 }
148
149 #[tracing::instrument(skip_all, name = "memory.experience.sweep")]
161 pub async fn evolution_sweep(
162 &self,
163 graph_store: &GraphStore,
164 confidence_threshold: f32,
165 ) -> Result<EvolutionSweepStats, MemoryError> {
166 let self_loops = zeph_db::query(sql!(
167 "DELETE FROM graph_edges WHERE source_entity_id = target_entity_id"
168 ))
169 .execute(graph_store.pool())
170 .await
171 .map_err(MemoryError::from)?
172 .rows_affected();
173
174 let low_conf = zeph_db::query(sql!(
175 "DELETE FROM graph_edges
176 WHERE confidence < ?1 AND retrieval_count = 0 AND valid_to IS NULL"
177 ))
178 .bind(confidence_threshold)
179 .execute(graph_store.pool())
180 .await
181 .map_err(MemoryError::from)?
182 .rows_affected();
183
184 Ok(EvolutionSweepStats {
185 pruned_self_loops: usize::try_from(self_loops).unwrap_or(usize::MAX),
186 pruned_low_confidence: usize::try_from(low_conf).unwrap_or(usize::MAX),
187 })
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::store::GraphStore;
    use crate::graph::types::EntityType;
    use crate::store::SqliteStore;
    use zeph_db::sql;

    /// Opens an in-memory `SqliteStore` and returns an experience store, a
    /// graph store and the raw pool, all built from clones of the same pool.
    async fn setup() -> (ExperienceStore, GraphStore, DbPool) {
        let store = SqliteStore::new(":memory:").await.unwrap();
        let pool = store.pool().clone();
        let exp = ExperienceStore::new(pool.clone());
        let gs = GraphStore::new(pool.clone());
        (exp, gs, pool)
    }

    /// Counts the `experience_entity_links` rows for `experience_id`.
    async fn entity_link_count(pool: &DbPool, experience_id: ExperienceId) -> i64 {
        sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM experience_entity_links WHERE experience_id = ?1"
        ))
        .bind(experience_id.0)
        .fetch_one(pool)
        .await
        .unwrap()
    }

    #[tokio::test]
    async fn record_tool_outcome_inserts_experience_node() {
        let (exp, _gs, pool) = setup().await;
        let id = exp
            .record_tool_outcome("sess1", 1, "shell", "success", Some("exit 0"), None)
            .await
            .unwrap();
        assert!(id.0 > 0);

        let (sid, turn, tool, outcome): (String, i64, String, String) = sqlx::query_as(sql!(
            "SELECT session_id, turn, tool_name, outcome
             FROM experience_nodes WHERE id = ?1"
        ))
        .bind(id.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(sid, "sess1");
        assert_eq!(turn, 1);
        assert_eq!(tool, "shell");
        assert_eq!(outcome, "success");
    }

    #[tokio::test]
    async fn link_to_entities_populates_link_table() {
        let (exp, gs, pool) = setup().await;
        let exp_id = exp
            .record_tool_outcome("sess2", 1, "shell", "success", None, None)
            .await
            .unwrap();
        let e1 = gs
            .upsert_entity("Alice", "alice", EntityType::Person, None)
            .await
            .unwrap();
        let e2 = gs
            .upsert_entity("Bob", "bob", EntityType::Person, None)
            .await
            .unwrap();

        exp.link_to_entities(exp_id, &[e1, e2]).await.unwrap();
        assert_eq!(
            entity_link_count(&pool, exp_id).await,
            2,
            "both entity links must be inserted"
        );

        // Linking an already-linked entity again must not add a row.
        exp.link_to_entities(exp_id, &[e1]).await.unwrap();
        assert_eq!(
            entity_link_count(&pool, exp_id).await,
            2,
            "ON CONFLICT DO NOTHING must prevent duplicate links"
        );
    }

    #[tokio::test]
    async fn link_sequential_creates_experience_edge() {
        let (exp, _gs, pool) = setup().await;
        let id1 = exp
            .record_tool_outcome("sess1", 1, "shell", "success", None, None)
            .await
            .unwrap();
        let id2 = exp
            .record_tool_outcome("sess1", 2, "web_scrape", "success", None, None)
            .await
            .unwrap();

        exp.link_sequential(id1, id2).await.unwrap();

        let (src, tgt, rel): (i64, i64, String) = sqlx::query_as(sql!(
            "SELECT source_exp_id, target_exp_id, relation
             FROM experience_edges WHERE source_exp_id = ?1"
        ))
        .bind(id1.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(src, id1.0);
        assert_eq!(tgt, id2.0);
        assert_eq!(rel, "followed_by");
    }

    #[tokio::test]
    async fn evolution_sweep_prunes_self_loops() {
        let (exp, gs, pool) = setup().await;

        let e1 = gs
            .upsert_entity("Alice", "alice", EntityType::Person, Some("person"))
            .await
            .unwrap();
        let e2 = gs
            .upsert_entity("Bob", "bob", EntityType::Person, Some("person"))
            .await
            .unwrap();

        // Remove the self-loop guard (presumably installed by the schema) so a
        // self-loop row can be seeded for the sweep to find.
        sqlx::query(sql!("DROP TRIGGER IF EXISTS graph_edges_no_self_loops"))
            .execute(&pool)
            .await
            .unwrap();

        sqlx::query(sql!(
            "INSERT INTO graph_edges
                (source_entity_id, target_entity_id, relation, fact, confidence, edge_type)
             VALUES (?1, ?1, 'knows', 'self', 0.5, 'semantic')"
        ))
        .bind(e1.0)
        .execute(&pool)
        .await
        .unwrap();

        // Put the guard back before the remaining inserts run.
        sqlx::query(sql!(
            "CREATE TRIGGER IF NOT EXISTS graph_edges_no_self_loops
             BEFORE INSERT ON graph_edges
             BEGIN
                 SELECT RAISE(ABORT, 'self-loop edge rejected: source and target entity must differ')
                 WHERE NEW.source_entity_id = NEW.target_entity_id;
             END"
        ))
        .execute(&pool)
        .await
        .unwrap();

        // A regular edge above the confidence threshold must survive the sweep.
        gs.insert_edge(e1.0, e2.0, "knows", "Alice knows Bob", 0.9, None)
            .await
            .unwrap();

        let stats = exp.evolution_sweep(&gs, 0.3).await.unwrap();
        assert_eq!(stats.pruned_self_loops, 1);
        assert_eq!(stats.pruned_low_confidence, 0);

        let count: i64 = sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges
             WHERE source_entity_id = ?1 AND target_entity_id = ?2"
        ))
        .bind(e1.0)
        .bind(e2.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(count, 1);
    }
}