1use zeph_db::{query, query_as, query_scalar, sql};
5
6use super::DbStore;
7use crate::error::MemoryError;
8use crate::store::compression_guidelines::redact_sensitive;
9
/// Borrowed field set for a row about to be written to `trajectory_memory`.
///
/// All text fields are string slices tied to `'a`, so building a value does
/// not copy any text.
#[derive(Debug, Clone)]
pub struct NewTrajectoryEntry<'a> {
    /// Value for the `conversation_id` column; `None` is bound as-is
    /// (presumably stored as SQL `NULL` — confirm against the schema).
    pub conversation_id: Option<i64>,
    /// Value for the `turn_index` column.
    pub turn_index: i64,
    /// Value for the `kind` column; the tests in this file use
    /// `"procedural"` and `"episodic"`.
    pub kind: &'a str,
    /// Value for the `intent` column (run through `redact_sensitive` by
    /// `DbStore::insert_trajectory_entry` before it is stored).
    pub intent: &'a str,
    /// Value for the `outcome` column (run through `redact_sensitive` by
    /// `DbStore::insert_trajectory_entry` before it is stored).
    pub outcome: &'a str,
    /// Value for the `tools_used` column, stored verbatim; the tests in this
    /// file pass JSON-array text such as `["shell"]`.
    pub tools_used: &'a str,
    /// Value for the `confidence` column; `load_trajectory_entries` sorts on
    /// it in descending order.
    pub confidence: f64,
}
21
22#[derive(Debug, Clone, sqlx::FromRow)]
24pub struct TrajectoryEntryRow {
25 pub id: i64,
26 pub conversation_id: Option<i64>,
27 pub turn_index: i64,
28 pub kind: String,
29 pub intent: String,
30 pub outcome: String,
31 pub tools_used: String,
32 pub confidence: f64,
33 pub created_at: String,
34 pub updated_at: String,
35}
36
37impl DbStore {
38 pub async fn insert_trajectory_entry(
46 &self,
47 entry: NewTrajectoryEntry<'_>,
48 ) -> Result<i64, MemoryError> {
49 let intent = redact_sensitive(entry.intent);
51 let outcome = redact_sensitive(entry.outcome);
52
53 let (id,): (i64,) = query_as(sql!(
54 "INSERT INTO trajectory_memory
55 (conversation_id, turn_index, kind, intent, outcome, tools_used, confidence)
56 VALUES (?, ?, ?, ?, ?, ?, ?)
57 RETURNING id"
58 ))
59 .bind(entry.conversation_id)
60 .bind(entry.turn_index)
61 .bind(entry.kind)
62 .bind(intent.as_ref())
63 .bind(outcome.as_ref())
64 .bind(entry.tools_used)
65 .bind(entry.confidence)
66 .fetch_one(self.pool())
67 .await?;
68
69 Ok(id)
70 }
71
72 pub async fn load_trajectory_entries(
80 &self,
81 kind: Option<&str>,
82 limit: usize,
83 ) -> Result<Vec<TrajectoryEntryRow>, MemoryError> {
84 let rows: Vec<TrajectoryEntryRow> = match kind {
85 Some(k) => {
86 query_as(sql!(
87 "SELECT id, conversation_id, turn_index, kind, intent, outcome,
88 tools_used, confidence, created_at, updated_at
89 FROM trajectory_memory
90 WHERE kind = ?
91 ORDER BY confidence DESC, created_at DESC
92 LIMIT ?"
93 ))
94 .bind(k)
95 .bind(i64::try_from(limit).unwrap_or(i64::MAX))
96 .fetch_all(self.pool())
97 .await?
98 }
99 None => {
100 query_as(sql!(
101 "SELECT id, conversation_id, turn_index, kind, intent, outcome,
102 tools_used, confidence, created_at, updated_at
103 FROM trajectory_memory
104 ORDER BY confidence DESC, created_at DESC
105 LIMIT ?"
106 ))
107 .bind(i64::try_from(limit).unwrap_or(i64::MAX))
108 .fetch_all(self.pool())
109 .await?
110 }
111 };
112
113 Ok(rows)
114 }
115
116 pub async fn count_trajectory_entries(&self) -> Result<i64, MemoryError> {
122 let count: i64 = query_scalar(sql!("SELECT COUNT(*) FROM trajectory_memory"))
123 .fetch_one(self.pool())
124 .await?;
125
126 Ok(count)
127 }
128
129 pub async fn trajectory_last_extracted_message_id(
137 &self,
138 conversation_id: i64,
139 ) -> Result<i64, MemoryError> {
140 let id: Option<i64> = query_scalar(sql!(
141 "SELECT last_extracted_message_id
142 FROM trajectory_meta
143 WHERE conversation_id = ?"
144 ))
145 .bind(conversation_id)
146 .fetch_optional(self.pool())
147 .await?;
148
149 Ok(id.unwrap_or(0))
150 }
151
152 pub async fn set_trajectory_last_extracted_message_id(
158 &self,
159 conversation_id: i64,
160 message_id: i64,
161 ) -> Result<(), MemoryError> {
162 query(sql!(
163 "INSERT INTO trajectory_meta (conversation_id, last_extracted_message_id, updated_at)
164 VALUES (?, ?, datetime('now'))
165 ON CONFLICT(conversation_id) DO UPDATE SET
166 last_extracted_message_id = excluded.last_extracted_message_id,
167 updated_at = datetime('now')"
168 ))
169 .bind(conversation_id)
170 .bind(message_id)
171 .execute(self.pool())
172 .await?;
173
174 Ok(())
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a store on `":memory:"` with a pool size of 1.
    async fn make_store() -> DbStore {
        let opened = DbStore::with_pool_size(":memory:", 1).await;
        opened.expect("in-memory store")
    }

    /// A single insert yields a positive id and is visible to the row count.
    #[tokio::test]
    async fn insert_trajectory_entry_basic() {
        let store = make_store().await;

        let new_entry = NewTrajectoryEntry {
            conversation_id: None,
            turn_index: 1,
            kind: "procedural",
            intent: "read a file",
            outcome: "file read successfully",
            tools_used: "[\"read_file\"]",
            confidence: 0.9,
        };
        let id = store
            .insert_trajectory_entry(new_entry)
            .await
            .expect("insert");
        assert!(id > 0);

        let total = store.count_trajectory_entries().await.expect("count");
        assert_eq!(total, 1);
    }

    /// `Some(kind)` restricts the result to that kind; `None` returns
    /// every row (up to the limit).
    #[tokio::test]
    async fn load_trajectory_entries_kind_filter() {
        let store = make_store().await;

        let procedural_entry = NewTrajectoryEntry {
            conversation_id: None,
            turn_index: 1,
            kind: "procedural",
            intent: "build a crate",
            outcome: "built ok",
            tools_used: "[\"shell\"]",
            confidence: 0.8,
        };
        store
            .insert_trajectory_entry(procedural_entry)
            .await
            .expect("insert procedural");

        let episodic_entry = NewTrajectoryEntry {
            conversation_id: None,
            turn_index: 2,
            kind: "episodic",
            intent: "fixed a bug",
            outcome: "patch applied",
            tools_used: "[\"shell\"]",
            confidence: 0.7,
        };
        store
            .insert_trajectory_entry(episodic_entry)
            .await
            .expect("insert episodic");

        let procedural = store
            .load_trajectory_entries(Some("procedural"), 10)
            .await
            .expect("load procedural");
        assert_eq!(procedural.len(), 1);
        let only_row = &procedural[0];
        assert_eq!(only_row.kind, "procedural");

        let all = store
            .load_trajectory_entries(None, 10)
            .await
            .expect("load all");
        assert_eq!(all.len(), 2);
    }

    /// The stored message id is tracked independently per conversation.
    #[tokio::test]
    async fn trajectory_meta_per_conversation_tracking() {
        let store = make_store().await;
        let cid1 = store.create_conversation().await.expect("create conv 1").0;
        let cid2 = store.create_conversation().await.expect("create conv 2").0;

        // Nothing has been recorded yet, so both reads are expected to be 0.
        let initial_1 = store
            .trajectory_last_extracted_message_id(cid1)
            .await
            .expect("meta 1");
        assert_eq!(initial_1, 0);
        let initial_2 = store
            .trajectory_last_extracted_message_id(cid2)
            .await
            .expect("meta 2");
        assert_eq!(initial_2, 0);

        store
            .set_trajectory_last_extracted_message_id(cid1, 42)
            .await
            .expect("set meta 1");

        // Updating conversation 1 must not leak into conversation 2.
        let after_1 = store
            .trajectory_last_extracted_message_id(cid1)
            .await
            .expect("meta 1 after");
        assert_eq!(after_1, 42);
        let after_2 = store
            .trajectory_last_extracted_message_id(cid2)
            .await
            .expect("meta 2 after");
        assert_eq!(after_2, 0, "conv2 must remain 0 after conv1 update");

        store
            .set_trajectory_last_extracted_message_id(cid2, 99)
            .await
            .expect("set meta 2");
        let final_2 = store
            .trajectory_last_extracted_message_id(cid2)
            .await
            .expect("meta 2 final");
        assert_eq!(final_2, 99);
    }

    /// Setting the value twice for one conversation succeeds and the second
    /// value wins.
    #[tokio::test]
    async fn trajectory_meta_upsert_idempotent() {
        let store = make_store().await;
        let cid = store.create_conversation().await.expect("create conv").0;

        store
            .set_trajectory_last_extracted_message_id(cid, 10)
            .await
            .expect("first set");
        store
            .set_trajectory_last_extracted_message_id(cid, 20)
            .await
            .expect("second set");

        let latest = store
            .trajectory_last_extracted_message_id(cid)
            .await
            .expect("final");
        assert_eq!(latest, 20);
    }
}