use std::time::{SystemTime, UNIX_EPOCH};
use zeph_db::{DbPool, sql};
use crate::error::MemoryError;
use crate::graph::store::GraphStore;
use crate::types::{EntityId, ExperienceId};
/// Counts of `graph_edges` rows removed by one [`ExperienceStore::evolution_sweep`] run.
///
/// `Default` yields all-zero counts. `Clone`, `PartialEq` and `Eq` are derived so
/// callers can keep, compare and assert on whole results.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct EvolutionSweepStats {
    /// Rows deleted because the edge's source and target entity were the same.
    pub pruned_self_loops: usize,
    /// Rows deleted by the low-confidence rule (confidence below the threshold,
    /// `retrieval_count = 0`, `valid_to IS NULL`).
    pub pruned_low_confidence: usize,
}
/// Database-backed store for tool-execution experience records.
///
/// The methods on this type write to the `experience_nodes`,
/// `experience_entity_links` and `experience_edges` tables through `pool`.
pub struct ExperienceStore {
// Pool that the experience insert queries are executed against.
pool: DbPool,
}
impl ExperienceStore {
    /// Creates a store that runs its experience queries against `pool`.
    #[must_use]
    pub fn new(pool: DbPool) -> Self {
        Self { pool }
    }

    /// Inserts one row into `experience_nodes` describing a tool invocation and
    /// returns the id reported by the `RETURNING id` clause.
    ///
    /// `created_at` is set to the current wall-clock time in whole seconds since
    /// the Unix epoch. If the system clock reads earlier than the epoch, `0` is
    /// stored rather than failing the insert.
    ///
    /// # Errors
    ///
    /// Returns a [`MemoryError`] if the insert fails.
    // A bare field name in `instrument(fields(..))` declares an *empty* field that
    // has to be recorded later in the body. Assigning the arguments here is what
    // actually attaches `tool_name` and `outcome` to the span.
    #[tracing::instrument(
        skip_all,
        name = "memory.experience.record",
        fields(tool_name = tool_name, outcome = outcome)
    )]
    pub async fn record_tool_outcome(
        &self,
        session_id: &str,
        turn: i64,
        tool_name: &str,
        outcome: &str,
        detail: Option<&str>,
        error_ctx: Option<&str>,
    ) -> Result<ExperienceId, MemoryError> {
        // `duration_since` only fails for a clock set before the epoch; fall back to 0.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| d.as_secs().cast_signed());

        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO experience_nodes
(session_id, turn, tool_name, outcome, detail, error_ctx, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
RETURNING id"
        ))
        .bind(session_id)
        .bind(turn)
        .bind(tool_name)
        .bind(outcome)
        .bind(detail)
        .bind(error_ctx)
        .bind(now)
        .fetch_one(&self.pool)
        .await
        .map_err(MemoryError::from)?;

        Ok(ExperienceId(id))
    }

    /// Links `experience_id` to every entity in `entity_ids` by inserting rows into
    /// `experience_entity_links`.
    ///
    /// Pairs that already exist are left untouched (`ON CONFLICT ... DO NOTHING`),
    /// so repeating a call does not create duplicates. An empty slice is a no-op.
    /// Each link is inserted with its own statement and no explicit transaction is
    /// opened here, so links inserted before a failing one remain in place.
    ///
    /// # Errors
    ///
    /// Returns a [`MemoryError`] from the first insert that fails.
    // The span is attached with `instrument` instead of holding a
    // `Span::entered()` guard across the `.await`s below: a guard kept alive over
    // an await point yields incorrect traces and makes the future `!Send`.
    #[tracing::instrument(
        skip_all,
        name = "memory.experience.link_entities",
        fields(exp = experience_id.0)
    )]
    pub async fn link_to_entities(
        &self,
        experience_id: ExperienceId,
        entity_ids: &[EntityId],
    ) -> Result<(), MemoryError> {
        for &entity_id in entity_ids {
            zeph_db::query(sql!(
                "INSERT INTO experience_entity_links
(experience_id, entity_id) VALUES (?1, ?2)
ON CONFLICT (experience_id, entity_id) DO NOTHING"
            ))
            .bind(experience_id.0)
            .bind(entity_id.0)
            .execute(&self.pool)
            .await
            .map_err(MemoryError::from)?;
        }
        Ok(())
    }

    /// Records that experience `next` followed experience `prev` by inserting a
    /// `'followed_by'` row into `experience_edges`.
    ///
    /// # Errors
    ///
    /// Returns a [`MemoryError`] if the insert fails.
    pub async fn link_sequential(
        &self,
        prev: ExperienceId,
        next: ExperienceId,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "INSERT INTO experience_edges
(source_exp_id, target_exp_id, relation)
VALUES (?1, ?2, 'followed_by')"
        ))
        .bind(prev.0)
        .bind(next.0)
        .execute(&self.pool)
        .await
        .map_err(MemoryError::from)?;
        Ok(())
    }

    /// Prunes `graph_edges` in two passes and reports how many rows each pass removed.
    ///
    /// 1. Deletes every edge whose source and target entity are the same.
    /// 2. Deletes every edge with `confidence` below `confidence_threshold`,
    ///    `retrieval_count = 0` and `valid_to IS NULL`.
    ///
    /// Both statements run against `graph_store`'s pool, not this store's own pool,
    /// and no explicit transaction is opened around them here.
    ///
    /// # Errors
    ///
    /// Returns a [`MemoryError`] if either delete fails; the second delete is not
    /// attempted when the first one fails.
    #[tracing::instrument(skip_all, name = "memory.experience.sweep")]
    pub async fn evolution_sweep(
        &self,
        graph_store: &GraphStore,
        confidence_threshold: f32,
    ) -> Result<EvolutionSweepStats, MemoryError> {
        let self_loops = zeph_db::query(sql!(
            "DELETE FROM graph_edges WHERE source_entity_id = target_entity_id"
        ))
        .execute(graph_store.pool())
        .await
        .map_err(MemoryError::from)?
        .rows_affected();

        let low_conf = zeph_db::query(sql!(
            "DELETE FROM graph_edges
WHERE confidence < ?1 AND retrieval_count = 0 AND valid_to IS NULL"
        ))
        .bind(confidence_threshold)
        .execute(graph_store.pool())
        .await
        .map_err(MemoryError::from)?
        .rows_affected();

        // `rows_affected` is wider than `usize` on some targets; saturate instead of failing.
        Ok(EvolutionSweepStats {
            pruned_self_loops: usize::try_from(self_loops).unwrap_or(usize::MAX),
            pruned_low_confidence: usize::try_from(low_conf).unwrap_or(usize::MAX),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::store::GraphStore;
    use crate::graph::types::EntityType;
    use crate::store::SqliteStore;
    use zeph_db::sql;

    /// Builds an in-memory SQLite store and returns an experience store, a graph
    /// store and the raw pool, all sharing the same connection pool.
    async fn setup() -> (ExperienceStore, GraphStore, DbPool) {
        let store = SqliteStore::new(":memory:").await.unwrap();
        let pool = store.pool().clone();
        let exp = ExperienceStore::new(pool.clone());
        let gs = GraphStore::new(pool.clone());
        (exp, gs, pool)
    }

    #[tokio::test]
    async fn record_tool_outcome_inserts_experience_node() {
        let (exp, _gs, pool) = setup().await;
        let id = exp
            .record_tool_outcome("sess1", 1, "shell", "success", Some("exit 0"), None)
            .await
            .unwrap();
        assert!(id.0 > 0);

        let (sid, turn, tool, outcome): (String, i64, String, String) = sqlx::query_as(sql!(
            "SELECT session_id, turn, tool_name, outcome
FROM experience_nodes WHERE id = ?1"
        ))
        .bind(id.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(sid, "sess1");
        assert_eq!(turn, 1);
        assert_eq!(tool, "shell");
        assert_eq!(outcome, "success");
    }

    #[tokio::test]
    async fn link_to_entities_populates_link_table() {
        let (exp, gs, pool) = setup().await;
        let exp_id = exp
            .record_tool_outcome("sess2", 1, "shell", "success", None, None)
            .await
            .unwrap();
        let e1 = gs
            .upsert_entity("Alice", "alice", EntityType::Person, None)
            .await
            .unwrap();
        let e2 = gs
            .upsert_entity("Bob", "bob", EntityType::Person, None)
            .await
            .unwrap();

        exp.link_to_entities(exp_id, &[e1, e2]).await.unwrap();
        let count: i64 = sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM experience_entity_links WHERE experience_id = ?1"
        ))
        .bind(exp_id.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(count, 2, "both entity links must be inserted");

        // Re-linking an already linked entity must not add a second row.
        exp.link_to_entities(exp_id, &[e1]).await.unwrap();
        let count2: i64 = sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM experience_entity_links WHERE experience_id = ?1"
        ))
        .bind(exp_id.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(
            count2, 2,
            "ON CONFLICT DO NOTHING must prevent duplicate links"
        );
    }

    #[tokio::test]
    async fn link_sequential_creates_experience_edge() {
        let (exp, _gs, pool) = setup().await;
        let id1 = exp
            .record_tool_outcome("sess1", 1, "shell", "success", None, None)
            .await
            .unwrap();
        let id2 = exp
            .record_tool_outcome("sess1", 2, "web_scrape", "success", None, None)
            .await
            .unwrap();

        exp.link_sequential(id1, id2).await.unwrap();

        let (src, tgt, rel): (i64, i64, String) = sqlx::query_as(sql!(
            "SELECT source_exp_id, target_exp_id, relation
FROM experience_edges WHERE source_exp_id = ?1"
        ))
        .bind(id1.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(src, id1.0);
        assert_eq!(tgt, id2.0);
        assert_eq!(rel, "followed_by");
    }

    #[tokio::test]
    async fn evolution_sweep_prunes_self_loops() {
        let (exp, gs, pool) = setup().await;
        let e1 = gs
            .upsert_entity("Alice", "alice", EntityType::Person, Some("person"))
            .await
            .unwrap();
        let e2 = gs
            .upsert_entity("Bob", "bob", EntityType::Person, Some("person"))
            .await
            .unwrap();

        // The schema rejects self-loops via a trigger, so drop it temporarily to
        // seed the row the sweep is expected to remove.
        sqlx::query(sql!("DROP TRIGGER IF EXISTS graph_edges_no_self_loops"))
            .execute(&pool)
            .await
            .unwrap();
        sqlx::query(sql!(
            "INSERT INTO graph_edges
(source_entity_id, target_entity_id, relation, fact, confidence, edge_type)
VALUES (?1, ?1, 'knows', 'self', 0.5, 'semantic')"
        ))
        .bind(e1.0)
        .execute(&pool)
        .await
        .unwrap();
        sqlx::query(sql!(
            "CREATE TRIGGER IF NOT EXISTS graph_edges_no_self_loops
BEFORE INSERT ON graph_edges
BEGIN
SELECT RAISE(ABORT, 'self-loop edge rejected: source and target entity must differ')
WHERE NEW.source_entity_id = NEW.target_entity_id;
END"
        ))
        .execute(&pool)
        .await
        .unwrap();

        gs.insert_edge(e1.0, e2.0, "knows", "Alice knows Bob", 0.9, None)
            .await
            .unwrap();

        let stats = exp.evolution_sweep(&gs, 0.3).await.unwrap();
        assert_eq!(stats.pruned_self_loops, 1);
        assert_eq!(stats.pruned_low_confidence, 0);

        // The regular edge must survive the sweep.
        let count: i64 = sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges
WHERE source_entity_id = ?1 AND target_entity_id = ?2"
        ))
        .bind(e1.0)
        .bind(e2.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(count, 1);
    }

    #[tokio::test]
    async fn evolution_sweep_prunes_low_confidence_edges() {
        let (exp, gs, pool) = setup().await;
        let e1 = gs
            .upsert_entity("Alice", "alice", EntityType::Person, Some("person"))
            .await
            .unwrap();
        let e2 = gs
            .upsert_entity("Bob", "bob", EntityType::Person, Some("person"))
            .await
            .unwrap();

        // `retrieval_count` and `valid_to` are set explicitly so the test does not
        // depend on the schema's column defaults.
        // Weak edge e1 -> e2: below the threshold, so it must be pruned.
        sqlx::query(sql!(
            "INSERT INTO graph_edges
(source_entity_id, target_entity_id, relation, fact, confidence, edge_type,
retrieval_count, valid_to)
VALUES (?1, ?2, 'knows', 'weak', 0.1, 'semantic', 0, NULL)"
        ))
        .bind(e1.0)
        .bind(e2.0)
        .execute(&pool)
        .await
        .unwrap();
        // Strong edge e2 -> e1: above the threshold, so it must be kept.
        sqlx::query(sql!(
            "INSERT INTO graph_edges
(source_entity_id, target_entity_id, relation, fact, confidence, edge_type,
retrieval_count, valid_to)
VALUES (?1, ?2, 'knows', 'strong', 0.9, 'semantic', 0, NULL)"
        ))
        .bind(e2.0)
        .bind(e1.0)
        .execute(&pool)
        .await
        .unwrap();

        let stats = exp.evolution_sweep(&gs, 0.3).await.unwrap();
        assert_eq!(stats.pruned_self_loops, 0);
        assert_eq!(stats.pruned_low_confidence, 1);

        let weak_left: i64 = sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges
WHERE source_entity_id = ?1 AND target_entity_id = ?2"
        ))
        .bind(e1.0)
        .bind(e2.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(weak_left, 0, "low-confidence edge must be removed");

        let strong_left: i64 = sqlx::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges
WHERE source_entity_id = ?1 AND target_entity_id = ?2"
        ))
        .bind(e2.0)
        .bind(e1.0)
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(strong_left, 1, "high-confidence edge must be kept");
    }
}