1use std::path::PathBuf;
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
11use tracing::debug;
12
13use crate::models::{
14 EnrichedSession, Id, ModelStats, Observation, ObservationLevel, ObservationType, Session,
15 SummaryStats, TokenUsage, Trace,
16};
17use crate::trace_store::{
18 DailyStats, PaginatedResponse, StoreError, TraceQuery, TraceStore, TraceWithObservations,
19};
20
/// Owns the path of a throwaway SQLite database file and deletes it on drop.
///
/// Dropping the guard removes the database file itself plus the `-wal` and
/// `-shm` files that share its name.
#[derive(Debug)]
struct TransientDbGuard(PathBuf);

impl TransientDbGuard {
    /// Returns the guarded path with `suffix` appended to its final component.
    ///
    /// The suffix is pushed onto the raw `OsString`, so a path that is not
    /// valid UTF-8 keeps its exact bytes. Formatting the path through
    /// `Path::display` would replace such bytes and point at the wrong file.
    fn sidecar(&self, suffix: &str) -> PathBuf {
        let mut raw = self.0.clone().into_os_string();
        raw.push(suffix);
        PathBuf::from(raw)
    }
}

impl Drop for TransientDbGuard {
    /// Best-effort cleanup: every removal error is deliberately ignored,
    /// because a missing file is the expected state for some of these paths
    /// and `drop` has no way to report failure.
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.0);
        for suffix in ["-wal", "-shm"] {
            let _ = std::fs::remove_file(self.sidecar(suffix));
        }
    }
}
35
36#[derive(Clone, Debug)]
43pub struct SqliteStore {
44 pool: SqlitePool,
45 _transient_guard: Option<Arc<TransientDbGuard>>,
46}
47
48impl SqliteStore {
49 pub async fn new(path: &str) -> Result<Self, StoreError> {
58 let url = format!("sqlite:{path}?mode=rwc");
59
60 let pool = SqlitePoolOptions::new()
61 .max_connections(1)
62 .connect(&url)
63 .await
64 .map_err(|e| StoreError::Storage(format!("open db: {e}")))?;
65
66 let store = Self {
67 pool,
68 _transient_guard: None,
69 };
70 store.create_schema().await?;
71 Ok(store)
72 }
73
74 pub async fn new_memory() -> Result<Self, StoreError> {
82 let id = uuid::Uuid::new_v4();
83 let path = std::env::temp_dir().join(format!("juncture-telemetry-{id}.db"));
84 let path_str = path.to_string_lossy().to_string();
85 let pool = SqlitePoolOptions::new()
86 .max_connections(1)
87 .connect(&format!("sqlite:{path_str}?mode=rwc"))
88 .await
89 .map_err(|e| StoreError::Storage(format!("open db: {e}")))?;
90
91 let store = Self {
92 pool,
93 _transient_guard: Some(Arc::new(TransientDbGuard(path))),
94 };
95 store.create_schema().await?;
96 Ok(store)
97 }
98
99 async fn create_schema(&self) -> Result<(), StoreError> {
100 sqlx::query(
101 "CREATE TABLE IF NOT EXISTS traces (
102 id TEXT PRIMARY KEY,
103 name TEXT NOT NULL,
104 user_id TEXT,
105 session_id TEXT,
106 tags TEXT NOT NULL DEFAULT '[]',
107 metadata TEXT NOT NULL DEFAULT 'null',
108 environment TEXT,
109 release_version TEXT,
110 input TEXT,
111 output TEXT,
112 start_time TEXT NOT NULL,
113 end_time TEXT,
114 total_cost REAL,
115 total_tokens INTEGER
116 )",
117 )
118 .execute(&self.pool)
119 .await
120 .map_err(|e| StoreError::Storage(format!("create traces table: {e}")))?;
121
122 sqlx::query("CREATE INDEX IF NOT EXISTS idx_traces_session ON traces(session_id)")
123 .execute(&self.pool)
124 .await
125 .map_err(|e| StoreError::Storage(format!("create session index: {e}")))?;
126
127 sqlx::query("CREATE INDEX IF NOT EXISTS idx_traces_user ON traces(user_id)")
128 .execute(&self.pool)
129 .await
130 .map_err(|e| StoreError::Storage(format!("create user index: {e}")))?;
131
132 sqlx::query("CREATE INDEX IF NOT EXISTS idx_traces_start ON traces(start_time)")
133 .execute(&self.pool)
134 .await
135 .map_err(|e| StoreError::Storage(format!("create start index: {e}")))?;
136
137 sqlx::query(
138 "CREATE TABLE IF NOT EXISTS observations (
139 id TEXT PRIMARY KEY,
140 trace_id TEXT NOT NULL REFERENCES traces(id),
141 parent_observation_id TEXT,
142 name TEXT NOT NULL,
143 observation_type TEXT NOT NULL,
144 start_time TEXT NOT NULL,
145 end_time TEXT,
146 input TEXT,
147 output TEXT,
148 metadata TEXT NOT NULL DEFAULT 'null',
149 level TEXT NOT NULL DEFAULT 'DEFAULT',
150 status_message TEXT,
151 model TEXT,
152 model_parameters TEXT,
153 usage_input_tokens INTEGER,
154 usage_output_tokens INTEGER,
155 usage_total_tokens INTEGER,
156 usage_cached_tokens INTEGER,
157 cost REAL
158 )",
159 )
160 .execute(&self.pool)
161 .await
162 .map_err(|e| StoreError::Storage(format!("create observations table: {e}")))?;
163
164 sqlx::query("CREATE INDEX IF NOT EXISTS idx_obs_trace ON observations(trace_id)")
165 .execute(&self.pool)
166 .await
167 .map_err(|e| StoreError::Storage(format!("create obs trace index: {e}")))?;
168
169 sqlx::query("CREATE INDEX IF NOT EXISTS idx_obs_start ON observations(start_time)")
170 .execute(&self.pool)
171 .await
172 .map_err(|e| StoreError::Storage(format!("create obs start index: {e}")))?;
173
174 sqlx::query(
175 "CREATE TABLE IF NOT EXISTS sessions (
176 id TEXT PRIMARY KEY,
177 user_id TEXT,
178 created_at TEXT NOT NULL
179 )",
180 )
181 .execute(&self.pool)
182 .await
183 .map_err(|e| StoreError::Storage(format!("create sessions table: {e}")))?;
184
185 debug!("SQLite schema initialized");
186 Ok(())
187 }
188}
189
190#[async_trait::async_trait]
191impl TraceStore for SqliteStore {
192 async fn upsert_trace(&self, trace: &Trace) -> Result<(), StoreError> {
193 sqlx::query(
194 "INSERT OR REPLACE INTO traces (
195 id, name, user_id, session_id, tags, metadata,
196 environment, release_version, input, output,
197 start_time, end_time, total_cost, total_tokens
198 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
199 )
200 .bind(trace.id.to_string())
201 .bind(&trace.name)
202 .bind(&trace.user_id)
203 .bind(&trace.session_id)
204 .bind(serde_json::to_string(&trace.tags).unwrap_or_else(|_| "[]".to_string()))
205 .bind(serde_json::to_string(&trace.metadata).unwrap_or_else(|_| "null".to_string()))
206 .bind(&trace.environment)
207 .bind(&trace.release)
208 .bind(
209 trace
210 .input
211 .as_ref()
212 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string())),
213 )
214 .bind(
215 trace
216 .output
217 .as_ref()
218 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string())),
219 )
220 .bind(trace.start_time.to_rfc3339())
221 .bind(trace.end_time.map(|t| t.to_rfc3339()))
222 .bind(trace.total_cost)
223 .bind(
224 trace
225 .total_tokens
226 .map(|t| i64::try_from(t).unwrap_or(i64::MAX)),
227 )
228 .execute(&self.pool)
229 .await
230 .map_err(|e| StoreError::Storage(format!("insert trace: {e}")))?;
231 Ok(())
232 }
233
234 async fn insert_observation(&self, observation: &Observation) -> Result<(), StoreError> {
235 sqlx::query(
236 "INSERT OR REPLACE INTO observations (
237 id, trace_id, parent_observation_id, name, observation_type,
238 start_time, end_time, input, output, metadata,
239 level, status_message, model, model_parameters,
240 usage_input_tokens, usage_output_tokens, usage_total_tokens,
241 usage_cached_tokens, cost
242 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?15, ?16, ?17, ?18, ?19)",
243 )
244 .bind(observation.id.to_string())
245 .bind(observation.trace_id.to_string())
246 .bind(observation.parent_observation_id.map(|id| id.to_string()))
247 .bind(&observation.name)
248 .bind(observation.observation_type.as_str())
249 .bind(observation.start_time.to_rfc3339())
250 .bind(observation.end_time.map(|t| t.to_rfc3339()))
251 .bind(
252 observation
253 .input
254 .as_ref()
255 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string())),
256 )
257 .bind(
258 observation
259 .output
260 .as_ref()
261 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string())),
262 )
263 .bind(serde_json::to_string(&observation.metadata).unwrap_or_else(|_| "null".to_string()))
264 .bind(observation.level.as_str())
265 .bind(&observation.status_message)
266 .bind(&observation.model)
267 .bind(
268 observation
269 .model_parameters
270 .as_ref()
271 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string())),
272 )
273 .bind(
274 observation
275 .usage
276 .as_ref()
277 .map(|u| i64::try_from(u.input_tokens).unwrap_or(i64::MAX)),
278 )
279 .bind(
280 observation
281 .usage
282 .as_ref()
283 .map(|u| i64::try_from(u.output_tokens).unwrap_or(i64::MAX)),
284 )
285 .bind(
286 observation
287 .usage
288 .as_ref()
289 .map(|u| i64::try_from(u.total_tokens).unwrap_or(i64::MAX)),
290 )
291 .bind(observation.usage.as_ref().and_then(|u| {
292 u.cached_tokens
293 .map(|t| i64::try_from(t).unwrap_or(i64::MAX))
294 }))
295 .bind(observation.cost)
296 .execute(&self.pool)
297 .await
298 .map_err(|e| StoreError::Storage(format!("insert observation: {e}")))?;
299 Ok(())
300 }
301
302 async fn upsert_session(&self, session: &Session) -> Result<(), StoreError> {
303 sqlx::query("INSERT OR REPLACE INTO sessions (id, user_id, created_at) VALUES (?, ?, ?)")
304 .bind(&session.id)
305 .bind(&session.user_id)
306 .bind(session.created_at.to_rfc3339())
307 .execute(&self.pool)
308 .await
309 .map_err(|e| StoreError::Storage(format!("insert session: {e}")))?;
310 Ok(())
311 }
312
313 async fn get_trace(&self, id: Id) -> Result<Option<TraceWithObservations>, StoreError> {
314 let id_str = id.to_string();
315
316 let row = sqlx::query_as::<_, TraceRow>(
317 "SELECT id, name, user_id, session_id, tags, metadata,
318 environment, release_version, input, output,
319 start_time, end_time, total_cost, total_tokens
320 FROM traces WHERE id = ?",
321 )
322 .bind(&id_str)
323 .fetch_optional(&self.pool)
324 .await
325 .map_err(|e| StoreError::Storage(format!("query trace: {e}")))?;
326
327 let Some(row) = row else {
328 return Ok(None);
329 };
330
331 let trace = row.into_trace();
332
333 let obs_rows = sqlx::query_as::<_, ObservationRow>(
334 "SELECT id, trace_id, parent_observation_id, name, observation_type,
335 start_time, end_time, input, output, metadata,
336 level, status_message, model, model_parameters,
337 usage_input_tokens, usage_output_tokens, usage_total_tokens,
338 usage_cached_tokens, cost
339 FROM observations WHERE trace_id = ? ORDER BY start_time",
340 )
341 .bind(&id_str)
342 .fetch_all(&self.pool)
343 .await
344 .map_err(|e| StoreError::Storage(format!("query observations: {e}")))?;
345
346 let observations = obs_rows
347 .into_iter()
348 .map(ObservationRow::into_observation)
349 .collect::<Vec<_>>();
350
351 Ok(Some(TraceWithObservations {
352 trace,
353 observations,
354 }))
355 }
356
357 async fn query_traces(
358 &self,
359 query: &TraceQuery,
360 ) -> Result<PaginatedResponse<Trace>, StoreError> {
361 let page = query.page.unwrap_or(0);
362 let page_size = i64::from(query.page_size.unwrap_or(50).min(500));
363 let offset = i64::from(page) * page_size;
364
365 let mut conditions = vec!["1=1".to_string()];
367 if query.session_id.is_some() {
368 conditions.push("session_id = ?".to_string());
369 }
370 if query.user_id.is_some() {
371 conditions.push("user_id = ?".to_string());
372 }
373 if query.name.is_some() {
374 conditions.push("name LIKE ?".to_string());
375 }
376 if query.environment.is_some() {
377 conditions.push("environment = ?".to_string());
378 }
379 if query.from_timestamp.is_some() {
380 conditions.push("start_time >= ?".to_string());
381 }
382 if query.to_timestamp.is_some() {
383 conditions.push("start_time <= ?".to_string());
384 }
385
386 let where_clause = conditions.join(" AND ");
387
388 let count_sql = format!("SELECT COUNT(*) as cnt FROM traces WHERE {where_clause}");
390 let mut count_q = sqlx::query_scalar::<_, i64>(&count_sql);
391 if let Some(ref sid) = query.session_id {
392 count_q = count_q.bind(sid);
393 }
394 if let Some(ref uid) = query.user_id {
395 count_q = count_q.bind(uid);
396 }
397 if let Some(ref name) = query.name {
398 count_q = count_q.bind(format!("%{name}%"));
399 }
400 if let Some(ref env) = query.environment {
401 count_q = count_q.bind(env);
402 }
403 if let Some(from) = query.from_timestamp {
404 count_q = count_q.bind(from.to_rfc3339());
405 }
406 if let Some(to) = query.to_timestamp {
407 count_q = count_q.bind(to.to_rfc3339());
408 }
409
410 let total_count: i64 = count_q
411 .fetch_one(&self.pool)
412 .await
413 .map_err(|e| StoreError::Storage(format!("count traces: {e}")))?;
414
415 let data_sql = format!(
417 "SELECT id, name, user_id, session_id, tags, metadata,
418 environment, release_version, input, output,
419 start_time, end_time, total_cost, total_tokens
420 FROM traces WHERE {where_clause}
421 ORDER BY start_time DESC LIMIT ? OFFSET ?"
422 );
423 let mut data_q = sqlx::query_as::<_, TraceRow>(&data_sql);
424 if let Some(ref sid) = query.session_id {
425 data_q = data_q.bind(sid);
426 }
427 if let Some(ref uid) = query.user_id {
428 data_q = data_q.bind(uid);
429 }
430 if let Some(ref name) = query.name {
431 data_q = data_q.bind(format!("%{name}%"));
432 }
433 if let Some(ref env) = query.environment {
434 data_q = data_q.bind(env);
435 }
436 if let Some(from) = query.from_timestamp {
437 data_q = data_q.bind(from.to_rfc3339());
438 }
439 if let Some(to) = query.to_timestamp {
440 data_q = data_q.bind(to.to_rfc3339());
441 }
442 data_q = data_q.bind(page_size);
443 data_q = data_q.bind(offset);
444
445 let rows = data_q
446 .fetch_all(&self.pool)
447 .await
448 .map_err(|e| StoreError::Storage(format!("query traces: {e}")))?;
449
450 let traces = rows.into_iter().map(TraceRow::into_trace).collect();
451
452 Ok(PaginatedResponse {
453 data: traces,
454 page,
455 page_size: u32::try_from(page_size).unwrap_or(50),
456 total_count: u64::try_from(total_count).unwrap_or(0),
457 })
458 }
459
460 async fn get_session(&self, id: &str) -> Result<Option<Session>, StoreError> {
461 let row = sqlx::query_as::<_, SessionRow>(
462 "SELECT id, user_id, created_at FROM sessions WHERE id = ?",
463 )
464 .bind(id)
465 .fetch_optional(&self.pool)
466 .await
467 .map_err(|e| StoreError::Storage(format!("query session: {e}")))?;
468
469 Ok(row.map(SessionRow::into_session))
470 }
471
472 async fn query_sessions(
473 &self,
474 page: u32,
475 page_size: u32,
476 ) -> Result<PaginatedResponse<Session>, StoreError> {
477 let page_size = i64::from(page_size.min(500));
478 let offset = i64::from(page) * page_size;
479
480 let total_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sessions")
481 .fetch_one(&self.pool)
482 .await
483 .map_err(|e| StoreError::Storage(format!("count sessions: {e}")))?;
484
485 let rows = sqlx::query_as::<_, SessionRow>(
486 "SELECT id, user_id, created_at FROM sessions
487 ORDER BY created_at DESC LIMIT ? OFFSET ?",
488 )
489 .bind(page_size)
490 .bind(offset)
491 .fetch_all(&self.pool)
492 .await
493 .map_err(|e| StoreError::Storage(format!("query sessions: {e}")))?;
494
495 let sessions = rows.into_iter().map(SessionRow::into_session).collect();
496
497 Ok(PaginatedResponse {
498 data: sessions,
499 page,
500 page_size: u32::try_from(page_size).unwrap_or(50),
501 total_count: u64::try_from(total_count).unwrap_or(0),
502 })
503 }
504
505 async fn get_daily_stats(
506 &self,
507 from: DateTime<Utc>,
508 to: DateTime<Utc>,
509 ) -> Result<Vec<DailyStats>, StoreError> {
510 let rows = sqlx::query_as::<_, DailyStatsRow>(
511 "SELECT
512 DATE(start_time) as date,
513 COUNT(*) as trace_count,
514 COALESCE(SUM(total_tokens), 0) as total_tokens,
515 COALESCE(SUM(total_cost), 0.0) as total_cost
516 FROM traces
517 WHERE start_time >= ? AND start_time <= ?
518 GROUP BY DATE(start_time)
519 ORDER BY date",
520 )
521 .bind(from.to_rfc3339())
522 .bind(to.to_rfc3339())
523 .fetch_all(&self.pool)
524 .await
525 .map_err(|e| StoreError::Storage(format!("query stats: {e}")))?;
526
527 Ok(rows.into_iter().map(DailyStatsRow::into_stats).collect())
528 }
529
530 async fn get_model_stats(&self) -> Result<Vec<ModelStats>, StoreError> {
531 let rows = sqlx::query_as::<_, ModelStatsRow>(
532 "SELECT model, COUNT(*) as call_count,
533 COALESCE(SUM(usage_input_tokens), 0) as input_tokens,
534 COALESCE(SUM(usage_output_tokens), 0) as output_tokens,
535 COALESCE(SUM(cost), 0.0) as total_cost,
536 COALESCE(AVG(CAST((julianday(end_time) - julianday(start_time)) * 86400000 AS REAL)), 0.0) as avg_latency_ms
537 FROM observations WHERE model IS NOT NULL
538 GROUP BY model ORDER BY total_cost DESC",
539 )
540 .fetch_all(&self.pool)
541 .await
542 .map_err(|e| StoreError::Storage(format!("query model stats: {e}")))?;
543
544 Ok(rows
545 .into_iter()
546 .map(ModelStatsRow::into_model_stats)
547 .collect())
548 }
549
550 async fn get_summary_stats(&self) -> Result<SummaryStats, StoreError> {
551 let total_traces: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM traces")
552 .fetch_one(&self.pool)
553 .await
554 .map_err(|e| StoreError::Storage(format!("count traces: {e}")))?;
555
556 let total_observations: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM observations")
557 .fetch_one(&self.pool)
558 .await
559 .map_err(|e| StoreError::Storage(format!("count observations: {e}")))?;
560
561 let total_cost: f64 =
562 sqlx::query_scalar("SELECT COALESCE(SUM(total_cost), 0.0) FROM traces")
563 .fetch_one(&self.pool)
564 .await
565 .map_err(|e| StoreError::Storage(format!("sum cost: {e}")))?;
566
567 let total_tokens: i64 =
568 sqlx::query_scalar("SELECT COALESCE(SUM(total_tokens), 0) FROM traces")
569 .fetch_one(&self.pool)
570 .await
571 .map_err(|e| StoreError::Storage(format!("sum tokens: {e}")))?;
572
573 let error_count: i64 =
574 sqlx::query_scalar("SELECT COUNT(*) FROM observations WHERE level = 'ERROR'")
575 .fetch_one(&self.pool)
576 .await
577 .map_err(|e| StoreError::Storage(format!("count errors: {e}")))?;
578
579 let active_sessions: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sessions")
580 .fetch_one(&self.pool)
581 .await
582 .map_err(|e| StoreError::Storage(format!("count sessions: {e}")))?;
583
584 let durations: Vec<f64> = sqlx::query_scalar(
586 "SELECT CAST((julianday(end_time) - julianday(start_time)) * 86400000 AS REAL) as dur
587 FROM traces WHERE end_time IS NOT NULL ORDER BY dur",
588 )
589 .fetch_all(&self.pool)
590 .await
591 .map_err(|e| StoreError::Storage(format!("query latencies: {e}")))?;
592
593 let (p50, p95, p99) = if durations.is_empty() {
594 (0.0, 0.0, 0.0)
595 } else {
596 let len = durations.len();
597 let p50_idx = len * 50 / 100;
598 let p95_idx = len * 95 / 100;
599 let p99_idx = (len * 99 / 100).min(len - 1);
600 (durations[p50_idx], durations[p95_idx], durations[p99_idx])
601 };
602
603 Ok(SummaryStats {
604 total_traces: u64::try_from(total_traces).unwrap_or(0),
605 total_observations: u64::try_from(total_observations).unwrap_or(0),
606 total_cost,
607 total_tokens: u64::try_from(total_tokens).unwrap_or(0),
608 error_count: u64::try_from(error_count).unwrap_or(0),
609 active_sessions: u64::try_from(active_sessions).unwrap_or(0),
610 latency_p50_ms: p50,
611 latency_p95_ms: p95,
612 latency_p99_ms: p99,
613 })
614 }
615
616 async fn query_enriched_sessions(
617 &self,
618 page: u32,
619 page_size: u32,
620 ) -> Result<PaginatedResponse<EnrichedSession>, StoreError> {
621 let page_size = i64::from(page_size.min(500));
622 let offset = i64::from(page) * page_size;
623
624 let total_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sessions")
625 .fetch_one(&self.pool)
626 .await
627 .map_err(|e| StoreError::Storage(format!("count sessions: {e}")))?;
628
629 let rows = sqlx::query_as::<_, EnrichedSessionRow>(
630 "SELECT s.id, s.user_id, s.created_at,
631 COUNT(t.id) as trace_count,
632 COALESCE(SUM(t.total_cost), 0.0) as total_cost,
633 COALESCE(SUM(t.total_tokens), 0) as total_tokens,
634 MAX(t.start_time) as last_active
635 FROM sessions s LEFT JOIN traces t ON t.session_id = s.id
636 GROUP BY s.id ORDER BY s.created_at DESC LIMIT ? OFFSET ?",
637 )
638 .bind(page_size)
639 .bind(offset)
640 .fetch_all(&self.pool)
641 .await
642 .map_err(|e| StoreError::Storage(format!("query enriched sessions: {e}")))?;
643
644 let sessions = rows
645 .into_iter()
646 .map(EnrichedSessionRow::into_enriched_session)
647 .collect();
648
649 Ok(PaginatedResponse {
650 data: sessions,
651 page,
652 page_size: u32::try_from(page_size).unwrap_or(50),
653 total_count: u64::try_from(total_count).unwrap_or(0),
654 })
655 }
656
657 async fn flush(&self) -> Result<(), StoreError> {
658 Ok(())
660 }
661}
662
663#[derive(sqlx::FromRow)]
666struct TraceRow {
667 id: String,
668 name: String,
669 user_id: Option<String>,
670 session_id: Option<String>,
671 tags: String,
672 metadata: String,
673 environment: Option<String>,
674 release_version: Option<String>,
675 input: Option<String>,
676 output: Option<String>,
677 start_time: String,
678 end_time: Option<String>,
679 total_cost: Option<f64>,
680 total_tokens: Option<i64>,
681}
682
683impl TraceRow {
684 fn into_trace(self) -> Trace {
685 Trace {
686 id: Id::parse_str(&self.id).unwrap_or_else(|_| Id::nil()),
687 name: self.name,
688 user_id: self.user_id,
689 session_id: self.session_id,
690 tags: serde_json::from_str(&self.tags).unwrap_or_default(),
691 metadata: serde_json::from_str(&self.metadata).unwrap_or(serde_json::Value::Null),
692 environment: self.environment,
693 release: self.release_version,
694 input: self.input.and_then(|s| serde_json::from_str(&s).ok()),
695 output: self.output.and_then(|s| serde_json::from_str(&s).ok()),
696 start_time: DateTime::parse_from_rfc3339(&self.start_time)
697 .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc)),
698 end_time: self.end_time.and_then(|s| {
699 DateTime::parse_from_rfc3339(&s)
700 .map(|dt| dt.with_timezone(&Utc))
701 .ok()
702 }),
703 total_cost: self.total_cost,
704 total_tokens: self.total_tokens.and_then(|t| u64::try_from(t).ok()),
705 }
706 }
707}
708
709#[derive(sqlx::FromRow)]
710struct ObservationRow {
711 id: String,
712 trace_id: String,
713 parent_observation_id: Option<String>,
714 name: String,
715 observation_type: String,
716 start_time: String,
717 end_time: Option<String>,
718 input: Option<String>,
719 output: Option<String>,
720 metadata: String,
721 level: String,
722 status_message: Option<String>,
723 model: Option<String>,
724 model_parameters: Option<String>,
725 usage_input_tokens: Option<i64>,
726 usage_output_tokens: Option<i64>,
727 usage_total_tokens: Option<i64>,
728 usage_cached_tokens: Option<i64>,
729 cost: Option<f64>,
730}
731
732impl ObservationRow {
733 fn into_observation(self) -> Observation {
734 let observation_type = match self.observation_type.as_str() {
735 "GENERATION" => ObservationType::Generation,
736 "TOOL_CALL" => ObservationType::ToolCall,
737 "RETRIEVAL" => ObservationType::Retrieval,
738 _ => ObservationType::Span,
739 };
740
741 let level = match self.level.as_str() {
742 "DEBUG" => ObservationLevel::Debug,
743 "WARNING" => ObservationLevel::Warning,
744 "ERROR" => ObservationLevel::Error,
745 _ => ObservationLevel::Default,
746 };
747
748 let usage = (self.usage_input_tokens.is_some()
749 || self.usage_output_tokens.is_some()
750 || self.usage_total_tokens.is_some())
751 .then(|| TokenUsage {
752 input_tokens: self
753 .usage_input_tokens
754 .and_then(|t| u64::try_from(t).ok())
755 .unwrap_or(0),
756 output_tokens: self
757 .usage_output_tokens
758 .and_then(|t| u64::try_from(t).ok())
759 .unwrap_or(0),
760 total_tokens: self
761 .usage_total_tokens
762 .and_then(|t| u64::try_from(t).ok())
763 .unwrap_or(0),
764 cached_tokens: self.usage_cached_tokens.and_then(|t| u64::try_from(t).ok()),
765 });
766
767 Observation {
768 id: Id::parse_str(&self.id).unwrap_or_else(|_| Id::nil()),
769 trace_id: Id::parse_str(&self.trace_id).unwrap_or_else(|_| Id::nil()),
770 parent_observation_id: self
771 .parent_observation_id
772 .and_then(|s| Id::parse_str(&s).ok()),
773 name: self.name,
774 observation_type,
775 start_time: DateTime::parse_from_rfc3339(&self.start_time)
776 .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc)),
777 end_time: self.end_time.and_then(|s| {
778 DateTime::parse_from_rfc3339(&s)
779 .map(|dt| dt.with_timezone(&Utc))
780 .ok()
781 }),
782 input: self.input.and_then(|s| serde_json::from_str(&s).ok()),
783 output: self.output.and_then(|s| serde_json::from_str(&s).ok()),
784 metadata: serde_json::from_str(&self.metadata).unwrap_or(serde_json::Value::Null),
785 level,
786 status_message: self.status_message,
787 model: self.model,
788 model_parameters: self
789 .model_parameters
790 .and_then(|s| serde_json::from_str(&s).ok()),
791 usage,
792 cost: self.cost,
793 }
794 }
795}
796
797#[derive(sqlx::FromRow)]
798struct SessionRow {
799 id: String,
800 user_id: Option<String>,
801 created_at: String,
802}
803
804impl SessionRow {
805 fn into_session(self) -> Session {
806 Session {
807 id: self.id,
808 user_id: self.user_id,
809 created_at: DateTime::parse_from_rfc3339(&self.created_at)
810 .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc)),
811 }
812 }
813}
814
815#[derive(sqlx::FromRow)]
816struct DailyStatsRow {
817 date: String,
818 trace_count: i64,
819 total_tokens: i64,
820 total_cost: f64,
821}
822
823impl DailyStatsRow {
824 fn into_stats(self) -> DailyStats {
825 DailyStats {
826 date: self.date,
827 trace_count: u64::try_from(self.trace_count).unwrap_or(0),
828 observation_count: 0,
829 total_tokens: u64::try_from(self.total_tokens).unwrap_or(0),
830 total_cost: self.total_cost,
831 total_duration_ms: 0,
832 }
833 }
834}
835
836#[derive(sqlx::FromRow)]
837struct ModelStatsRow {
838 model: String,
839 call_count: i64,
840 input_tokens: i64,
841 output_tokens: i64,
842 total_cost: f64,
843 avg_latency_ms: f64,
844}
845
846impl ModelStatsRow {
847 fn into_model_stats(self) -> ModelStats {
848 ModelStats {
849 model: self.model,
850 call_count: u64::try_from(self.call_count).unwrap_or(0),
851 input_tokens: u64::try_from(self.input_tokens).unwrap_or(0),
852 output_tokens: u64::try_from(self.output_tokens).unwrap_or(0),
853 total_cost: self.total_cost,
854 avg_latency_ms: self.avg_latency_ms,
855 }
856 }
857}
858
859#[derive(sqlx::FromRow)]
860struct EnrichedSessionRow {
861 id: String,
862 user_id: Option<String>,
863 created_at: String,
864 trace_count: i64,
865 total_cost: f64,
866 total_tokens: i64,
867 last_active: Option<String>,
868}
869
870impl EnrichedSessionRow {
871 fn into_enriched_session(self) -> EnrichedSession {
872 EnrichedSession {
873 id: self.id,
874 user_id: self.user_id,
875 created_at: self.created_at,
876 trace_count: u64::try_from(self.trace_count).unwrap_or(0),
877 total_cost: self.total_cost,
878 total_tokens: u64::try_from(self.total_tokens).unwrap_or(0),
879 last_active: self.last_active,
880 }
881 }
882}
883
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a throwaway store; panics if it cannot be created.
    async fn fresh_store() -> SqliteStore {
        SqliteStore::new_memory().await.unwrap()
    }

    /// A stored trace can be read back with its fields intact.
    #[tokio::test]
    async fn sqlite_store_create_and_insert_trace() {
        let store = fresh_store().await;

        let mut trace = Trace::new("test_graph");
        trace.session_id = Some("thread-1".to_string());
        trace.user_id = Some("user-1".to_string());
        trace.complete(None, Some(0.05), Some(100));
        store.upsert_trace(&trace).await.unwrap();

        let fetched = store.get_trace(trace.id).await.unwrap();
        assert!(fetched.is_some());
        let fetched = fetched.unwrap().trace;
        assert_eq!(fetched.name, "test_graph");
        assert_eq!(fetched.session_id.as_deref(), Some("thread-1"));
        assert_eq!(fetched.total_cost, Some(0.05));
    }

    /// An observation attached to a trace is returned with that trace.
    #[tokio::test]
    async fn sqlite_store_insert_observation() {
        let store = fresh_store().await;
        let trace = Trace::new("test_graph");
        store.upsert_trace(&trace).await.unwrap();

        let mut generation =
            Observation::generation(trace.id, "llm_call", "claude-sonnet-4-20250514");
        generation.input = Some(serde_json::json!({"prompt": "hello"}));
        generation.usage = Some(TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            total_tokens: 150,
            cached_tokens: None,
        });
        generation.cost = Some(0.003);
        generation.complete(Some(serde_json::json!({"text": "hi there"})));
        store.insert_observation(&generation).await.unwrap();

        let fetched = store.get_trace(trace.id).await.unwrap().unwrap();
        assert_eq!(fetched.observations.len(), 1);
        let first = &fetched.observations[0];
        assert_eq!(first.name, "llm_call");
        assert_eq!(first.observation_type, ObservationType::Generation);
    }

    /// Filtering by session id returns every trace of that session.
    #[tokio::test]
    async fn sqlite_store_query_traces() {
        let store = fresh_store().await;

        for index in 0..5 {
            let mut trace = Trace::new(format!("graph_{index}"));
            trace.session_id = Some("thread-1".to_string());
            store.upsert_trace(&trace).await.unwrap();
        }

        let filter = TraceQuery {
            session_id: Some("thread-1".to_string()),
            ..Default::default()
        };
        let page = store.query_traces(&filter).await.unwrap();
        assert_eq!(page.data.len(), 5);
        assert_eq!(page.total_count, 5);
    }

    /// A stored session is visible both by id and in the paginated listing.
    #[tokio::test]
    async fn sqlite_store_sessions() {
        let store = fresh_store().await;
        store
            .upsert_session(&Session::new("thread-1"))
            .await
            .unwrap();

        let fetched = store.get_session("thread-1").await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().id, "thread-1");

        let listing = store.query_sessions(0, 10).await.unwrap();
        assert_eq!(listing.data.len(), 1);
    }

    /// Parent links between observations survive a round trip.
    #[tokio::test]
    async fn sqlite_store_nested_observations() {
        let store = fresh_store().await;
        let trace = Trace::new("test_graph");
        store.upsert_trace(&trace).await.unwrap();

        let mut superstep = Observation::span(trace.id, "juncture.superstep");
        superstep.complete(None);
        store.insert_observation(&superstep).await.unwrap();

        let mut node =
            Observation::span(trace.id, "juncture.node.execute").with_parent(superstep.id);
        node.complete(None);
        store.insert_observation(&node).await.unwrap();

        let mut llm = Observation::generation(trace.id, "llm_call", "model").with_parent(node.id);
        llm.complete(None);
        store.insert_observation(&llm).await.unwrap();

        let fetched = store.get_trace(trace.id).await.unwrap().unwrap();
        assert_eq!(fetched.observations.len(), 3);

        let llm_fetched = fetched
            .observations
            .iter()
            .find(|candidate| candidate.name == "llm_call")
            .unwrap();
        assert_eq!(llm_fetched.parent_observation_id, Some(node.id));
    }

    /// A trace created now shows up in a window spanning yesterday to tomorrow.
    #[tokio::test]
    async fn sqlite_store_daily_stats() {
        let store = fresh_store().await;

        let mut trace = Trace::new("test");
        trace.total_cost = Some(0.05);
        trace.total_tokens = Some(100);
        store.upsert_trace(&trace).await.unwrap();

        let window_start = Utc::now() - chrono::Duration::days(1);
        let window_end = Utc::now() + chrono::Duration::days(1);
        let stats = store
            .get_daily_stats(window_start, window_end)
            .await
            .unwrap();
        assert!(!stats.is_empty());
    }
}