Skip to main content

zeph_memory/store/
trajectory.rs

// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use zeph_db::{query, query_as, query_scalar, sql};
5
6use super::DbStore;
7use crate::error::MemoryError;
8use crate::store::compression_guidelines::redact_sensitive;
9
/// Input for inserting a trajectory entry.
///
/// Every field is either a plain number or a borrowed string, so the value is
/// `Copy` and can be compared with `==`. `DbStore::insert_trajectory_entry`
/// passes `intent` and `outcome` through `redact_sensitive` before persisting
/// them; the remaining fields are bound to the `INSERT` unchanged.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct NewTrajectoryEntry<'a> {
    /// Conversation the entry is attached to, or `None` when it has none.
    pub conversation_id: Option<i64>,
    /// Value stored in the `turn_index` column.
    pub turn_index: i64,
    /// Entry category; `DbStore::load_trajectory_entries` can filter on it.
    /// The tests in this file use `"procedural"` and `"episodic"`.
    pub kind: &'a str,
    /// What was being attempted. Redacted before it is stored.
    pub intent: &'a str,
    /// What happened as a result. Redacted before it is stored.
    pub outcome: &'a str,
    /// Tools involved, as an already-serialized string. The tests in this file
    /// pass a JSON array such as `["read_file"]`; no format is enforced here.
    pub tools_used: &'a str,
    /// Confidence score; `DbStore::load_trajectory_entries` returns higher
    /// values first.
    pub confidence: f64,
}
21
22/// A single trajectory memory row from the `trajectory_memory` table.
23#[derive(Debug, Clone, sqlx::FromRow)]
24pub struct TrajectoryEntryRow {
25    pub id: i64,
26    pub conversation_id: Option<i64>,
27    pub turn_index: i64,
28    pub kind: String,
29    pub intent: String,
30    pub outcome: String,
31    pub tools_used: String,
32    pub confidence: f64,
33    pub created_at: String,
34    pub updated_at: String,
35}
36
37impl DbStore {
38    /// Insert a trajectory entry.
39    ///
40    /// Returns the id of the inserted row.
41    ///
42    /// # Errors
43    ///
44    /// Returns an error if the query fails.
45    pub async fn insert_trajectory_entry(
46        &self,
47        entry: NewTrajectoryEntry<'_>,
48    ) -> Result<i64, MemoryError> {
49        // Redact potential secrets echoed by the LLM from tool outputs before persisting.
50        let intent = redact_sensitive(entry.intent);
51        let outcome = redact_sensitive(entry.outcome);
52
53        let (id,): (i64,) = query_as(sql!(
54            "INSERT INTO trajectory_memory
55                (conversation_id, turn_index, kind, intent, outcome, tools_used, confidence)
56             VALUES (?, ?, ?, ?, ?, ?, ?)
57             RETURNING id"
58        ))
59        .bind(entry.conversation_id)
60        .bind(entry.turn_index)
61        .bind(entry.kind)
62        .bind(intent.as_ref())
63        .bind(outcome.as_ref())
64        .bind(entry.tools_used)
65        .bind(entry.confidence)
66        .fetch_one(self.pool())
67        .await?;
68
69        Ok(id)
70    }
71
72    /// Load trajectory entries, optionally filtered by kind.
73    ///
74    /// Results are ordered by confidence DESC, then `created_at` DESC.
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if the query fails.
79    pub async fn load_trajectory_entries(
80        &self,
81        kind: Option<&str>,
82        limit: usize,
83    ) -> Result<Vec<TrajectoryEntryRow>, MemoryError> {
84        let rows: Vec<TrajectoryEntryRow> = match kind {
85            Some(k) => {
86                query_as(sql!(
87                    "SELECT id, conversation_id, turn_index, kind, intent, outcome,
88                        tools_used, confidence, created_at, updated_at
89                 FROM trajectory_memory
90                 WHERE kind = ?
91                 ORDER BY confidence DESC, created_at DESC
92                 LIMIT ?"
93                ))
94                .bind(k)
95                .bind(i64::try_from(limit).unwrap_or(i64::MAX))
96                .fetch_all(self.pool())
97                .await?
98            }
99            None => {
100                query_as(sql!(
101                    "SELECT id, conversation_id, turn_index, kind, intent, outcome,
102                        tools_used, confidence, created_at, updated_at
103                 FROM trajectory_memory
104                 ORDER BY confidence DESC, created_at DESC
105                 LIMIT ?"
106                ))
107                .bind(i64::try_from(limit).unwrap_or(i64::MAX))
108                .fetch_all(self.pool())
109                .await?
110            }
111        };
112
113        Ok(rows)
114    }
115
116    /// Count total trajectory entries (for metrics/TUI).
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the query fails.
121    pub async fn count_trajectory_entries(&self) -> Result<i64, MemoryError> {
122        let count: i64 = query_scalar(sql!("SELECT COUNT(*) FROM trajectory_memory"))
123            .fetch_one(self.pool())
124            .await?;
125
126        Ok(count)
127    }
128
129    /// Read the last extracted message id for a given conversation from `trajectory_meta`.
130    ///
131    /// Returns `0` if no row exists for the conversation yet.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the query fails.
136    pub async fn trajectory_last_extracted_message_id(
137        &self,
138        conversation_id: i64,
139    ) -> Result<i64, MemoryError> {
140        let id: Option<i64> = query_scalar(sql!(
141            "SELECT last_extracted_message_id
142             FROM trajectory_meta
143             WHERE conversation_id = ?"
144        ))
145        .bind(conversation_id)
146        .fetch_optional(self.pool())
147        .await?;
148
149        Ok(id.unwrap_or(0))
150    }
151
152    /// Upsert the last extracted message id for a given conversation in `trajectory_meta`.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if the query fails.
157    pub async fn set_trajectory_last_extracted_message_id(
158        &self,
159        conversation_id: i64,
160        message_id: i64,
161    ) -> Result<(), MemoryError> {
162        query(sql!(
163            "INSERT INTO trajectory_meta (conversation_id, last_extracted_message_id, updated_at)
164             VALUES (?, ?, datetime('now'))
165             ON CONFLICT(conversation_id) DO UPDATE SET
166                 last_extracted_message_id = excluded.last_extracted_message_id,
167                 updated_at = datetime('now')"
168        ))
169        .bind(conversation_id)
170        .bind(message_id)
171        .execute(self.pool())
172        .await?;
173
174        Ok(())
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;

    /// Open a fresh `:memory:` store with a pool size of 1.
    async fn make_store() -> DbStore {
        DbStore::with_pool_size(":memory:", 1)
            .await
            .expect("in-memory store")
    }

    /// Build an entry that is not attached to any conversation.
    fn unattached_entry<'a>(
        turn_index: i64,
        kind: &'a str,
        intent: &'a str,
        outcome: &'a str,
        tools_used: &'a str,
        confidence: f64,
    ) -> NewTrajectoryEntry<'a> {
        NewTrajectoryEntry {
            conversation_id: None,
            turn_index,
            kind,
            intent,
            outcome,
            tools_used,
            confidence,
        }
    }

    /// Read the stored watermark for `conversation_id`, panicking with
    /// `context` if the query fails.
    async fn watermark(store: &DbStore, conversation_id: i64, context: &str) -> i64 {
        store
            .trajectory_last_extracted_message_id(conversation_id)
            .await
            .expect(context)
    }

    #[tokio::test]
    async fn insert_trajectory_entry_basic() {
        let store = make_store().await;
        let entry = unattached_entry(
            1,
            "procedural",
            "read a file",
            "file read successfully",
            "[\"read_file\"]",
            0.9,
        );

        let id = store.insert_trajectory_entry(entry).await.expect("insert");
        assert!(id > 0);

        let total = store.count_trajectory_entries().await.expect("count");
        assert_eq!(total, 1);
    }

    #[tokio::test]
    async fn load_trajectory_entries_kind_filter() {
        let store = make_store().await;

        // One row of each kind.
        let procedural_entry =
            unattached_entry(1, "procedural", "build a crate", "built ok", "[\"shell\"]", 0.8);
        store
            .insert_trajectory_entry(procedural_entry)
            .await
            .expect("insert procedural");

        let episodic_entry =
            unattached_entry(2, "episodic", "fixed a bug", "patch applied", "[\"shell\"]", 0.7);
        store
            .insert_trajectory_entry(episodic_entry)
            .await
            .expect("insert episodic");

        // Filtering by kind returns only the matching row.
        let procedural = store
            .load_trajectory_entries(Some("procedural"), 10)
            .await
            .expect("load procedural");
        assert_eq!(procedural.len(), 1);
        assert_eq!(procedural[0].kind, "procedural");

        // No filter returns both rows.
        let all = store
            .load_trajectory_entries(None, 10)
            .await
            .expect("load all");
        assert_eq!(all.len(), 2);
    }

    #[tokio::test]
    async fn trajectory_meta_per_conversation_tracking() {
        let store = make_store().await;
        // Two conversations — create real rows to satisfy FK.
        let cid1 = store.create_conversation().await.expect("create conv 1").0;
        let cid2 = store.create_conversation().await.expect("create conv 2").0;

        // Initial value is 0 for both.
        assert_eq!(watermark(&store, cid1, "meta 1").await, 0);
        assert_eq!(watermark(&store, cid2, "meta 2").await, 0);

        // Set for conv 1 — must not affect conv 2.
        store
            .set_trajectory_last_extracted_message_id(cid1, 42)
            .await
            .expect("set meta 1");

        assert_eq!(watermark(&store, cid1, "meta 1 after").await, 42);
        assert_eq!(
            watermark(&store, cid2, "meta 2 after").await,
            0,
            "conv2 must remain 0 after conv1 update"
        );

        // Update conv 2 independently.
        store
            .set_trajectory_last_extracted_message_id(cid2, 99)
            .await
            .expect("set meta 2");
        assert_eq!(watermark(&store, cid2, "meta 2 final").await, 99);
    }

    #[tokio::test]
    async fn trajectory_meta_upsert_idempotent() {
        let store = make_store().await;
        let cid = store.create_conversation().await.expect("create conv").0;

        // Writing twice must leave the second value in place.
        for (message_id, context) in [(10, "first set"), (20, "second set")] {
            store
                .set_trajectory_last_extracted_message_id(cid, message_id)
                .await
                .expect(context);
        }

        assert_eq!(watermark(&store, cid, "final").await, 20);
    }
}