Skip to main content

juncture_telemetry/
sqlite_store.rs

1//! SQLite-backed implementation of `TraceStore`.
2//!
3//! Uses `sqlx` with native async support. Schema is auto-created on
4//! first open. Suitable for single-process deployments and development.
5
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
11use tracing::debug;
12
13use crate::models::{
14    EnrichedSession, Id, ModelStats, Observation, ObservationLevel, ObservationType, Session,
15    SummaryStats, TokenUsage, Trace,
16};
17use crate::trace_store::{
18    DailyStats, PaginatedResponse, StoreError, TraceQuery, TraceStore, TraceWithObservations,
19};
20
/// Guard that deletes a transient `SQLite` database file, together with
/// its `-wal` and `-shm` companion files, when it is dropped.
///
/// [`SqliteStore`] holds the guard behind an `Arc`, so the cleanup runs
/// once the last clone of the store has been dropped.
#[derive(Debug)]
struct TransientDbGuard(PathBuf);

impl Drop for TransientDbGuard {
    /// Best-effort cleanup: removal errors (for example a companion file
    /// that was never created) are ignored because `drop` cannot fail.
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.0);
        for suffix in ["-wal", "-shm"] {
            // Append to the raw `OsString` instead of formatting through
            // `Path::display`, which is lossy for non-UTF-8 paths and would
            // produce a companion name that does not exist on disk.
            let mut companion = self.0.clone().into_os_string();
            companion.push(suffix);
            let _ = std::fs::remove_file(companion);
        }
    }
}
35
36/// `SQLite` store backed by a `sqlx` connection pool.
37///
38/// All database operations are async and non-blocking. Stores created
39/// via [`SqliteStore::new`] persist to the given file. Stores created
40/// via [`SqliteStore::new_memory`] use a transient file that is
41/// automatically deleted when the last clone is dropped.
42#[derive(Clone, Debug)]
43pub struct SqliteStore {
44    pool: SqlitePool,
45    _transient_guard: Option<Arc<TransientDbGuard>>,
46}
47
48impl SqliteStore {
49    /// Create a new `SQLite` store at the given file path.
50    ///
51    /// The database and schema are created if they do not exist.
52    ///
53    /// # Errors
54    ///
55    /// Returns `StoreError::Storage` if the database cannot be opened
56    /// or the schema cannot be created.
57    pub async fn new(path: &str) -> Result<Self, StoreError> {
58        let url = format!("sqlite:{path}?mode=rwc");
59
60        let pool = SqlitePoolOptions::new()
61            .max_connections(1)
62            .connect(&url)
63            .await
64            .map_err(|e| StoreError::Storage(format!("open db: {e}")))?;
65
66        let store = Self {
67            pool,
68            _transient_guard: None,
69        };
70        store.create_schema().await?;
71        Ok(store)
72    }
73
74    /// Create a transient `SQLite` store backed by a file in the system
75    /// temp directory. The file and its WAL/SHM companions are removed
76    /// automatically when the last clone is dropped.
77    ///
78    /// # Errors
79    ///
80    /// Returns `StoreError::Storage` if the schema cannot be created.
81    pub async fn new_memory() -> Result<Self, StoreError> {
82        let id = uuid::Uuid::new_v4();
83        let path = std::env::temp_dir().join(format!("juncture-telemetry-{id}.db"));
84        let path_str = path.to_string_lossy().to_string();
85        let pool = SqlitePoolOptions::new()
86            .max_connections(1)
87            .connect(&format!("sqlite:{path_str}?mode=rwc"))
88            .await
89            .map_err(|e| StoreError::Storage(format!("open db: {e}")))?;
90
91        let store = Self {
92            pool,
93            _transient_guard: Some(Arc::new(TransientDbGuard(path))),
94        };
95        store.create_schema().await?;
96        Ok(store)
97    }
98
99    async fn create_schema(&self) -> Result<(), StoreError> {
100        sqlx::query(
101            "CREATE TABLE IF NOT EXISTS traces (
102                id TEXT PRIMARY KEY,
103                name TEXT NOT NULL,
104                user_id TEXT,
105                session_id TEXT,
106                tags TEXT NOT NULL DEFAULT '[]',
107                metadata TEXT NOT NULL DEFAULT 'null',
108                environment TEXT,
109                release_version TEXT,
110                input TEXT,
111                output TEXT,
112                start_time TEXT NOT NULL,
113                end_time TEXT,
114                total_cost REAL,
115                total_tokens INTEGER
116            )",
117        )
118        .execute(&self.pool)
119        .await
120        .map_err(|e| StoreError::Storage(format!("create traces table: {e}")))?;
121
122        sqlx::query("CREATE INDEX IF NOT EXISTS idx_traces_session ON traces(session_id)")
123            .execute(&self.pool)
124            .await
125            .map_err(|e| StoreError::Storage(format!("create session index: {e}")))?;
126
127        sqlx::query("CREATE INDEX IF NOT EXISTS idx_traces_user ON traces(user_id)")
128            .execute(&self.pool)
129            .await
130            .map_err(|e| StoreError::Storage(format!("create user index: {e}")))?;
131
132        sqlx::query("CREATE INDEX IF NOT EXISTS idx_traces_start ON traces(start_time)")
133            .execute(&self.pool)
134            .await
135            .map_err(|e| StoreError::Storage(format!("create start index: {e}")))?;
136
137        sqlx::query(
138            "CREATE TABLE IF NOT EXISTS observations (
139                id TEXT PRIMARY KEY,
140                trace_id TEXT NOT NULL REFERENCES traces(id),
141                parent_observation_id TEXT,
142                name TEXT NOT NULL,
143                observation_type TEXT NOT NULL,
144                start_time TEXT NOT NULL,
145                end_time TEXT,
146                input TEXT,
147                output TEXT,
148                metadata TEXT NOT NULL DEFAULT 'null',
149                level TEXT NOT NULL DEFAULT 'DEFAULT',
150                status_message TEXT,
151                model TEXT,
152                model_parameters TEXT,
153                usage_input_tokens INTEGER,
154                usage_output_tokens INTEGER,
155                usage_total_tokens INTEGER,
156                usage_cached_tokens INTEGER,
157                cost REAL
158            )",
159        )
160        .execute(&self.pool)
161        .await
162        .map_err(|e| StoreError::Storage(format!("create observations table: {e}")))?;
163
164        sqlx::query("CREATE INDEX IF NOT EXISTS idx_obs_trace ON observations(trace_id)")
165            .execute(&self.pool)
166            .await
167            .map_err(|e| StoreError::Storage(format!("create obs trace index: {e}")))?;
168
169        sqlx::query("CREATE INDEX IF NOT EXISTS idx_obs_start ON observations(start_time)")
170            .execute(&self.pool)
171            .await
172            .map_err(|e| StoreError::Storage(format!("create obs start index: {e}")))?;
173
174        sqlx::query(
175            "CREATE TABLE IF NOT EXISTS sessions (
176                id TEXT PRIMARY KEY,
177                user_id TEXT,
178                created_at TEXT NOT NULL
179            )",
180        )
181        .execute(&self.pool)
182        .await
183        .map_err(|e| StoreError::Storage(format!("create sessions table: {e}")))?;
184
185        debug!("SQLite schema initialized");
186        Ok(())
187    }
188}
189
190#[async_trait::async_trait]
191impl TraceStore for SqliteStore {
192    async fn upsert_trace(&self, trace: &Trace) -> Result<(), StoreError> {
193        sqlx::query(
194            "INSERT OR REPLACE INTO traces (
195                id, name, user_id, session_id, tags, metadata,
196                environment, release_version, input, output,
197                start_time, end_time, total_cost, total_tokens
198            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
199        )
200        .bind(trace.id.to_string())
201        .bind(&trace.name)
202        .bind(&trace.user_id)
203        .bind(&trace.session_id)
204        .bind(serde_json::to_string(&trace.tags).unwrap_or_else(|_| "[]".to_string()))
205        .bind(serde_json::to_string(&trace.metadata).unwrap_or_else(|_| "null".to_string()))
206        .bind(&trace.environment)
207        .bind(&trace.release)
208        .bind(
209            trace
210                .input
211                .as_ref()
212                .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string())),
213        )
214        .bind(
215            trace
216                .output
217                .as_ref()
218                .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string())),
219        )
220        .bind(trace.start_time.to_rfc3339())
221        .bind(trace.end_time.map(|t| t.to_rfc3339()))
222        .bind(trace.total_cost)
223        .bind(
224            trace
225                .total_tokens
226                .map(|t| i64::try_from(t).unwrap_or(i64::MAX)),
227        )
228        .execute(&self.pool)
229        .await
230        .map_err(|e| StoreError::Storage(format!("insert trace: {e}")))?;
231        Ok(())
232    }
233
234    async fn insert_observation(&self, observation: &Observation) -> Result<(), StoreError> {
235        sqlx::query(
236            "INSERT OR REPLACE INTO observations (
237                id, trace_id, parent_observation_id, name, observation_type,
238                start_time, end_time, input, output, metadata,
239                level, status_message, model, model_parameters,
240                usage_input_tokens, usage_output_tokens, usage_total_tokens,
241                usage_cached_tokens, cost
242            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?15, ?16, ?17, ?18, ?19)",
243        )
244        .bind(observation.id.to_string())
245        .bind(observation.trace_id.to_string())
246        .bind(observation.parent_observation_id.map(|id| id.to_string()))
247        .bind(&observation.name)
248        .bind(observation.observation_type.as_str())
249        .bind(observation.start_time.to_rfc3339())
250        .bind(observation.end_time.map(|t| t.to_rfc3339()))
251        .bind(
252            observation
253                .input
254                .as_ref()
255                .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string())),
256        )
257        .bind(
258            observation
259                .output
260                .as_ref()
261                .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string())),
262        )
263        .bind(serde_json::to_string(&observation.metadata).unwrap_or_else(|_| "null".to_string()))
264        .bind(observation.level.as_str())
265        .bind(&observation.status_message)
266        .bind(&observation.model)
267        .bind(
268            observation
269                .model_parameters
270                .as_ref()
271                .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string())),
272        )
273        .bind(
274            observation
275                .usage
276                .as_ref()
277                .map(|u| i64::try_from(u.input_tokens).unwrap_or(i64::MAX)),
278        )
279        .bind(
280            observation
281                .usage
282                .as_ref()
283                .map(|u| i64::try_from(u.output_tokens).unwrap_or(i64::MAX)),
284        )
285        .bind(
286            observation
287                .usage
288                .as_ref()
289                .map(|u| i64::try_from(u.total_tokens).unwrap_or(i64::MAX)),
290        )
291        .bind(observation.usage.as_ref().and_then(|u| {
292            u.cached_tokens
293                .map(|t| i64::try_from(t).unwrap_or(i64::MAX))
294        }))
295        .bind(observation.cost)
296        .execute(&self.pool)
297        .await
298        .map_err(|e| StoreError::Storage(format!("insert observation: {e}")))?;
299        Ok(())
300    }
301
302    async fn upsert_session(&self, session: &Session) -> Result<(), StoreError> {
303        sqlx::query("INSERT OR REPLACE INTO sessions (id, user_id, created_at) VALUES (?, ?, ?)")
304            .bind(&session.id)
305            .bind(&session.user_id)
306            .bind(session.created_at.to_rfc3339())
307            .execute(&self.pool)
308            .await
309            .map_err(|e| StoreError::Storage(format!("insert session: {e}")))?;
310        Ok(())
311    }
312
313    async fn get_trace(&self, id: Id) -> Result<Option<TraceWithObservations>, StoreError> {
314        let id_str = id.to_string();
315
316        let row = sqlx::query_as::<_, TraceRow>(
317            "SELECT id, name, user_id, session_id, tags, metadata,
318                    environment, release_version, input, output,
319                    start_time, end_time, total_cost, total_tokens
320             FROM traces WHERE id = ?",
321        )
322        .bind(&id_str)
323        .fetch_optional(&self.pool)
324        .await
325        .map_err(|e| StoreError::Storage(format!("query trace: {e}")))?;
326
327        let Some(row) = row else {
328            return Ok(None);
329        };
330
331        let trace = row.into_trace();
332
333        let obs_rows = sqlx::query_as::<_, ObservationRow>(
334            "SELECT id, trace_id, parent_observation_id, name, observation_type,
335                    start_time, end_time, input, output, metadata,
336                    level, status_message, model, model_parameters,
337                    usage_input_tokens, usage_output_tokens, usage_total_tokens,
338                    usage_cached_tokens, cost
339             FROM observations WHERE trace_id = ? ORDER BY start_time",
340        )
341        .bind(&id_str)
342        .fetch_all(&self.pool)
343        .await
344        .map_err(|e| StoreError::Storage(format!("query observations: {e}")))?;
345
346        let observations = obs_rows
347            .into_iter()
348            .map(ObservationRow::into_observation)
349            .collect::<Vec<_>>();
350
351        Ok(Some(TraceWithObservations {
352            trace,
353            observations,
354        }))
355    }
356
357    async fn query_traces(
358        &self,
359        query: &TraceQuery,
360    ) -> Result<PaginatedResponse<Trace>, StoreError> {
361        let page = query.page.unwrap_or(0);
362        let page_size = i64::from(query.page_size.unwrap_or(50).min(500));
363        let offset = i64::from(page) * page_size;
364
365        // Build WHERE clause dynamically
366        let mut conditions = vec!["1=1".to_string()];
367        if query.session_id.is_some() {
368            conditions.push("session_id = ?".to_string());
369        }
370        if query.user_id.is_some() {
371            conditions.push("user_id = ?".to_string());
372        }
373        if query.name.is_some() {
374            conditions.push("name LIKE ?".to_string());
375        }
376        if query.environment.is_some() {
377            conditions.push("environment = ?".to_string());
378        }
379        if query.from_timestamp.is_some() {
380            conditions.push("start_time >= ?".to_string());
381        }
382        if query.to_timestamp.is_some() {
383            conditions.push("start_time <= ?".to_string());
384        }
385
386        let where_clause = conditions.join(" AND ");
387
388        // Count query
389        let count_sql = format!("SELECT COUNT(*) as cnt FROM traces WHERE {where_clause}");
390        let mut count_q = sqlx::query_scalar::<_, i64>(&count_sql);
391        if let Some(ref sid) = query.session_id {
392            count_q = count_q.bind(sid);
393        }
394        if let Some(ref uid) = query.user_id {
395            count_q = count_q.bind(uid);
396        }
397        if let Some(ref name) = query.name {
398            count_q = count_q.bind(format!("%{name}%"));
399        }
400        if let Some(ref env) = query.environment {
401            count_q = count_q.bind(env);
402        }
403        if let Some(from) = query.from_timestamp {
404            count_q = count_q.bind(from.to_rfc3339());
405        }
406        if let Some(to) = query.to_timestamp {
407            count_q = count_q.bind(to.to_rfc3339());
408        }
409
410        let total_count: i64 = count_q
411            .fetch_one(&self.pool)
412            .await
413            .map_err(|e| StoreError::Storage(format!("count traces: {e}")))?;
414
415        // Data query
416        let data_sql = format!(
417            "SELECT id, name, user_id, session_id, tags, metadata,
418                    environment, release_version, input, output,
419                    start_time, end_time, total_cost, total_tokens
420             FROM traces WHERE {where_clause}
421             ORDER BY start_time DESC LIMIT ? OFFSET ?"
422        );
423        let mut data_q = sqlx::query_as::<_, TraceRow>(&data_sql);
424        if let Some(ref sid) = query.session_id {
425            data_q = data_q.bind(sid);
426        }
427        if let Some(ref uid) = query.user_id {
428            data_q = data_q.bind(uid);
429        }
430        if let Some(ref name) = query.name {
431            data_q = data_q.bind(format!("%{name}%"));
432        }
433        if let Some(ref env) = query.environment {
434            data_q = data_q.bind(env);
435        }
436        if let Some(from) = query.from_timestamp {
437            data_q = data_q.bind(from.to_rfc3339());
438        }
439        if let Some(to) = query.to_timestamp {
440            data_q = data_q.bind(to.to_rfc3339());
441        }
442        data_q = data_q.bind(page_size);
443        data_q = data_q.bind(offset);
444
445        let rows = data_q
446            .fetch_all(&self.pool)
447            .await
448            .map_err(|e| StoreError::Storage(format!("query traces: {e}")))?;
449
450        let traces = rows.into_iter().map(TraceRow::into_trace).collect();
451
452        Ok(PaginatedResponse {
453            data: traces,
454            page,
455            page_size: u32::try_from(page_size).unwrap_or(50),
456            total_count: u64::try_from(total_count).unwrap_or(0),
457        })
458    }
459
460    async fn get_session(&self, id: &str) -> Result<Option<Session>, StoreError> {
461        let row = sqlx::query_as::<_, SessionRow>(
462            "SELECT id, user_id, created_at FROM sessions WHERE id = ?",
463        )
464        .bind(id)
465        .fetch_optional(&self.pool)
466        .await
467        .map_err(|e| StoreError::Storage(format!("query session: {e}")))?;
468
469        Ok(row.map(SessionRow::into_session))
470    }
471
472    async fn query_sessions(
473        &self,
474        page: u32,
475        page_size: u32,
476    ) -> Result<PaginatedResponse<Session>, StoreError> {
477        let page_size = i64::from(page_size.min(500));
478        let offset = i64::from(page) * page_size;
479
480        let total_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sessions")
481            .fetch_one(&self.pool)
482            .await
483            .map_err(|e| StoreError::Storage(format!("count sessions: {e}")))?;
484
485        let rows = sqlx::query_as::<_, SessionRow>(
486            "SELECT id, user_id, created_at FROM sessions
487             ORDER BY created_at DESC LIMIT ? OFFSET ?",
488        )
489        .bind(page_size)
490        .bind(offset)
491        .fetch_all(&self.pool)
492        .await
493        .map_err(|e| StoreError::Storage(format!("query sessions: {e}")))?;
494
495        let sessions = rows.into_iter().map(SessionRow::into_session).collect();
496
497        Ok(PaginatedResponse {
498            data: sessions,
499            page,
500            page_size: u32::try_from(page_size).unwrap_or(50),
501            total_count: u64::try_from(total_count).unwrap_or(0),
502        })
503    }
504
505    async fn get_daily_stats(
506        &self,
507        from: DateTime<Utc>,
508        to: DateTime<Utc>,
509    ) -> Result<Vec<DailyStats>, StoreError> {
510        let rows = sqlx::query_as::<_, DailyStatsRow>(
511            "SELECT
512                DATE(start_time) as date,
513                COUNT(*) as trace_count,
514                COALESCE(SUM(total_tokens), 0) as total_tokens,
515                COALESCE(SUM(total_cost), 0.0) as total_cost
516             FROM traces
517             WHERE start_time >= ? AND start_time <= ?
518             GROUP BY DATE(start_time)
519             ORDER BY date",
520        )
521        .bind(from.to_rfc3339())
522        .bind(to.to_rfc3339())
523        .fetch_all(&self.pool)
524        .await
525        .map_err(|e| StoreError::Storage(format!("query stats: {e}")))?;
526
527        Ok(rows.into_iter().map(DailyStatsRow::into_stats).collect())
528    }
529
530    async fn get_model_stats(&self) -> Result<Vec<ModelStats>, StoreError> {
531        let rows = sqlx::query_as::<_, ModelStatsRow>(
532            "SELECT model, COUNT(*) as call_count,
533                COALESCE(SUM(usage_input_tokens), 0) as input_tokens,
534                COALESCE(SUM(usage_output_tokens), 0) as output_tokens,
535                COALESCE(SUM(cost), 0.0) as total_cost,
536                COALESCE(AVG(CAST((julianday(end_time) - julianday(start_time)) * 86400000 AS REAL)), 0.0) as avg_latency_ms
537             FROM observations WHERE model IS NOT NULL
538             GROUP BY model ORDER BY total_cost DESC",
539        )
540        .fetch_all(&self.pool)
541        .await
542        .map_err(|e| StoreError::Storage(format!("query model stats: {e}")))?;
543
544        Ok(rows
545            .into_iter()
546            .map(ModelStatsRow::into_model_stats)
547            .collect())
548    }
549
550    async fn get_summary_stats(&self) -> Result<SummaryStats, StoreError> {
551        let total_traces: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM traces")
552            .fetch_one(&self.pool)
553            .await
554            .map_err(|e| StoreError::Storage(format!("count traces: {e}")))?;
555
556        let total_observations: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM observations")
557            .fetch_one(&self.pool)
558            .await
559            .map_err(|e| StoreError::Storage(format!("count observations: {e}")))?;
560
561        let total_cost: f64 =
562            sqlx::query_scalar("SELECT COALESCE(SUM(total_cost), 0.0) FROM traces")
563                .fetch_one(&self.pool)
564                .await
565                .map_err(|e| StoreError::Storage(format!("sum cost: {e}")))?;
566
567        let total_tokens: i64 =
568            sqlx::query_scalar("SELECT COALESCE(SUM(total_tokens), 0) FROM traces")
569                .fetch_one(&self.pool)
570                .await
571                .map_err(|e| StoreError::Storage(format!("sum tokens: {e}")))?;
572
573        let error_count: i64 =
574            sqlx::query_scalar("SELECT COUNT(*) FROM observations WHERE level = 'ERROR'")
575                .fetch_one(&self.pool)
576                .await
577                .map_err(|e| StoreError::Storage(format!("count errors: {e}")))?;
578
579        let active_sessions: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sessions")
580            .fetch_one(&self.pool)
581            .await
582            .map_err(|e| StoreError::Storage(format!("count sessions: {e}")))?;
583
584        // Latency percentiles: get all trace durations, sort, calculate p50/p95/p99
585        let durations: Vec<f64> = sqlx::query_scalar(
586            "SELECT CAST((julianday(end_time) - julianday(start_time)) * 86400000 AS REAL) as dur
587             FROM traces WHERE end_time IS NOT NULL ORDER BY dur",
588        )
589        .fetch_all(&self.pool)
590        .await
591        .map_err(|e| StoreError::Storage(format!("query latencies: {e}")))?;
592
593        let (p50, p95, p99) = if durations.is_empty() {
594            (0.0, 0.0, 0.0)
595        } else {
596            let len = durations.len();
597            let p50_idx = len * 50 / 100;
598            let p95_idx = len * 95 / 100;
599            let p99_idx = (len * 99 / 100).min(len - 1);
600            (durations[p50_idx], durations[p95_idx], durations[p99_idx])
601        };
602
603        Ok(SummaryStats {
604            total_traces: u64::try_from(total_traces).unwrap_or(0),
605            total_observations: u64::try_from(total_observations).unwrap_or(0),
606            total_cost,
607            total_tokens: u64::try_from(total_tokens).unwrap_or(0),
608            error_count: u64::try_from(error_count).unwrap_or(0),
609            active_sessions: u64::try_from(active_sessions).unwrap_or(0),
610            latency_p50_ms: p50,
611            latency_p95_ms: p95,
612            latency_p99_ms: p99,
613        })
614    }
615
616    async fn query_enriched_sessions(
617        &self,
618        page: u32,
619        page_size: u32,
620    ) -> Result<PaginatedResponse<EnrichedSession>, StoreError> {
621        let page_size = i64::from(page_size.min(500));
622        let offset = i64::from(page) * page_size;
623
624        let total_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sessions")
625            .fetch_one(&self.pool)
626            .await
627            .map_err(|e| StoreError::Storage(format!("count sessions: {e}")))?;
628
629        let rows = sqlx::query_as::<_, EnrichedSessionRow>(
630            "SELECT s.id, s.user_id, s.created_at,
631                COUNT(t.id) as trace_count,
632                COALESCE(SUM(t.total_cost), 0.0) as total_cost,
633                COALESCE(SUM(t.total_tokens), 0) as total_tokens,
634                MAX(t.start_time) as last_active
635             FROM sessions s LEFT JOIN traces t ON t.session_id = s.id
636             GROUP BY s.id ORDER BY s.created_at DESC LIMIT ? OFFSET ?",
637        )
638        .bind(page_size)
639        .bind(offset)
640        .fetch_all(&self.pool)
641        .await
642        .map_err(|e| StoreError::Storage(format!("query enriched sessions: {e}")))?;
643
644        let sessions = rows
645            .into_iter()
646            .map(EnrichedSessionRow::into_enriched_session)
647            .collect();
648
649        Ok(PaginatedResponse {
650            data: sessions,
651            page,
652            page_size: u32::try_from(page_size).unwrap_or(50),
653            total_count: u64::try_from(total_count).unwrap_or(0),
654        })
655    }
656
657    async fn flush(&self) -> Result<(), StoreError> {
658        // SQLite with WAL mode auto-flushes. No-op.
659        Ok(())
660    }
661}
662
663// ── sqlx row types ──────────────────────────────────────────────
664
665#[derive(sqlx::FromRow)]
666struct TraceRow {
667    id: String,
668    name: String,
669    user_id: Option<String>,
670    session_id: Option<String>,
671    tags: String,
672    metadata: String,
673    environment: Option<String>,
674    release_version: Option<String>,
675    input: Option<String>,
676    output: Option<String>,
677    start_time: String,
678    end_time: Option<String>,
679    total_cost: Option<f64>,
680    total_tokens: Option<i64>,
681}
682
683impl TraceRow {
684    fn into_trace(self) -> Trace {
685        Trace {
686            id: Id::parse_str(&self.id).unwrap_or_else(|_| Id::nil()),
687            name: self.name,
688            user_id: self.user_id,
689            session_id: self.session_id,
690            tags: serde_json::from_str(&self.tags).unwrap_or_default(),
691            metadata: serde_json::from_str(&self.metadata).unwrap_or(serde_json::Value::Null),
692            environment: self.environment,
693            release: self.release_version,
694            input: self.input.and_then(|s| serde_json::from_str(&s).ok()),
695            output: self.output.and_then(|s| serde_json::from_str(&s).ok()),
696            start_time: DateTime::parse_from_rfc3339(&self.start_time)
697                .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc)),
698            end_time: self.end_time.and_then(|s| {
699                DateTime::parse_from_rfc3339(&s)
700                    .map(|dt| dt.with_timezone(&Utc))
701                    .ok()
702            }),
703            total_cost: self.total_cost,
704            total_tokens: self.total_tokens.and_then(|t| u64::try_from(t).ok()),
705        }
706    }
707}
708
709#[derive(sqlx::FromRow)]
710struct ObservationRow {
711    id: String,
712    trace_id: String,
713    parent_observation_id: Option<String>,
714    name: String,
715    observation_type: String,
716    start_time: String,
717    end_time: Option<String>,
718    input: Option<String>,
719    output: Option<String>,
720    metadata: String,
721    level: String,
722    status_message: Option<String>,
723    model: Option<String>,
724    model_parameters: Option<String>,
725    usage_input_tokens: Option<i64>,
726    usage_output_tokens: Option<i64>,
727    usage_total_tokens: Option<i64>,
728    usage_cached_tokens: Option<i64>,
729    cost: Option<f64>,
730}
731
732impl ObservationRow {
733    fn into_observation(self) -> Observation {
734        let observation_type = match self.observation_type.as_str() {
735            "GENERATION" => ObservationType::Generation,
736            "TOOL_CALL" => ObservationType::ToolCall,
737            "RETRIEVAL" => ObservationType::Retrieval,
738            _ => ObservationType::Span,
739        };
740
741        let level = match self.level.as_str() {
742            "DEBUG" => ObservationLevel::Debug,
743            "WARNING" => ObservationLevel::Warning,
744            "ERROR" => ObservationLevel::Error,
745            _ => ObservationLevel::Default,
746        };
747
748        let usage = (self.usage_input_tokens.is_some()
749            || self.usage_output_tokens.is_some()
750            || self.usage_total_tokens.is_some())
751        .then(|| TokenUsage {
752            input_tokens: self
753                .usage_input_tokens
754                .and_then(|t| u64::try_from(t).ok())
755                .unwrap_or(0),
756            output_tokens: self
757                .usage_output_tokens
758                .and_then(|t| u64::try_from(t).ok())
759                .unwrap_or(0),
760            total_tokens: self
761                .usage_total_tokens
762                .and_then(|t| u64::try_from(t).ok())
763                .unwrap_or(0),
764            cached_tokens: self.usage_cached_tokens.and_then(|t| u64::try_from(t).ok()),
765        });
766
767        Observation {
768            id: Id::parse_str(&self.id).unwrap_or_else(|_| Id::nil()),
769            trace_id: Id::parse_str(&self.trace_id).unwrap_or_else(|_| Id::nil()),
770            parent_observation_id: self
771                .parent_observation_id
772                .and_then(|s| Id::parse_str(&s).ok()),
773            name: self.name,
774            observation_type,
775            start_time: DateTime::parse_from_rfc3339(&self.start_time)
776                .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc)),
777            end_time: self.end_time.and_then(|s| {
778                DateTime::parse_from_rfc3339(&s)
779                    .map(|dt| dt.with_timezone(&Utc))
780                    .ok()
781            }),
782            input: self.input.and_then(|s| serde_json::from_str(&s).ok()),
783            output: self.output.and_then(|s| serde_json::from_str(&s).ok()),
784            metadata: serde_json::from_str(&self.metadata).unwrap_or(serde_json::Value::Null),
785            level,
786            status_message: self.status_message,
787            model: self.model,
788            model_parameters: self
789                .model_parameters
790                .and_then(|s| serde_json::from_str(&s).ok()),
791            usage,
792            cost: self.cost,
793        }
794    }
795}
796
797#[derive(sqlx::FromRow)]
798struct SessionRow {
799    id: String,
800    user_id: Option<String>,
801    created_at: String,
802}
803
804impl SessionRow {
805    fn into_session(self) -> Session {
806        Session {
807            id: self.id,
808            user_id: self.user_id,
809            created_at: DateTime::parse_from_rfc3339(&self.created_at)
810                .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc)),
811        }
812    }
813}
814
815#[derive(sqlx::FromRow)]
816struct DailyStatsRow {
817    date: String,
818    trace_count: i64,
819    total_tokens: i64,
820    total_cost: f64,
821}
822
823impl DailyStatsRow {
824    fn into_stats(self) -> DailyStats {
825        DailyStats {
826            date: self.date,
827            trace_count: u64::try_from(self.trace_count).unwrap_or(0),
828            observation_count: 0,
829            total_tokens: u64::try_from(self.total_tokens).unwrap_or(0),
830            total_cost: self.total_cost,
831            total_duration_ms: 0,
832        }
833    }
834}
835
836#[derive(sqlx::FromRow)]
837struct ModelStatsRow {
838    model: String,
839    call_count: i64,
840    input_tokens: i64,
841    output_tokens: i64,
842    total_cost: f64,
843    avg_latency_ms: f64,
844}
845
846impl ModelStatsRow {
847    fn into_model_stats(self) -> ModelStats {
848        ModelStats {
849            model: self.model,
850            call_count: u64::try_from(self.call_count).unwrap_or(0),
851            input_tokens: u64::try_from(self.input_tokens).unwrap_or(0),
852            output_tokens: u64::try_from(self.output_tokens).unwrap_or(0),
853            total_cost: self.total_cost,
854            avg_latency_ms: self.avg_latency_ms,
855        }
856    }
857}
858
859#[derive(sqlx::FromRow)]
860struct EnrichedSessionRow {
861    id: String,
862    user_id: Option<String>,
863    created_at: String,
864    trace_count: i64,
865    total_cost: f64,
866    total_tokens: i64,
867    last_active: Option<String>,
868}
869
870impl EnrichedSessionRow {
871    fn into_enriched_session(self) -> EnrichedSession {
872        EnrichedSession {
873            id: self.id,
874            user_id: self.user_id,
875            created_at: self.created_at,
876            trace_count: u64::try_from(self.trace_count).unwrap_or(0),
877            total_cost: self.total_cost,
878            total_tokens: u64::try_from(self.total_tokens).unwrap_or(0),
879            last_active: self.last_active,
880        }
881    }
882}
883
#[cfg(test)]
mod tests {
    use super::*;

    /// Open a fresh transient store; panics if it cannot be created.
    async fn fresh_store() -> SqliteStore {
        SqliteStore::new_memory().await.unwrap()
    }

    /// A completed trace can be upserted and read back with its fields intact.
    #[tokio::test]
    async fn sqlite_store_create_and_insert_trace() {
        let store = fresh_store().await;

        let mut trace = Trace::new("test_graph");
        trace.session_id = Some(String::from("thread-1"));
        trace.user_id = Some(String::from("user-1"));
        trace.complete(None, Some(0.05), Some(100));
        store.upsert_trace(&trace).await.unwrap();

        let fetched = store
            .get_trace(trace.id)
            .await
            .unwrap()
            .expect("trace should exist after upsert");

        assert_eq!(fetched.trace.name, "test_graph");
        assert_eq!(fetched.trace.session_id.as_deref(), Some("thread-1"));
        assert_eq!(fetched.trace.total_cost, Some(0.05));
    }

    /// An inserted generation observation is returned together with its trace.
    #[tokio::test]
    async fn sqlite_store_insert_observation() {
        let store = fresh_store().await;

        let trace = Trace::new("test_graph");
        store.upsert_trace(&trace).await.unwrap();

        let mut generation =
            Observation::generation(trace.id, "llm_call", "claude-sonnet-4-20250514");
        generation.input = Some(serde_json::json!({"prompt": "hello"}));
        generation.usage = Some(TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            total_tokens: 150,
            cached_tokens: None,
        });
        generation.cost = Some(0.003);
        generation.complete(Some(serde_json::json!({"text": "hi there"})));
        store.insert_observation(&generation).await.unwrap();

        let fetched = store.get_trace(trace.id).await.unwrap().unwrap();
        assert_eq!(fetched.observations.len(), 1);

        let only = &fetched.observations[0];
        assert_eq!(only.name, "llm_call");
        assert_eq!(only.observation_type, ObservationType::Generation);
    }

    /// Filtering by session id returns every trace stored for that session.
    #[tokio::test]
    async fn sqlite_store_query_traces() {
        let store = fresh_store().await;

        for name in (0..5).map(|i| format!("graph_{i}")) {
            let mut trace = Trace::new(name);
            trace.session_id = Some(String::from("thread-1"));
            store.upsert_trace(&trace).await.unwrap();
        }

        let by_session = TraceQuery {
            session_id: Some(String::from("thread-1")),
            ..TraceQuery::default()
        };
        let page = store.query_traces(&by_session).await.unwrap();

        assert_eq!(page.data.len(), 5);
        assert_eq!(page.total_count, 5);
    }

    /// A session can be upserted, fetched by id, and listed.
    #[tokio::test]
    async fn sqlite_store_sessions() {
        let store = fresh_store().await;

        let session = Session::new("thread-1");
        store.upsert_session(&session).await.unwrap();

        let fetched = store
            .get_session("thread-1")
            .await
            .unwrap()
            .expect("session should exist after upsert");
        assert_eq!(fetched.id, "thread-1");

        let listing = store.query_sessions(0, 10).await.unwrap();
        assert_eq!(listing.data.len(), 1);
    }

    /// Parent links survive a round trip through the store for a
    /// three-level observation chain.
    #[tokio::test]
    async fn sqlite_store_nested_observations() {
        let store = fresh_store().await;

        let trace = Trace::new("test_graph");
        store.upsert_trace(&trace).await.unwrap();

        // Root of the chain.
        let mut superstep = Observation::span(trace.id, "juncture.superstep");
        superstep.complete(None);
        store.insert_observation(&superstep).await.unwrap();

        // Middle of the chain, child of the superstep.
        let mut node =
            Observation::span(trace.id, "juncture.node.execute").with_parent(superstep.id);
        node.complete(None);
        store.insert_observation(&node).await.unwrap();

        // Leaf of the chain, child of the node.
        let mut llm = Observation::generation(trace.id, "llm_call", "model").with_parent(node.id);
        llm.complete(None);
        store.insert_observation(&llm).await.unwrap();

        let fetched = store.get_trace(trace.id).await.unwrap().unwrap();
        assert_eq!(fetched.observations.len(), 3);

        let stored_llm = fetched
            .observations
            .iter()
            .find(|candidate| candidate.name == "llm_call")
            .unwrap();
        assert_eq!(stored_llm.parent_observation_id, Some(node.id));
    }

    /// Daily statistics over a window around "now" include a freshly
    /// stored trace.
    #[tokio::test]
    async fn sqlite_store_daily_stats() {
        let store = fresh_store().await;

        let mut trace = Trace::new("test");
        trace.total_cost = Some(0.05);
        trace.total_tokens = Some(100);
        store.upsert_trace(&trace).await.unwrap();

        let window_start = Utc::now() - chrono::Duration::days(1);
        let window_end = Utc::now() + chrono::Duration::days(1);
        let stats = store
            .get_daily_stats(window_start, window_end)
            .await
            .unwrap();

        assert!(!stats.is_empty());
    }
}