1use std::collections::HashMap;
10use std::path::Path;
11
12use rusqlite::{params, Connection, OptionalExtension};
13use thiserror::Error;
14
/// SQL schema text, embedded at compile time from the `schema.sql` file that sits
/// next to this source file.
///
/// The whole script is executed as a batch each time an [`Index`] is opened, so it
/// is presumably written to be safe to re-run against an existing database
/// (e.g. `CREATE TABLE IF NOT EXISTS`) — TODO confirm against `schema.sql`.
const SCHEMA_SQL: &str = include_str!("schema.sql");
16
/// Error type returned by the fallible [`Index`] operations in this module.
#[derive(Debug, Error)]
pub enum IndexError {
    /// A failure reported by `rusqlite`.
    ///
    /// The display message is the underlying SQLite error followed by a hint that
    /// the on-disk index file may be corrupt and can be deleted and rebuilt by
    /// re-registering traces. `?` converts a `rusqlite::Error` into this variant
    /// automatically through the generated `From` impl.
    #[error("sqlite error: {0}\nhint: the .shadow/index.sqlite file may be corrupt; delete it and re-register your traces")]
    Sqlite(#[from] rusqlite::Error),
}
24
/// Shorthand for `std::result::Result` with the error type fixed to [`IndexError`].
pub type Result<T> = std::result::Result<T, IndexError>;
27
/// Handle to the SQLite-backed index of traces, their tags, and their replays.
///
/// Create one with [`Index::open`] (file-backed) or [`Index::open_in_memory`].
pub struct Index {
    /// The single SQLite connection on which every query in this module runs.
    conn: Connection,
}
33
34impl Index {
35 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
38 let conn = Connection::open(path)?;
39 conn.execute_batch(SCHEMA_SQL)?;
40 Ok(Self { conn })
41 }
42
43 pub fn open_in_memory() -> Result<Self> {
45 let conn = Connection::open_in_memory()?;
46 conn.execute_batch(SCHEMA_SQL)?;
47 Ok(Self { conn })
48 }
49
50 pub fn register_trace(&mut self, trace: &TraceRecord) -> Result<()> {
53 let tx = self.conn.transaction()?;
54 tx.execute(
55 "INSERT OR REPLACE INTO traces (id, created_at, session_tag, root_record_id) VALUES (?1, ?2, ?3, ?4)",
56 params![trace.id, trace.created_at, trace.session_tag, trace.root_record_id],
57 )?;
58 tx.execute("DELETE FROM tags WHERE trace_id = ?1", params![trace.id])?;
59 {
60 let mut stmt =
61 tx.prepare("INSERT INTO tags (trace_id, key, value) VALUES (?1, ?2, ?3)")?;
62 for (k, v) in &trace.tags {
63 stmt.execute(params![trace.id, k, v])?;
64 }
65 }
66 tx.commit()?;
67 Ok(())
68 }
69
70 pub fn get_trace(&self, id: &str) -> Result<Option<TraceRecord>> {
72 let row: Option<(String, i64, Option<String>, String)> = self
73 .conn
74 .query_row(
75 "SELECT id, created_at, session_tag, root_record_id FROM traces WHERE id = ?1",
76 params![id],
77 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
78 )
79 .optional()?;
80 let Some((id, created_at, session_tag, root_record_id)) = row else {
81 return Ok(None);
82 };
83 let tags = self.tags_for(&id)?;
84 Ok(Some(TraceRecord {
85 id,
86 created_at,
87 session_tag,
88 root_record_id,
89 tags,
90 }))
91 }
92
93 pub fn find_by_tag(&self, key: &str, value: &str) -> Result<Vec<String>> {
95 let mut stmt = self
96 .conn
97 .prepare("SELECT trace_id FROM tags WHERE key = ?1 AND value = ?2 ORDER BY trace_id")?;
98 let ids: std::result::Result<Vec<String>, _> = stmt
99 .query_map(params![key, value], |r| r.get::<_, String>(0))?
100 .collect();
101 Ok(ids?)
102 }
103
104 pub fn find_by_session_tag(&self, tag: &str) -> Result<Vec<String>> {
106 let mut stmt = self
107 .conn
108 .prepare("SELECT id FROM traces WHERE session_tag = ?1 ORDER BY created_at DESC")?;
109 let ids: std::result::Result<Vec<String>, _> = stmt
110 .query_map(params![tag], |r| r.get::<_, String>(0))?
111 .collect();
112 Ok(ids?)
113 }
114
115 pub fn recent(&self, limit: u32) -> Result<Vec<String>> {
117 let mut stmt = self
118 .conn
119 .prepare("SELECT id FROM traces ORDER BY created_at DESC LIMIT ?1")?;
120 let ids: std::result::Result<Vec<String>, _> = stmt
121 .query_map(params![limit], |r| r.get::<_, String>(0))?
122 .collect();
123 Ok(ids?)
124 }
125
126 pub fn register_replay(&mut self, replay: &ReplayRecord) -> Result<()> {
128 self.conn.execute(
129 "INSERT OR REPLACE INTO replays (id, baseline_trace_id, config_hash, outcome_record_id, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
130 params![
131 replay.id,
132 replay.baseline_trace_id,
133 replay.config_hash,
134 replay.outcome_record_id,
135 replay.created_at
136 ],
137 )?;
138 Ok(())
139 }
140
141 pub fn replays_of(&self, baseline_trace_id: &str) -> Result<Vec<ReplayRecord>> {
143 let mut stmt = self.conn.prepare(
144 "SELECT id, baseline_trace_id, config_hash, outcome_record_id, created_at FROM replays WHERE baseline_trace_id = ?1 ORDER BY created_at DESC",
145 )?;
146 let rows = stmt.query_map(params![baseline_trace_id], |r| {
147 Ok(ReplayRecord {
148 id: r.get(0)?,
149 baseline_trace_id: r.get(1)?,
150 config_hash: r.get(2)?,
151 outcome_record_id: r.get(3)?,
152 created_at: r.get(4)?,
153 })
154 })?;
155 let mut out = Vec::new();
156 for row in rows {
157 out.push(row?);
158 }
159 Ok(out)
160 }
161
162 fn tags_for(&self, trace_id: &str) -> Result<HashMap<String, String>> {
163 let mut stmt = self
164 .conn
165 .prepare("SELECT key, value FROM tags WHERE trace_id = ?1")?;
166 let mut map = HashMap::new();
167 for row in stmt.query_map(params![trace_id], |r| {
168 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
169 })? {
170 let (k, v) = row?;
171 map.insert(k, v);
172 }
173 Ok(map)
174 }
175}
176
/// One registered trace together with its tags, as stored in and read back from
/// the index.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TraceRecord {
    /// Identifier of the trace; the lookup key for `Index::get_trace` and the
    /// `trace_id` under which its tags are stored. The tests use `sha256:`-prefixed
    /// strings, so this is presumably a content hash — TODO confirm.
    pub id: String,
    /// Creation timestamp; listing queries sort on it in descending order.
    /// NOTE(review): the unit is not visible here — the test values look like Unix
    /// epoch milliseconds; confirm.
    pub created_at: i64,
    /// Optional session label, matched exactly by `Index::find_by_session_tag`.
    pub session_tag: Option<String>,
    /// Id of the trace's root record; stored and returned unchanged by the index.
    pub root_record_id: String,
    /// Key/value tags attached to the trace. `Index::register_trace` rewrites the
    /// stored set from this map on every call.
    pub tags: HashMap<String, String>,
}
192
/// One replay recorded against a baseline trace, as stored in and read back from
/// the index.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayRecord {
    /// Identifier of the replay.
    pub id: String,
    /// Id of the trace this replay was run against; the lookup key for
    /// `Index::replays_of`.
    pub baseline_trace_id: String,
    /// Hash identifying the configuration used for the replay (presumably a
    /// content hash, judging by the `sha256:` values in the tests — TODO confirm).
    pub config_hash: String,
    /// Id of the record holding the replay's outcome, if there is one.
    /// NOTE(review): presumably `None` until the replay has produced a result —
    /// confirm with the caller.
    pub outcome_record_id: Option<String>,
    /// Creation timestamp; `Index::replays_of` sorts on it in descending order.
    /// NOTE(review): unit not visible here — confirm.
    pub created_at: i64,
}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tag-less trace with a fixed timestamp whose root record id equals
    /// its own id.
    fn make_trace(id: &str, session_tag: Option<&str>) -> TraceRecord {
        let id = id.to_owned();
        TraceRecord {
            root_record_id: id.clone(),
            id,
            created_at: 1_700_000_000_000,
            session_tag: session_tag.map(str::to_owned),
            tags: HashMap::new(),
        }
    }

    /// A fresh in-memory index has the `traces` table and it is empty.
    #[test]
    fn open_in_memory_applies_schema() {
        let index = Index::open_in_memory().unwrap();
        let trace_rows = index
            .conn
            .query_row("SELECT COUNT(*) FROM traces", [], |row| {
                row.get::<_, i64>(0)
            })
            .unwrap();
        assert_eq!(trace_rows, 0);
    }

    /// A registered trace reads back with the same id, session tag and tags.
    #[test]
    fn register_and_get_trace() {
        let mut index = Index::open_in_memory().unwrap();
        let mut original = make_trace("sha256:aaaa", Some("prod-agent-0"));
        original.tags.extend([
            ("env".to_string(), "prod".to_string()),
            ("region".to_string(), "us-east".to_string()),
        ]);
        index.register_trace(&original).unwrap();

        let loaded = index.get_trace("sha256:aaaa").unwrap().unwrap();
        assert_eq!(loaded.id, original.id);
        assert_eq!(loaded.session_tag.as_deref(), Some("prod-agent-0"));
        assert_eq!(loaded.tags, original.tags);
    }

    /// Looking up an unknown id yields `None`, not an error.
    #[test]
    fn get_missing_trace_returns_none() {
        let index = Index::open_in_memory().unwrap();
        let lookup = index.get_trace("sha256:does-not-exist").unwrap();
        assert!(lookup.is_none());
    }

    /// Registering the same id twice succeeds and the second tag set wins.
    #[test]
    fn register_is_idempotent_and_refreshes_tags() {
        let mut index = Index::open_in_memory().unwrap();
        let mut trace = make_trace("sha256:aaaa", None);
        trace.tags.insert("env".to_string(), "prod".to_string());
        index.register_trace(&trace).unwrap();

        trace.tags = HashMap::from([("env".to_string(), "dev".to_string())]);
        index.register_trace(&trace).unwrap();

        let stored = index.get_trace("sha256:aaaa").unwrap().unwrap();
        assert_eq!(stored.tags.get("env").map(String::as_str), Some("dev"));
        assert_eq!(stored.tags.len(), 1);
    }

    /// Tag lookup returns exactly the traces carrying the key/value pair.
    #[test]
    fn find_by_tag() {
        let mut index = Index::open_in_memory().unwrap();
        let fixtures = [
            ("sha256:a", "prod"),
            ("sha256:b", "prod"),
            ("sha256:c", "dev"),
        ];
        for (id, env) in fixtures {
            let mut trace = make_trace(id, None);
            trace.tags.insert("env".into(), env.into());
            index.register_trace(&trace).unwrap();
        }

        let mut prod = index.find_by_tag("env", "prod").unwrap();
        prod.sort();
        assert_eq!(prod, vec!["sha256:a", "sha256:b"]);
        assert_eq!(index.find_by_tag("env", "dev").unwrap(), vec!["sha256:c"]);
        assert_eq!(index.find_by_tag("env", "staging").unwrap().len(), 0);
    }

    /// Session-tag lookup and `recent` both list newest traces first.
    #[test]
    fn find_by_session_tag_and_recent_respect_ordering() {
        let mut index = Index::open_in_memory().unwrap();
        // Each successive trace is one second newer than the previous one.
        let mut created_at = 1_700_000_000_000_i64;
        for id in ["sha256:a", "sha256:b", "sha256:c"] {
            let mut trace = make_trace(id, Some("agent-0"));
            trace.created_at = created_at;
            index.register_trace(&trace).unwrap();
            created_at += 1000;
        }

        assert_eq!(
            index.find_by_session_tag("agent-0").unwrap(),
            vec!["sha256:c", "sha256:b", "sha256:a"]
        );
        assert_eq!(index.recent(2).unwrap(), vec!["sha256:c", "sha256:b"]);
    }

    /// Replays registered against a trace come back newest first.
    #[test]
    fn register_and_query_replays() {
        let mut index = Index::open_in_memory().unwrap();
        let baseline = make_trace("sha256:baseline", None);
        index.register_trace(&baseline).unwrap();

        (0..3_i64)
            .map(|n| ReplayRecord {
                id: format!("replay-{n}"),
                baseline_trace_id: baseline.id.clone(),
                config_hash: format!("sha256:cfg-{n}"),
                outcome_record_id: None,
                created_at: 1_700_000_000_000 + n * 1000,
            })
            .for_each(|replay| index.register_replay(&replay).unwrap());

        let replays = index.replays_of(&baseline.id).unwrap();
        assert_eq!(replays.len(), 3);
        assert!(replays[0].created_at > replays[2].created_at);
    }

    /// Deleting a trace row also removes its tags and replays.
    #[test]
    fn cascade_delete_of_trace_removes_tags_and_replays() {
        let mut index = Index::open_in_memory().unwrap();
        let mut doomed = make_trace("sha256:x", None);
        doomed.tags.insert("k".into(), "v".into());
        index.register_trace(&doomed).unwrap();

        let replay = ReplayRecord {
            id: "r0".into(),
            baseline_trace_id: doomed.id.clone(),
            config_hash: "sha256:c".into(),
            outcome_record_id: None,
            created_at: 1,
        };
        index.register_replay(&replay).unwrap();

        index
            .conn
            .execute("DELETE FROM traces WHERE id = ?1", params![doomed.id])
            .unwrap();

        assert!(index.find_by_tag("k", "v").unwrap().is_empty());
        assert!(index.replays_of(&doomed.id).unwrap().is_empty());
    }
}