Skip to main content

shadow_core/store/
sqlite.rs

1//! SQLite index over stored traces (SPEC.md §8).
2//!
3//! The filesystem store ([`super::fs::Store`]) is the authoritative source of
4//! truth for trace content; this index just lets `shadow` answer questions
5//! like "which traces have tag env=prod?" without scanning every file on
6//! disk. The `bundled` feature of rusqlite is on, so there's no system
7//! `sqlite3` dependency.
8
9use std::collections::HashMap;
10use std::path::Path;
11
12use rusqlite::{params, Connection, OptionalExtension};
13use thiserror::Error;
14
15const SCHEMA_SQL: &str = include_str!("schema.sql");
16
17/// Errors from [`Index`].
18#[derive(Debug, Error)]
19pub enum IndexError {
20    /// Underlying rusqlite failure.
21    #[error("sqlite error: {0}\nhint: the .shadow/index.sqlite file may be corrupt; delete it and re-register your traces")]
22    Sqlite(#[from] rusqlite::Error),
23}
24
25/// Result alias for index operations.
26pub type Result<T> = std::result::Result<T, IndexError>;
27
28/// SQLite-backed trace index. Cheap to construct; use one per process or
29/// thread.
30pub struct Index {
31    conn: Connection,
32}
33
34impl Index {
35    /// Open (or create) a SQLite database at `path`. Applies the schema
36    /// idempotently — safe to call on an existing DB.
37    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
38        let conn = Connection::open(path)?;
39        conn.execute_batch(SCHEMA_SQL)?;
40        Ok(Self { conn })
41    }
42
43    /// Open an in-memory index (useful for tests).
44    pub fn open_in_memory() -> Result<Self> {
45        let conn = Connection::open_in_memory()?;
46        conn.execute_batch(SCHEMA_SQL)?;
47        Ok(Self { conn })
48    }
49
50    /// Register a trace. Existing rows with the same `id` are replaced
51    /// (idempotent insert). Tags are cleared and re-inserted.
52    pub fn register_trace(&mut self, trace: &TraceRecord) -> Result<()> {
53        let tx = self.conn.transaction()?;
54        tx.execute(
55            "INSERT OR REPLACE INTO traces (id, created_at, session_tag, root_record_id) VALUES (?1, ?2, ?3, ?4)",
56            params![trace.id, trace.created_at, trace.session_tag, trace.root_record_id],
57        )?;
58        tx.execute("DELETE FROM tags WHERE trace_id = ?1", params![trace.id])?;
59        {
60            let mut stmt =
61                tx.prepare("INSERT INTO tags (trace_id, key, value) VALUES (?1, ?2, ?3)")?;
62            for (k, v) in &trace.tags {
63                stmt.execute(params![trace.id, k, v])?;
64            }
65        }
66        tx.commit()?;
67        Ok(())
68    }
69
70    /// Look up a trace by id.
71    pub fn get_trace(&self, id: &str) -> Result<Option<TraceRecord>> {
72        let row: Option<(String, i64, Option<String>, String)> = self
73            .conn
74            .query_row(
75                "SELECT id, created_at, session_tag, root_record_id FROM traces WHERE id = ?1",
76                params![id],
77                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
78            )
79            .optional()?;
80        let Some((id, created_at, session_tag, root_record_id)) = row else {
81            return Ok(None);
82        };
83        let tags = self.tags_for(&id)?;
84        Ok(Some(TraceRecord {
85            id,
86            created_at,
87            session_tag,
88            root_record_id,
89            tags,
90        }))
91    }
92
93    /// Trace ids with `tags.key = :key AND tags.value = :value`.
94    pub fn find_by_tag(&self, key: &str, value: &str) -> Result<Vec<String>> {
95        let mut stmt = self
96            .conn
97            .prepare("SELECT trace_id FROM tags WHERE key = ?1 AND value = ?2 ORDER BY trace_id")?;
98        let ids: std::result::Result<Vec<String>, _> = stmt
99            .query_map(params![key, value], |r| r.get::<_, String>(0))?
100            .collect();
101        Ok(ids?)
102    }
103
104    /// Trace ids with `traces.session_tag = :tag`.
105    pub fn find_by_session_tag(&self, tag: &str) -> Result<Vec<String>> {
106        let mut stmt = self
107            .conn
108            .prepare("SELECT id FROM traces WHERE session_tag = ?1 ORDER BY created_at DESC")?;
109        let ids: std::result::Result<Vec<String>, _> = stmt
110            .query_map(params![tag], |r| r.get::<_, String>(0))?
111            .collect();
112        Ok(ids?)
113    }
114
115    /// The `limit` most recently-created trace ids.
116    pub fn recent(&self, limit: u32) -> Result<Vec<String>> {
117        let mut stmt = self
118            .conn
119            .prepare("SELECT id FROM traces ORDER BY created_at DESC LIMIT ?1")?;
120        let ids: std::result::Result<Vec<String>, _> = stmt
121            .query_map(params![limit], |r| r.get::<_, String>(0))?
122            .collect();
123        Ok(ids?)
124    }
125
126    /// Register a replay.
127    pub fn register_replay(&mut self, replay: &ReplayRecord) -> Result<()> {
128        self.conn.execute(
129            "INSERT OR REPLACE INTO replays (id, baseline_trace_id, config_hash, outcome_record_id, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
130            params![
131                replay.id,
132                replay.baseline_trace_id,
133                replay.config_hash,
134                replay.outcome_record_id,
135                replay.created_at
136            ],
137        )?;
138        Ok(())
139    }
140
141    /// Replay records whose baseline is `baseline_trace_id`.
142    pub fn replays_of(&self, baseline_trace_id: &str) -> Result<Vec<ReplayRecord>> {
143        let mut stmt = self.conn.prepare(
144            "SELECT id, baseline_trace_id, config_hash, outcome_record_id, created_at FROM replays WHERE baseline_trace_id = ?1 ORDER BY created_at DESC",
145        )?;
146        let rows = stmt.query_map(params![baseline_trace_id], |r| {
147            Ok(ReplayRecord {
148                id: r.get(0)?,
149                baseline_trace_id: r.get(1)?,
150                config_hash: r.get(2)?,
151                outcome_record_id: r.get(3)?,
152                created_at: r.get(4)?,
153            })
154        })?;
155        let mut out = Vec::new();
156        for row in rows {
157            out.push(row?);
158        }
159        Ok(out)
160    }
161
162    fn tags_for(&self, trace_id: &str) -> Result<HashMap<String, String>> {
163        let mut stmt = self
164            .conn
165            .prepare("SELECT key, value FROM tags WHERE trace_id = ?1")?;
166        let mut map = HashMap::new();
167        for row in stmt.query_map(params![trace_id], |r| {
168            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
169        })? {
170            let (k, v) = row?;
171            map.insert(k, v);
172        }
173        Ok(map)
174    }
175}
176
/// A row of the `traces` table together with the tags attached to it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TraceRecord {
    /// Identifier of the trace: the content id of its root record.
    pub id: String,
    /// Creation time, in milliseconds since the Unix epoch.
    pub created_at: i64,
    /// Session tag, if any (matches `metadata.payload.tags.session_tag`).
    pub session_tag: Option<String>,
    /// Id of the root record. Equal to `id` for canonical traces, but kept
    /// as its own column so that invariant can be queried.
    pub root_record_id: String,
    /// Tags attached to the trace, as a key→value map.
    pub tags: HashMap<String, String>,
}
192
/// A row of the `replays` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayRecord {
    /// Identifier of the replay; the producer picks the scheme (a UUID or a
    /// content hash).
    pub id: String,
    /// Id of the baseline trace the replay was run against.
    pub baseline_trace_id: String,
    /// Content hash of the candidate config that drove the replay.
    pub config_hash: String,
    /// Content id of the `replay_summary` record, or `None` if none has been
    /// written yet.
    pub outcome_record_id: Option<String>,
    /// Creation time, in milliseconds since the Unix epoch.
    pub created_at: i64,
}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Base timestamp (Unix epoch millis) shared by the fixtures below.
    const T0: i64 = 1_700_000_000_000;

    /// Build a tag-less trace created at `T0` whose root record id equals
    /// its own id.
    fn make_trace(id: &str, session_tag: Option<&str>) -> TraceRecord {
        TraceRecord {
            id: id.to_owned(),
            created_at: T0,
            session_tag: session_tag.map(str::to_owned),
            root_record_id: id.to_owned(),
            tags: HashMap::new(),
        }
    }

    /// Build a replay of `baseline` with no outcome record.
    fn make_replay(id: &str, baseline: &TraceRecord, config_hash: &str, created_at: i64) -> ReplayRecord {
        ReplayRecord {
            id: id.to_owned(),
            baseline_trace_id: baseline.id.clone(),
            config_hash: config_hash.to_owned(),
            outcome_record_id: None,
            created_at,
        }
    }

    #[test]
    fn open_in_memory_applies_schema() {
        let index = Index::open_in_memory().unwrap();
        // Being able to count rows proves the `traces` table was created.
        let rows: i64 = index
            .conn
            .query_row("SELECT COUNT(*) FROM traces", [], |row| row.get(0))
            .unwrap();
        assert_eq!(rows, 0);
    }

    #[test]
    fn register_and_get_trace() {
        let mut index = Index::open_in_memory().unwrap();
        let mut original = make_trace("sha256:aaaa", Some("prod-agent-0"));
        for (key, value) in [("env", "prod"), ("region", "us-east")] {
            original.tags.insert(key.to_owned(), value.to_owned());
        }
        index.register_trace(&original).unwrap();

        let fetched = index.get_trace("sha256:aaaa").unwrap().unwrap();
        assert_eq!(fetched.id, original.id);
        assert_eq!(fetched.session_tag.as_deref(), Some("prod-agent-0"));
        assert_eq!(fetched.tags, original.tags);
    }

    #[test]
    fn get_missing_trace_returns_none() {
        let index = Index::open_in_memory().unwrap();
        let lookup = index.get_trace("sha256:does-not-exist").unwrap();
        assert!(lookup.is_none());
    }

    #[test]
    fn register_is_idempotent_and_refreshes_tags() {
        let mut index = Index::open_in_memory().unwrap();
        let mut trace = make_trace("sha256:aaaa", None);
        trace.tags.insert("env".to_owned(), "prod".to_owned());
        index.register_trace(&trace).unwrap();

        // Second registration carries a different tag set; the first set
        // must not survive.
        trace.tags.clear();
        trace.tags.insert("env".to_owned(), "dev".to_owned());
        index.register_trace(&trace).unwrap();

        let fetched = index.get_trace("sha256:aaaa").unwrap().unwrap();
        assert_eq!(fetched.tags.get("env").map(String::as_str), Some("dev"));
        assert_eq!(fetched.tags.len(), 1);
    }

    #[test]
    fn find_by_tag() {
        let mut index = Index::open_in_memory().unwrap();
        for (id, env) in [("sha256:a", "prod"), ("sha256:b", "prod"), ("sha256:c", "dev")] {
            let mut trace = make_trace(id, None);
            trace.tags.insert("env".into(), env.into());
            index.register_trace(&trace).unwrap();
        }

        let mut prod_ids = index.find_by_tag("env", "prod").unwrap();
        prod_ids.sort();
        assert_eq!(prod_ids, vec!["sha256:a", "sha256:b"]);
        assert_eq!(index.find_by_tag("env", "dev").unwrap(), vec!["sha256:c"]);
        assert!(index.find_by_tag("env", "staging").unwrap().is_empty());
    }

    #[test]
    fn find_by_session_tag_and_recent_respect_ordering() {
        let mut index = Index::open_in_memory().unwrap();
        // Registered oldest → newest, one second apart; queries must come
        // back newest first.
        for (step, id) in (0_i64..).zip(["sha256:a", "sha256:b", "sha256:c"]) {
            let trace = TraceRecord {
                created_at: T0 + step * 1000,
                ..make_trace(id, Some("agent-0"))
            };
            index.register_trace(&trace).unwrap();
        }

        assert_eq!(
            index.find_by_session_tag("agent-0").unwrap(),
            vec!["sha256:c", "sha256:b", "sha256:a"]
        );
        assert_eq!(index.recent(2).unwrap(), vec!["sha256:c", "sha256:b"]);
    }

    #[test]
    fn register_and_query_replays() {
        let mut index = Index::open_in_memory().unwrap();
        let baseline = make_trace("sha256:baseline", None);
        index.register_trace(&baseline).unwrap();

        for step in 0..3_i64 {
            let replay = make_replay(
                &format!("replay-{step}"),
                &baseline,
                &format!("sha256:cfg-{step}"),
                T0 + step * 1000,
            );
            index.register_replay(&replay).unwrap();
        }

        let replays = index.replays_of(&baseline.id).unwrap();
        assert_eq!(replays.len(), 3);
        // Newest first: the head must be younger than the tail.
        assert!(replays[0].created_at > replays[2].created_at);
    }

    #[test]
    fn cascade_delete_of_trace_removes_tags_and_replays() {
        let mut index = Index::open_in_memory().unwrap();
        let mut trace = make_trace("sha256:x", None);
        trace.tags.insert("k".into(), "v".into());
        index.register_trace(&trace).unwrap();
        index
            .register_replay(&make_replay("r0", &trace, "sha256:c", 1))
            .unwrap();

        // Delete the parent row directly; dependent rows must disappear.
        index
            .conn
            .execute("DELETE FROM traces WHERE id = ?1", params![trace.id])
            .unwrap();

        assert!(index.find_by_tag("k", "v").unwrap().is_empty());
        assert!(index.replays_of(&trace.id).unwrap().is_empty());
    }
}
348}