1use crate::TraceStore;
2use anyhow::Result;
3use sqlx::{SqlitePool, sqlite::SqlitePoolOptions};
4use std::path::PathBuf;
5use trace_weft_core::{EventRecord, SpanRecord};
6
7pub struct SqliteRecorder {
8 pool: SqlitePool,
9}
10
11impl SqliteRecorder {
12 pub async fn new(db_path: PathBuf) -> Result<Self> {
13 if let Some(parent) = db_path.parent() {
14 tokio::fs::create_dir_all(parent).await?;
15 }
16
17 let db_url = format!("sqlite://{}?mode=rwc", db_path.to_string_lossy());
18
19 let pool = SqlitePoolOptions::new().connect(&db_url).await?;
20
21 Self::from_pool(pool).await
22 }
23
24 pub async fn from_pool(pool: SqlitePool) -> Result<Self> {
25 sqlx::migrate!("./migrations").run(&pool).await?;
27
28 Ok(Self { pool })
29 }
30}
31
32#[async_trait::async_trait]
33impl TraceStore for SqliteRecorder {
34 async fn record_span(&self, span: SpanRecord) -> Result<()> {
35 let trace_id = span.trace_id.0.to_string();
36 let span_id = span.span_id.0.to_string();
37 let parent_span_id = span.parent_span_id.map(|id| id.0.to_string());
38 let run_id = span.run_id.0.to_string();
39 let session_id = span.session_id.map(|id| id.0.to_string());
40 let span_kind = serde_json::to_string(&span.span_kind)?
41 .trim_matches('"')
42 .to_string();
43 let status = serde_json::to_string(&span.status)?
44 .trim_matches('"')
45 .to_string();
46
47 let attributes = serde_json::to_string(&span.attributes)?;
48 let otel_attributes = serde_json::to_string(&span.otel_attributes)?;
49 let openinference_attributes = serde_json::to_string(&span.openinference_attributes)?;
50 let memory_state = span
51 .memory_state
52 .map(|s| serde_json::to_string(&s).unwrap());
53
54 let input_ref = span.input_ref.map(|r| serde_json::to_string(&r).unwrap());
55 let output_ref = span.output_ref.map(|r| serde_json::to_string(&r).unwrap());
56 let retrieved_document_refs = serde_json::to_string(&span.retrieved_document_refs)?;
57 let token_usage = span.token_usage.map(|u| serde_json::to_string(&u).unwrap());
58 let cost_estimate = span
59 .cost_estimate
60 .map(|c| serde_json::to_string(&c).unwrap());
61 let redaction_policy = serde_json::to_string(&span.redaction_policy)?
62 .trim_matches('"')
63 .to_string();
64
65 sqlx::query(
66 r#"
67 INSERT INTO spans (
68 trace_id, span_id, parent_span_id, run_id, session_id, user_id_hash,
69 span_kind, name, start_time, end_time, status, status_message, error_type, error_message_redacted,
70 attributes, otel_attributes, openinference_attributes, memory_state,
71 input_ref, output_ref, prompt_template_id, prompt_version,
72 model_provider, model_name, tool_name, tool_schema_hash, retrieval_query_hash,
73 retrieved_document_refs, token_usage, cost_estimate, latency_ms, retry_count, cache_hit,
74 redaction_policy, schema_version, project_id
75 ) VALUES (
76 ?, ?, ?, ?, ?, ?,
77 ?, ?, ?, ?, ?, ?, ?, ?,
78 ?, ?, ?, ?,
79 ?, ?, ?, ?,
80 ?, ?, ?, ?, ?,
81 ?, ?, ?, ?, ?, ?,
82 ?, ?, ?
83 )
84 "#,
85 )
86 .bind(trace_id).bind(span_id).bind(parent_span_id).bind(run_id).bind(session_id).bind(span.user_id_hash)
87 .bind(span_kind).bind(span.name).bind(span.start_time as i64).bind(span.end_time.map(|t| t as i64)).bind(status).bind(span.status_message).bind(span.error_type).bind(span.error_message_redacted)
88 .bind(attributes).bind(otel_attributes).bind(openinference_attributes).bind(memory_state)
89 .bind(input_ref).bind(output_ref).bind(span.prompt_template_id).bind(span.prompt_version)
90 .bind(span.model_provider).bind(span.model_name).bind(span.tool_name).bind(span.tool_schema_hash).bind(span.retrieval_query_hash)
91 .bind(retrieved_document_refs).bind(token_usage).bind(cost_estimate).bind(span.latency_ms.map(|t| t as i64)).bind(span.retry_count).bind(span.cache_hit)
92 .bind(redaction_policy).bind(span.schema_version).bind(span.project_id)
93 .execute(&self.pool)
94 .await?;
95
96 Ok(())
97 }
98
99 async fn record_event(&self, event: EventRecord) -> Result<()> {
100 let event_kind = serde_json::to_string(&event.event_kind)?
101 .trim_matches('"')
102 .to_string();
103 let attributes = serde_json::to_string(&event.attributes)?;
104
105 sqlx::query(
106 r#"
107 INSERT INTO events (
108 event_id, trace_id, run_id, parent_span_id, seq,
109 event_kind, name, timestamp, attributes, schema_version
110 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
111 "#,
112 )
113 .bind(event.event_id.0.to_string())
114 .bind(event.trace_id.0.to_string())
115 .bind(event.run_id.0.to_string())
116 .bind(event.parent_span_id.map(|id| id.0.to_string()))
117 .bind(event.seq as i64)
118 .bind(event_kind)
119 .bind(event.name)
120 .bind(event.timestamp as i64)
121 .bind(attributes)
122 .bind(event.schema_version)
123 .execute(&self.pool)
124 .await?;
125
126 Ok(())
127 }
128}