use zeph_db::{query, query_as, query_scalar, sql};
use super::DbStore;
use crate::error::MemoryError;
use crate::store::compression_guidelines::redact_sensitive;
/// Borrowed input for `DbStore::insert_trajectory_entry`: one trajectory record to persist.
///
/// String fields borrow from the caller for `'a`; nothing is copied until the insert runs.
#[derive(Debug, Clone)]
pub struct NewTrajectoryEntry<'a> {
/// Optional conversation id; bound into the `conversation_id` column as given (`None` is allowed).
pub conversation_id: Option<i64>,
/// Turn index stored with the entry. NOTE(review): presumably the position of the turn
/// within the conversation — confirm against the caller that produces entries.
pub turn_index: i64,
/// Entry category; `load_trajectory_entries` can filter on it. The tests in this file use
/// `"procedural"` and `"episodic"`; the set of valid values is not enforced here.
pub kind: &'a str,
/// Intent text. Passed through `redact_sensitive` before it is stored.
pub intent: &'a str,
/// Outcome text. Passed through `redact_sensitive` before it is stored.
pub outcome: &'a str,
/// Tools used, stored as text without redaction. The tests pass a JSON array string such as
/// `["read_file"]`. NOTE(review): the expected format is not validated here — confirm.
pub tools_used: &'a str,
/// Confidence score; `load_trajectory_entries` sorts by it in descending order.
/// The value range is not validated here.
pub confidence: f64,
}
/// One row of the `trajectory_memory` table, as selected by `DbStore::load_trajectory_entries`.
///
/// Decoded through the derived `sqlx::FromRow`, so each field corresponds to the selected
/// column of the same name.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct TrajectoryEntryRow {
/// Row id (the value returned by `insert_trajectory_entry`).
pub id: i64,
/// Conversation the entry was stored with, if any.
pub conversation_id: Option<i64>,
/// Turn index stored with the entry.
pub turn_index: i64,
/// Entry category (the tests in this file use `"procedural"` and `"episodic"`).
pub kind: String,
/// Intent text as stored, i.e. after `redact_sensitive` was applied at insert time.
pub intent: String,
/// Outcome text as stored, i.e. after `redact_sensitive` was applied at insert time.
pub outcome: String,
/// Tools-used text exactly as it was supplied at insert time.
pub tools_used: String,
/// Confidence score supplied at insert time.
pub confidence: f64,
/// Creation timestamp, kept as text. NOTE(review): the column default and its format are
/// defined by the schema, which is not visible here — confirm.
pub created_at: String,
/// Last-update timestamp, kept as text. NOTE(review): format defined by the schema — confirm.
pub updated_at: String,
}
impl DbStore {
    /// Stores one trajectory entry and returns the id of the newly inserted row.
    ///
    /// `entry.intent` and `entry.outcome` are passed through [`redact_sensitive`] before they
    /// are bound, so the persisted text is the redacted form. All other fields are bound
    /// exactly as supplied.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError`] when the `INSERT ... RETURNING id` statement fails.
    pub async fn insert_trajectory_entry(
        &self,
        entry: NewTrajectoryEntry<'_>,
    ) -> Result<i64, MemoryError> {
        // Redact before binding so unredacted text never reaches the database.
        let intent = redact_sensitive(entry.intent);
        let outcome = redact_sensitive(entry.outcome);
        let (id,): (i64,) = query_as(sql!(
            "INSERT INTO trajectory_memory
(conversation_id, turn_index, kind, intent, outcome, tools_used, confidence)
VALUES (?, ?, ?, ?, ?, ?, ?)
RETURNING id"
        ))
        .bind(entry.conversation_id)
        .bind(entry.turn_index)
        .bind(entry.kind)
        .bind(intent.as_ref())
        .bind(outcome.as_ref())
        .bind(entry.tools_used)
        .bind(entry.confidence)
        .fetch_one(self.pool())
        .await?;
        Ok(id)
    }

    /// Loads up to `limit` trajectory entries, highest confidence first.
    ///
    /// When `kind` is `Some`, only rows whose `kind` column equals it are returned; when it is
    /// `None`, rows of every kind are considered. Rows are ordered by `confidence` descending,
    /// then `created_at` descending, then `id` descending.
    ///
    /// The trailing `id DESC` is a tie-breaker: without it, rows sharing both confidence and
    /// creation timestamp have no defined relative order, so the set of rows that survives
    /// `LIMIT` could differ between otherwise identical calls.
    /// NOTE(review): this assumes `id` is unique per row (it is the value returned by
    /// `INSERT ... RETURNING id`) — confirm against the schema.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError`] when the query fails or a row cannot be decoded.
    pub async fn load_trajectory_entries(
        &self,
        kind: Option<&str>,
        limit: usize,
    ) -> Result<Vec<TrajectoryEntryRow>, MemoryError> {
        // A `usize` that does not fit into `i64` is clamped to `i64::MAX` rather than failing.
        let limit = i64::try_from(limit).unwrap_or(i64::MAX);
        let rows: Vec<TrajectoryEntryRow> = match kind {
            Some(k) => {
                query_as(sql!(
                    "SELECT id, conversation_id, turn_index, kind, intent, outcome,
tools_used, confidence, created_at, updated_at
FROM trajectory_memory
WHERE kind = ?
ORDER BY confidence DESC, created_at DESC, id DESC
LIMIT ?"
                ))
                .bind(k)
                .bind(limit)
                .fetch_all(self.pool())
                .await?
            }
            None => {
                query_as(sql!(
                    "SELECT id, conversation_id, turn_index, kind, intent, outcome,
tools_used, confidence, created_at, updated_at
FROM trajectory_memory
ORDER BY confidence DESC, created_at DESC, id DESC
LIMIT ?"
                ))
                .bind(limit)
                .fetch_all(self.pool())
                .await?
            }
        };
        Ok(rows)
    }

    /// Returns the total number of rows in `trajectory_memory`.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError`] when the `COUNT(*)` query fails.
    pub async fn count_trajectory_entries(&self) -> Result<i64, MemoryError> {
        let count: i64 = query_scalar(sql!("SELECT COUNT(*) FROM trajectory_memory"))
            .fetch_one(self.pool())
            .await?;
        Ok(count)
    }

    /// Returns the `last_extracted_message_id` recorded in `trajectory_meta` for
    /// `conversation_id`, or `0` when no row exists for that conversation.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError`] when the lookup fails.
    pub async fn trajectory_last_extracted_message_id(
        &self,
        conversation_id: i64,
    ) -> Result<i64, MemoryError> {
        let id: Option<i64> = query_scalar(sql!(
            "SELECT last_extracted_message_id
FROM trajectory_meta
WHERE conversation_id = ?"
        ))
        .bind(conversation_id)
        .fetch_optional(self.pool())
        .await?;
        // No row yet means nothing has been recorded for this conversation.
        Ok(id.unwrap_or(0))
    }

    /// Records `message_id` as the `last_extracted_message_id` for `conversation_id`.
    ///
    /// This is an upsert: a missing `trajectory_meta` row is inserted, an existing one is
    /// overwritten. The value is replaced unconditionally (it is not clamped to be
    /// monotonically increasing), and `updated_at` is refreshed on both paths.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError`] when the statement fails.
    pub async fn set_trajectory_last_extracted_message_id(
        &self,
        conversation_id: i64,
        message_id: i64,
    ) -> Result<(), MemoryError> {
        query(sql!(
            "INSERT INTO trajectory_meta (conversation_id, last_extracted_message_id, updated_at)
VALUES (?, ?, datetime('now'))
ON CONFLICT(conversation_id) DO UPDATE SET
last_extracted_message_id = excluded.last_extracted_message_id,
updated_at = datetime('now')"
        ))
        .bind(conversation_id)
        .bind(message_id)
        .execute(self.pool())
        .await?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh `:memory:` store, requesting a pool size of 1.
    async fn make_store() -> DbStore {
        let opened = DbStore::with_pool_size(":memory:", 1).await;
        opened.expect("in-memory store")
    }

    /// Reads the extraction marker for `cid`, panicking with `ctx` if the lookup fails.
    async fn marker(store: &DbStore, cid: i64, ctx: &str) -> i64 {
        let looked_up = store.trajectory_last_extracted_message_id(cid).await;
        looked_up.expect(ctx)
    }

    /// A single insert yields a positive id and is reflected in the row count.
    #[tokio::test]
    async fn insert_trajectory_entry_basic() {
        let store = make_store().await;

        let entry = NewTrajectoryEntry {
            conversation_id: None,
            turn_index: 1,
            kind: "procedural",
            intent: "read a file",
            outcome: "file read successfully",
            tools_used: "[\"read_file\"]",
            confidence: 0.9,
        };
        let id = store.insert_trajectory_entry(entry).await.expect("insert");
        assert!(id > 0);

        let total = store.count_trajectory_entries().await.expect("count");
        assert_eq!(total, 1);
    }

    /// Filtering by kind returns only matching rows; no filter returns all of them.
    #[tokio::test]
    async fn load_trajectory_entries_kind_filter() {
        let store = make_store().await;

        let procedural_entry = NewTrajectoryEntry {
            conversation_id: None,
            turn_index: 1,
            kind: "procedural",
            intent: "build a crate",
            outcome: "built ok",
            tools_used: "[\"shell\"]",
            confidence: 0.8,
        };
        store
            .insert_trajectory_entry(procedural_entry)
            .await
            .expect("insert procedural");

        let episodic_entry = NewTrajectoryEntry {
            conversation_id: None,
            turn_index: 2,
            kind: "episodic",
            intent: "fixed a bug",
            outcome: "patch applied",
            tools_used: "[\"shell\"]",
            confidence: 0.7,
        };
        store
            .insert_trajectory_entry(episodic_entry)
            .await
            .expect("insert episodic");

        let only_procedural = store
            .load_trajectory_entries(Some("procedural"), 10)
            .await
            .expect("load procedural");
        assert_eq!(only_procedural.len(), 1);
        assert_eq!(only_procedural[0].kind, "procedural");

        let everything = store
            .load_trajectory_entries(None, 10)
            .await
            .expect("load all");
        assert_eq!(everything.len(), 2);
    }

    /// Each conversation keeps its own marker; updating one leaves the other untouched.
    #[tokio::test]
    async fn trajectory_meta_per_conversation_tracking() {
        let store = make_store().await;
        let cid1 = store.create_conversation().await.expect("create conv 1").0;
        let cid2 = store.create_conversation().await.expect("create conv 2").0;

        // Both conversations start without a marker, which reads back as 0.
        assert_eq!(marker(&store, cid1, "meta 1").await, 0);
        assert_eq!(marker(&store, cid2, "meta 2").await, 0);

        store
            .set_trajectory_last_extracted_message_id(cid1, 42)
            .await
            .expect("set meta 1");
        assert_eq!(marker(&store, cid1, "meta 1 after").await, 42);
        assert_eq!(
            marker(&store, cid2, "meta 2 after").await,
            0,
            "conv2 must remain 0 after conv1 update"
        );

        store
            .set_trajectory_last_extracted_message_id(cid2, 99)
            .await
            .expect("set meta 2");
        assert_eq!(marker(&store, cid2, "meta 2 final").await, 99);
    }

    /// Setting the marker twice keeps a single row holding the latest value.
    #[tokio::test]
    async fn trajectory_meta_upsert_idempotent() {
        let store = make_store().await;
        let cid = store.create_conversation().await.expect("create conv").0;

        store
            .set_trajectory_last_extracted_message_id(cid, 10)
            .await
            .expect("first set");
        store
            .set_trajectory_last_extracted_message_id(cid, 20)
            .await
            .expect("second set");

        assert_eq!(marker(&store, cid, "final").await, 20);
    }
}