Skip to main content

llmtrace_storage/
sqlite.rs

1//! SQLite storage backend implementations.
2//!
3//! Provides [`SqliteTraceRepository`] for trace/span storage and
4//! [`SqliteMetadataRepository`] for tenant/config/audit storage,
5//! both backed by a shared SQLite connection pool.
6
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use llmtrace_core::{
10    AgentAction, ApiKeyRecord, ApiKeyRole, AuditEvent, AuditQuery, LLMProvider, LLMTraceError,
11    MetadataRepository, Result, SecurityFinding, SpanEvent, StorageStats, Tenant, TenantConfig,
12    TenantId, TraceEvent, TraceQuery, TraceRepository, TraceSpan,
13};
14use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
15use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
16use std::collections::HashMap;
17use std::str::FromStr;
18use uuid::Uuid;
19
20// ---------------------------------------------------------------------------
21// Schema migrations — now driven by versioned SQL files via migration runner
22// ---------------------------------------------------------------------------
23
24// ---------------------------------------------------------------------------
25// Shared pool builder
26// ---------------------------------------------------------------------------
27
28/// Open (or create) a SQLite connection pool configured for LLMTrace.
29pub(crate) async fn open_pool(database_url: &str) -> Result<SqlitePool> {
30    let connect_opts = SqliteConnectOptions::from_str(database_url)
31        .map_err(|e| LLMTraceError::Storage(format!("Invalid database URL: {e}")))?
32        .create_if_missing(true)
33        .journal_mode(SqliteJournalMode::Wal);
34
35    // For in-memory databases every connection gets its own database, so
36    // restrict the pool to a single connection to keep a consistent view.
37    let max_conns: u32 = if database_url.contains(":memory:") {
38        1
39    } else {
40        10
41    };
42
43    sqlx::pool::PoolOptions::<Sqlite>::new()
44        .max_connections(max_conns)
45        .connect_with(connect_opts)
46        .await
47        .map_err(|e| LLMTraceError::Storage(format!("Failed to connect to SQLite: {e}")))
48}
49
50/// Run all versioned SQLite migrations against the given pool.
51///
52/// Delegates to the [`migration`](crate::migration) module which tracks
53/// applied versions in a `schema_version` table and only applies pending
54/// migrations.
55pub(crate) async fn run_migrations(pool: &SqlitePool) -> Result<()> {
56    crate::migration::run_sqlite_migrations(pool).await
57}
58
59// ---------------------------------------------------------------------------
60// Serialisation helpers
61// ---------------------------------------------------------------------------
62
63/// Serialize an [`LLMProvider`] for storage as TEXT.
64fn serialize_provider(provider: &LLMProvider) -> String {
65    serde_json::to_string(provider).expect("LLMProvider serialization cannot fail")
66}
67
68/// Deserialize an [`LLMProvider`] from its stored TEXT representation.
69fn deserialize_provider(s: &str) -> Result<LLMProvider> {
70    serde_json::from_str(s)
71        .map_err(|e| LLMTraceError::Storage(format!("Invalid provider value '{s}': {e}")))
72}
73
74/// Parse a [`Uuid`] from a TEXT column value.
75fn parse_uuid(s: &str) -> Result<Uuid> {
76    Uuid::parse_str(s).map_err(|e| LLMTraceError::Storage(format!("Invalid UUID '{s}': {e}")))
77}
78
79/// Parse a [`DateTime<Utc>`] from an RFC 3339 TEXT column value.
80fn parse_datetime(s: &str) -> Result<DateTime<Utc>> {
81    DateTime::parse_from_rfc3339(s)
82        .map(|dt| dt.with_timezone(&Utc))
83        .map_err(|e| LLMTraceError::Storage(format!("Invalid datetime '{s}': {e}")))
84}
85
86// ---------------------------------------------------------------------------
87// Row ↔ TraceSpan conversion
88// ---------------------------------------------------------------------------
89
90/// Reconstruct a [`TraceSpan`] from a SQLite row.
91fn span_from_row(row: &sqlx::sqlite::SqliteRow) -> Result<TraceSpan> {
92    let trace_id = parse_uuid(&row.get::<String, _>("trace_id"))?;
93    let span_id = parse_uuid(&row.get::<String, _>("span_id"))?;
94    let parent_span_id = row
95        .get::<Option<String>, _>("parent_span_id")
96        .map(|s| parse_uuid(&s))
97        .transpose()?;
98    let tenant_id = TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?);
99    let operation_name: String = row.get("operation_name");
100    let start_time = parse_datetime(&row.get::<String, _>("start_time"))?;
101    let end_time = row
102        .get::<Option<String>, _>("end_time")
103        .map(|s| parse_datetime(&s))
104        .transpose()?;
105    let provider = deserialize_provider(&row.get::<String, _>("provider"))?;
106    let model_name: String = row.get("model_name");
107    let prompt: String = row.get("prompt");
108    let response: Option<String> = row.get("response");
109    let prompt_tokens = row.get::<Option<i64>, _>("prompt_tokens").map(|v| v as u32);
110    let completion_tokens = row
111        .get::<Option<i64>, _>("completion_tokens")
112        .map(|v| v as u32);
113    let total_tokens = row.get::<Option<i64>, _>("total_tokens").map(|v| v as u32);
114    let time_to_first_token_ms = row
115        .get::<Option<i64>, _>("time_to_first_token_ms")
116        .map(|v| v as u64);
117    let duration_ms = row.get::<Option<i64>, _>("duration_ms").map(|v| v as u64);
118    let status_code = row.get::<Option<i64>, _>("status_code").map(|v| v as u16);
119    let error_message: Option<String> = row.get("error_message");
120    let estimated_cost_usd: Option<f64> = row.get("estimated_cost_usd");
121    let security_score = row.get::<Option<i64>, _>("security_score").map(|v| v as u8);
122
123    let security_findings: Vec<SecurityFinding> = {
124        let raw: String = row.get("security_findings");
125        serde_json::from_str(&raw)
126            .map_err(|e| LLMTraceError::Storage(format!("Invalid security_findings JSON: {e}")))?
127    };
128    let tags: HashMap<String, String> = {
129        let raw: String = row.get("tags");
130        serde_json::from_str(&raw)
131            .map_err(|e| LLMTraceError::Storage(format!("Invalid tags JSON: {e}")))?
132    };
133    let events: Vec<SpanEvent> = {
134        let raw: String = row.get("events");
135        serde_json::from_str(&raw)
136            .map_err(|e| LLMTraceError::Storage(format!("Invalid events JSON: {e}")))?
137    };
138    let agent_actions: Vec<AgentAction> = {
139        let raw: String = row.get("agent_actions");
140        serde_json::from_str(&raw)
141            .map_err(|e| LLMTraceError::Storage(format!("Invalid agent_actions JSON: {e}")))?
142    };
143
144    Ok(TraceSpan {
145        trace_id,
146        span_id,
147        parent_span_id,
148        tenant_id,
149        operation_name,
150        start_time,
151        end_time,
152        provider,
153        model_name,
154        prompt,
155        response,
156        prompt_tokens,
157        completion_tokens,
158        total_tokens,
159        time_to_first_token_ms,
160        duration_ms,
161        status_code,
162        error_message,
163        estimated_cost_usd,
164        security_score,
165        security_findings,
166        tags,
167        events,
168        agent_actions,
169    })
170}
171
172// ---------------------------------------------------------------------------
173// Row ↔ ComplianceReportRecord conversion
174// ---------------------------------------------------------------------------
175
176/// Reconstruct a [`ComplianceReportRecord`] from a SQLite row.
177fn report_from_row(row: &sqlx::sqlite::SqliteRow) -> Result<llmtrace_core::ComplianceReportRecord> {
178    let content: Option<serde_json::Value> = row
179        .get::<Option<String>, _>("content")
180        .map(|s| serde_json::from_str(&s))
181        .transpose()
182        .map_err(|e| LLMTraceError::Storage(format!("Invalid report content JSON: {e}")))?;
183
184    Ok(llmtrace_core::ComplianceReportRecord {
185        id: parse_uuid(&row.get::<String, _>("id"))?,
186        tenant_id: TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?),
187        report_type: row.get("report_type"),
188        status: row.get("status"),
189        period_start: parse_datetime(&row.get::<String, _>("period_start"))?,
190        period_end: parse_datetime(&row.get::<String, _>("period_end"))?,
191        created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
192        completed_at: row
193            .get::<Option<String>, _>("completed_at")
194            .map(|s| parse_datetime(&s))
195            .transpose()?,
196        content,
197        error: row.get("error"),
198    })
199}
200
201// ===========================================================================
202// SqliteTraceRepository
203// ===========================================================================
204
205/// SQLite-backed trace repository.
206///
207/// Stores trace metadata in a `traces` table and individual spans in a `spans`
208/// table with proper indexes for efficient querying by tenant, time range,
209/// provider, model, and security score.
210pub struct SqliteTraceRepository {
211    pool: SqlitePool,
212}
213
214impl SqliteTraceRepository {
215    /// Open (or create) a SQLite database and run trace schema migrations.
216    ///
217    /// # Examples
218    ///
219    /// ```no_run
220    /// # async fn example() -> llmtrace_core::Result<()> {
221    /// let repo = llmtrace_storage::SqliteTraceRepository::new("sqlite::memory:").await?;
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub async fn new(database_url: &str) -> Result<Self> {
226        let pool = open_pool(database_url).await?;
227        run_migrations(&pool).await?;
228        Ok(Self { pool })
229    }
230
231    /// Create from an existing pool (used by [`StorageProfile`](crate::StorageProfile) factory).
232    pub(crate) async fn from_pool(pool: SqlitePool) -> Result<Self> {
233        run_migrations(&pool).await?;
234        Ok(Self { pool })
235    }
236
237    /// Insert a single span using the provided executor (pool or transaction).
238    async fn insert_span<'e, E>(&self, executor: E, span: &TraceSpan) -> Result<()>
239    where
240        E: sqlx::Executor<'e, Database = Sqlite>,
241    {
242        let security_findings_json = serde_json::to_string(&span.security_findings)
243            .map_err(|e| LLMTraceError::Storage(format!("serialize security_findings: {e}")))?;
244        let tags_json = serde_json::to_string(&span.tags)
245            .map_err(|e| LLMTraceError::Storage(format!("serialize tags: {e}")))?;
246        let events_json = serde_json::to_string(&span.events)
247            .map_err(|e| LLMTraceError::Storage(format!("serialize events: {e}")))?;
248        let agent_actions_json = serde_json::to_string(&span.agent_actions)
249            .map_err(|e| LLMTraceError::Storage(format!("serialize agent_actions: {e}")))?;
250
251        sqlx::query(
252            "INSERT OR REPLACE INTO spans (
253                span_id, trace_id, parent_span_id, tenant_id, operation_name,
254                start_time, end_time, provider, model_name, prompt,
255                response, prompt_tokens, completion_tokens, total_tokens,
256                time_to_first_token_ms, duration_ms, status_code, error_message,
257                estimated_cost_usd, security_score, security_findings, tags, events,
258                agent_actions
259            ) VALUES (
260                ?1, ?2, ?3, ?4, ?5,
261                ?6, ?7, ?8, ?9, ?10,
262                ?11, ?12, ?13, ?14,
263                ?15, ?16, ?17, ?18,
264                ?19, ?20, ?21, ?22, ?23,
265                ?24
266            )",
267        )
268        .bind(span.span_id.to_string())
269        .bind(span.trace_id.to_string())
270        .bind(span.parent_span_id.map(|id| id.to_string()))
271        .bind(span.tenant_id.0.to_string())
272        .bind(&span.operation_name)
273        .bind(span.start_time.to_rfc3339())
274        .bind(span.end_time.map(|t| t.to_rfc3339()))
275        .bind(serialize_provider(&span.provider))
276        .bind(&span.model_name)
277        .bind(&span.prompt)
278        .bind(span.response.as_deref())
279        .bind(span.prompt_tokens.map(|v| v as i64))
280        .bind(span.completion_tokens.map(|v| v as i64))
281        .bind(span.total_tokens.map(|v| v as i64))
282        .bind(span.time_to_first_token_ms.map(|v| v as i64))
283        .bind(span.duration_ms.map(|v| v as i64))
284        .bind(span.status_code.map(|v| v as i64))
285        .bind(span.error_message.as_deref())
286        .bind(span.estimated_cost_usd)
287        .bind(span.security_score.map(|v| v as i64))
288        .bind(&security_findings_json)
289        .bind(&tags_json)
290        .bind(&events_json)
291        .bind(&agent_actions_json)
292        .execute(executor)
293        .await
294        .map_err(|e| LLMTraceError::Storage(format!("Failed to insert span: {e}")))?;
295
296        Ok(())
297    }
298
299    /// Load all spans belonging to a set of trace IDs for a given tenant.
300    async fn load_spans_for_traces(
301        &self,
302        tenant_id: &TenantId,
303        trace_ids: &[String],
304    ) -> Result<HashMap<String, Vec<TraceSpan>>> {
305        if trace_ids.is_empty() {
306            return Ok(HashMap::new());
307        }
308
309        let mut qb = QueryBuilder::<Sqlite>::new("SELECT * FROM spans WHERE tenant_id = ");
310        qb.push_bind(tenant_id.0.to_string());
311        qb.push(" AND trace_id IN (");
312        let mut sep = qb.separated(", ");
313        for tid in trace_ids {
314            sep.push_bind(tid.clone());
315        }
316        sep.push_unseparated(") ORDER BY start_time ASC");
317
318        let rows = qb
319            .build()
320            .fetch_all(&self.pool)
321            .await
322            .map_err(|e| LLMTraceError::Storage(format!("Failed to load spans: {e}")))?;
323
324        let mut grouped: HashMap<String, Vec<TraceSpan>> = HashMap::new();
325        for row in &rows {
326            let span = span_from_row(row)?;
327            grouped
328                .entry(span.trace_id.to_string())
329                .or_default()
330                .push(span);
331        }
332        Ok(grouped)
333    }
334
335    /// Build a dynamic span-filter query, returning matching `trace_id` values.
336    fn build_trace_id_query<'args>(&self, query: &TraceQuery) -> QueryBuilder<'args, Sqlite> {
337        let mut qb = QueryBuilder::<Sqlite>::new(
338            "SELECT DISTINCT s.trace_id FROM spans s \
339             JOIN traces t ON s.trace_id = t.trace_id AND s.tenant_id = t.tenant_id \
340             WHERE s.tenant_id = ",
341        );
342        qb.push_bind(query.tenant_id.0.to_string());
343
344        if let Some(ref trace_id) = query.trace_id {
345            qb.push(" AND s.trace_id = ");
346            qb.push_bind(trace_id.to_string());
347        }
348        if let Some(ref start) = query.start_time {
349            qb.push(" AND s.start_time >= ");
350            qb.push_bind(start.to_rfc3339());
351        }
352        if let Some(ref end) = query.end_time {
353            qb.push(" AND s.start_time <= ");
354            qb.push_bind(end.to_rfc3339());
355        }
356        if let Some(ref provider) = query.provider {
357            qb.push(" AND s.provider = ");
358            qb.push_bind(serialize_provider(provider));
359        }
360        if let Some(ref model) = query.model_name {
361            qb.push(" AND s.model_name = ");
362            qb.push_bind(model.clone());
363        }
364        if let Some(ref op) = query.operation_name {
365            qb.push(" AND s.operation_name = ");
366            qb.push_bind(op.clone());
367        }
368        if let Some(min) = query.min_security_score {
369            qb.push(" AND s.security_score >= ");
370            qb.push_bind(min as i64);
371        }
372        if let Some(max) = query.max_security_score {
373            qb.push(" AND s.security_score <= ");
374            qb.push_bind(max as i64);
375        }
376
377        qb.push(" ORDER BY t.created_at DESC");
378
379        if let Some(limit) = query.limit {
380            qb.push(" LIMIT ");
381            qb.push_bind(limit as i64);
382        }
383        if let Some(offset) = query.offset {
384            qb.push(" OFFSET ");
385            qb.push_bind(offset as i64);
386        }
387
388        qb
389    }
390
391    /// Build a dynamic span-filter query returning full span rows.
392    fn build_span_query<'args>(&self, query: &TraceQuery) -> QueryBuilder<'args, Sqlite> {
393        let mut qb = QueryBuilder::<Sqlite>::new("SELECT * FROM spans WHERE tenant_id = ");
394        qb.push_bind(query.tenant_id.0.to_string());
395
396        if let Some(ref trace_id) = query.trace_id {
397            qb.push(" AND trace_id = ");
398            qb.push_bind(trace_id.to_string());
399        }
400        if let Some(ref start) = query.start_time {
401            qb.push(" AND start_time >= ");
402            qb.push_bind(start.to_rfc3339());
403        }
404        if let Some(ref end) = query.end_time {
405            qb.push(" AND start_time <= ");
406            qb.push_bind(end.to_rfc3339());
407        }
408        if let Some(ref provider) = query.provider {
409            qb.push(" AND provider = ");
410            qb.push_bind(serialize_provider(provider));
411        }
412        if let Some(ref model) = query.model_name {
413            qb.push(" AND model_name = ");
414            qb.push_bind(model.clone());
415        }
416        if let Some(ref op) = query.operation_name {
417            qb.push(" AND operation_name = ");
418            qb.push_bind(op.clone());
419        }
420        if let Some(min) = query.min_security_score {
421            qb.push(" AND security_score >= ");
422            qb.push_bind(min as i64);
423        }
424        if let Some(max) = query.max_security_score {
425            qb.push(" AND security_score <= ");
426            qb.push_bind(max as i64);
427        }
428
429        qb.push(" ORDER BY start_time DESC");
430
431        if let Some(limit) = query.limit {
432            qb.push(" LIMIT ");
433            qb.push_bind(limit as i64);
434        }
435        if let Some(offset) = query.offset {
436            qb.push(" OFFSET ");
437            qb.push_bind(offset as i64);
438        }
439
440        qb
441    }
442}
443
444#[async_trait]
445impl TraceRepository for SqliteTraceRepository {
446    async fn store_trace(&self, trace: &TraceEvent) -> Result<()> {
447        let mut tx = self
448            .pool
449            .begin()
450            .await
451            .map_err(|e| LLMTraceError::Storage(format!("Failed to begin transaction: {e}")))?;
452
453        sqlx::query(
454            "INSERT OR REPLACE INTO traces (trace_id, tenant_id, created_at) VALUES (?1, ?2, ?3)",
455        )
456        .bind(trace.trace_id.to_string())
457        .bind(trace.tenant_id.0.to_string())
458        .bind(trace.created_at.to_rfc3339())
459        .execute(&mut *tx)
460        .await
461        .map_err(|e| LLMTraceError::Storage(format!("Failed to insert trace: {e}")))?;
462
463        for span in &trace.spans {
464            self.insert_span(&mut *tx, span).await?;
465        }
466
467        tx.commit()
468            .await
469            .map_err(|e| LLMTraceError::Storage(format!("Failed to commit transaction: {e}")))?;
470
471        Ok(())
472    }
473
474    async fn store_span(&self, span: &TraceSpan) -> Result<()> {
475        let mut tx = self
476            .pool
477            .begin()
478            .await
479            .map_err(|e| LLMTraceError::Storage(format!("Failed to begin transaction: {e}")))?;
480
481        sqlx::query(
482            "INSERT OR IGNORE INTO traces (trace_id, tenant_id, created_at) VALUES (?1, ?2, ?3)",
483        )
484        .bind(span.trace_id.to_string())
485        .bind(span.tenant_id.0.to_string())
486        .bind(span.start_time.to_rfc3339())
487        .execute(&mut *tx)
488        .await
489        .map_err(|e| LLMTraceError::Storage(format!("Failed to ensure trace row: {e}")))?;
490
491        self.insert_span(&mut *tx, span).await?;
492
493        tx.commit()
494            .await
495            .map_err(|e| LLMTraceError::Storage(format!("Failed to commit transaction: {e}")))?;
496
497        Ok(())
498    }
499
500    async fn query_traces(&self, query: &TraceQuery) -> Result<Vec<TraceEvent>> {
501        let mut qb = self.build_trace_id_query(query);
502        let rows = qb
503            .build()
504            .fetch_all(&self.pool)
505            .await
506            .map_err(|e| LLMTraceError::Storage(format!("Failed to query trace IDs: {e}")))?;
507
508        let trace_ids: Vec<String> = rows.iter().map(|r| r.get("trace_id")).collect();
509        if trace_ids.is_empty() {
510            return Ok(Vec::new());
511        }
512
513        let mut meta_qb = QueryBuilder::<Sqlite>::new("SELECT * FROM traces WHERE tenant_id = ");
514        meta_qb.push_bind(query.tenant_id.0.to_string());
515        meta_qb.push(" AND trace_id IN (");
516        let mut sep = meta_qb.separated(", ");
517        for tid in &trace_ids {
518            sep.push_bind(tid.clone());
519        }
520        sep.push_unseparated(") ORDER BY created_at DESC");
521
522        let meta_rows =
523            meta_qb.build().fetch_all(&self.pool).await.map_err(|e| {
524                LLMTraceError::Storage(format!("Failed to load trace metadata: {e}"))
525            })?;
526
527        let spans_map = self
528            .load_spans_for_traces(&query.tenant_id, &trace_ids)
529            .await?;
530
531        let mut results = Vec::with_capacity(meta_rows.len());
532        for row in &meta_rows {
533            let tid_str: String = row.get("trace_id");
534            let trace_id = parse_uuid(&tid_str)?;
535            let tenant_id = TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?);
536            let created_at = parse_datetime(&row.get::<String, _>("created_at"))?;
537            let spans = spans_map.get(&tid_str).cloned().unwrap_or_default();
538
539            results.push(TraceEvent {
540                trace_id,
541                tenant_id,
542                spans,
543                created_at,
544            });
545        }
546
547        Ok(results)
548    }
549
550    async fn query_spans(&self, query: &TraceQuery) -> Result<Vec<TraceSpan>> {
551        let mut qb = self.build_span_query(query);
552        let rows = qb
553            .build()
554            .fetch_all(&self.pool)
555            .await
556            .map_err(|e| LLMTraceError::Storage(format!("Failed to query spans: {e}")))?;
557
558        rows.iter().map(span_from_row).collect()
559    }
560
561    async fn get_trace(&self, tenant_id: TenantId, trace_id: Uuid) -> Result<Option<TraceEvent>> {
562        let row = sqlx::query("SELECT * FROM traces WHERE tenant_id = ?1 AND trace_id = ?2")
563            .bind(tenant_id.0.to_string())
564            .bind(trace_id.to_string())
565            .fetch_optional(&self.pool)
566            .await
567            .map_err(|e| LLMTraceError::Storage(format!("Failed to get trace: {e}")))?;
568
569        let Some(row) = row else {
570            return Ok(None);
571        };
572
573        let created_at = parse_datetime(&row.get::<String, _>("created_at"))?;
574
575        let span_rows = sqlx::query(
576            "SELECT * FROM spans WHERE tenant_id = ?1 AND trace_id = ?2 ORDER BY start_time ASC",
577        )
578        .bind(tenant_id.0.to_string())
579        .bind(trace_id.to_string())
580        .fetch_all(&self.pool)
581        .await
582        .map_err(|e| LLMTraceError::Storage(format!("Failed to load trace spans: {e}")))?;
583
584        let spans: Vec<TraceSpan> = span_rows.iter().map(span_from_row).collect::<Result<_>>()?;
585
586        Ok(Some(TraceEvent {
587            trace_id,
588            tenant_id,
589            spans,
590            created_at,
591        }))
592    }
593
594    async fn get_span(&self, tenant_id: TenantId, span_id: Uuid) -> Result<Option<TraceSpan>> {
595        let row = sqlx::query("SELECT * FROM spans WHERE tenant_id = ?1 AND span_id = ?2")
596            .bind(tenant_id.0.to_string())
597            .bind(span_id.to_string())
598            .fetch_optional(&self.pool)
599            .await
600            .map_err(|e| LLMTraceError::Storage(format!("Failed to get span: {e}")))?;
601
602        match row {
603            Some(ref r) => Ok(Some(span_from_row(r)?)),
604            None => Ok(None),
605        }
606    }
607
608    async fn delete_traces_before(
609        &self,
610        tenant_id: TenantId,
611        before: DateTime<Utc>,
612    ) -> Result<u64> {
613        let mut tx = self
614            .pool
615            .begin()
616            .await
617            .map_err(|e| LLMTraceError::Storage(format!("Failed to begin transaction: {e}")))?;
618
619        let before_str = before.to_rfc3339();
620        let tid = tenant_id.0.to_string();
621
622        sqlx::query(
623            "DELETE FROM spans WHERE tenant_id = ?1 AND trace_id IN \
624             (SELECT trace_id FROM traces WHERE tenant_id = ?2 AND created_at < ?3)",
625        )
626        .bind(&tid)
627        .bind(&tid)
628        .bind(&before_str)
629        .execute(&mut *tx)
630        .await
631        .map_err(|e| LLMTraceError::Storage(format!("Failed to delete spans: {e}")))?;
632
633        let result = sqlx::query("DELETE FROM traces WHERE tenant_id = ?1 AND created_at < ?2")
634            .bind(&tid)
635            .bind(&before_str)
636            .execute(&mut *tx)
637            .await
638            .map_err(|e| LLMTraceError::Storage(format!("Failed to delete traces: {e}")))?;
639
640        tx.commit()
641            .await
642            .map_err(|e| LLMTraceError::Storage(format!("Failed to commit delete: {e}")))?;
643
644        Ok(result.rows_affected())
645    }
646
647    async fn get_stats(&self, tenant_id: TenantId) -> Result<StorageStats> {
648        let tid = tenant_id.0.to_string();
649
650        let trace_count: i64 =
651            sqlx::query("SELECT COUNT(*) as cnt FROM traces WHERE tenant_id = ?1")
652                .bind(&tid)
653                .fetch_one(&self.pool)
654                .await
655                .map_err(|e| LLMTraceError::Storage(format!("Failed to count traces: {e}")))?
656                .get("cnt");
657
658        let span_count: i64 = sqlx::query("SELECT COUNT(*) as cnt FROM spans WHERE tenant_id = ?1")
659            .bind(&tid)
660            .fetch_one(&self.pool)
661            .await
662            .map_err(|e| LLMTraceError::Storage(format!("Failed to count spans: {e}")))?
663            .get("cnt");
664
665        let size_row = sqlx::query(
666            "SELECT COALESCE(SUM(LENGTH(prompt) + COALESCE(LENGTH(response), 0)), 0) as sz \
667             FROM spans WHERE tenant_id = ?1",
668        )
669        .bind(&tid)
670        .fetch_one(&self.pool)
671        .await
672        .map_err(|e| LLMTraceError::Storage(format!("Failed to calculate size: {e}")))?;
673        let storage_size_bytes: i64 = size_row.get("sz");
674
675        let time_row = sqlx::query(
676            "SELECT MIN(created_at) as oldest, MAX(created_at) as newest \
677             FROM traces WHERE tenant_id = ?1",
678        )
679        .bind(&tid)
680        .fetch_one(&self.pool)
681        .await
682        .map_err(|e| LLMTraceError::Storage(format!("Failed to get time range: {e}")))?;
683
684        let oldest_trace = time_row
685            .get::<Option<String>, _>("oldest")
686            .map(|s| parse_datetime(&s))
687            .transpose()?;
688        let newest_trace = time_row
689            .get::<Option<String>, _>("newest")
690            .map(|s| parse_datetime(&s))
691            .transpose()?;
692
693        Ok(StorageStats {
694            total_traces: trace_count as u64,
695            total_spans: span_count as u64,
696            storage_size_bytes: storage_size_bytes as u64,
697            oldest_trace,
698            newest_trace,
699        })
700    }
701
702    async fn health_check(&self) -> Result<()> {
703        sqlx::query("SELECT 1")
704            .fetch_one(&self.pool)
705            .await
706            .map_err(|e| LLMTraceError::Storage(format!("Health check failed: {e}")))?;
707        Ok(())
708    }
709}
710
711// ===========================================================================
712// SqliteMetadataRepository
713// ===========================================================================
714
715/// SQLite-backed metadata repository for tenants, configurations, and audit events.
716pub struct SqliteMetadataRepository {
717    pool: SqlitePool,
718}
719
720impl SqliteMetadataRepository {
721    /// Open (or create) a SQLite database and run metadata schema migrations.
722    pub async fn new(database_url: &str) -> Result<Self> {
723        let pool = open_pool(database_url).await?;
724        run_migrations(&pool).await?;
725        Ok(Self { pool })
726    }
727
728    /// Create from an existing pool (used by [`StorageProfile`](crate::StorageProfile) factory).
729    pub(crate) async fn from_pool(pool: SqlitePool) -> Result<Self> {
730        run_migrations(&pool).await?;
731        Ok(Self { pool })
732    }
733}
734
735#[async_trait]
736impl MetadataRepository for SqliteMetadataRepository {
737    async fn create_tenant(&self, tenant: &Tenant) -> Result<()> {
738        let config_json = serde_json::to_string(&tenant.config)
739            .map_err(|e| LLMTraceError::Storage(format!("serialize tenant config: {e}")))?;
740
741        sqlx::query(
742            "INSERT INTO tenants (id, name, api_token, plan, created_at, config) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
743        )
744        .bind(tenant.id.0.to_string())
745        .bind(&tenant.name)
746        .bind(&tenant.api_token)
747        .bind(&tenant.plan)
748        .bind(tenant.created_at.to_rfc3339())
749        .bind(&config_json)
750        .execute(&self.pool)
751        .await
752        .map_err(|e| LLMTraceError::Storage(format!("Failed to create tenant: {e}")))?;
753
754        Ok(())
755    }
756
757    async fn get_tenant(&self, id: TenantId) -> Result<Option<Tenant>> {
758        let row = sqlx::query("SELECT * FROM tenants WHERE id = ?1")
759            .bind(id.0.to_string())
760            .fetch_optional(&self.pool)
761            .await
762            .map_err(|e| LLMTraceError::Storage(format!("Failed to get tenant: {e}")))?;
763
764        let Some(row) = row else {
765            return Ok(None);
766        };
767
768        let config_str: String = row.get("config");
769        let config: serde_json::Value = serde_json::from_str(&config_str)
770            .map_err(|e| LLMTraceError::Storage(format!("Invalid tenant config JSON: {e}")))?;
771
772        Ok(Some(Tenant {
773            id: TenantId(parse_uuid(&row.get::<String, _>("id"))?),
774            name: row.get("name"),
775            api_token: row
776                .get::<Option<String>, _>("api_token")
777                .unwrap_or_default(),
778            plan: row.get("plan"),
779            created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
780            config,
781        }))
782    }
783
784    async fn get_tenant_by_token(&self, token: &str) -> Result<Option<Tenant>> {
785        let row = sqlx::query("SELECT * FROM tenants WHERE api_token = ?1")
786            .bind(token)
787            .fetch_optional(&self.pool)
788            .await
789            .map_err(|e| LLMTraceError::Storage(format!("Failed to get tenant by token: {e}")))?;
790
791        let Some(row) = row else {
792            return Ok(None);
793        };
794
795        let config_str: String = row.get("config");
796        let config: serde_json::Value = serde_json::from_str(&config_str)
797            .map_err(|e| LLMTraceError::Storage(format!("Invalid tenant config JSON: {e}")))?;
798
799        Ok(Some(Tenant {
800            id: TenantId(parse_uuid(&row.get::<String, _>("id"))?),
801            name: row.get("name"),
802            api_token: row
803                .get::<Option<String>, _>("api_token")
804                .unwrap_or_default(),
805            plan: row.get("plan"),
806            created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
807            config,
808        }))
809    }
810
811    async fn update_tenant(&self, tenant: &Tenant) -> Result<()> {
812        let config_json = serde_json::to_string(&tenant.config)
813            .map_err(|e| LLMTraceError::Storage(format!("serialize tenant config: {e}")))?;
814
815        let result = sqlx::query(
816            "UPDATE tenants SET name = ?1, plan = ?2, config = ?3, api_token = ?4 WHERE id = ?5",
817        )
818        .bind(&tenant.name)
819        .bind(&tenant.plan)
820        .bind(&config_json)
821        .bind(&tenant.api_token)
822        .bind(tenant.id.0.to_string())
823        .execute(&self.pool)
824        .await
825        .map_err(|e| LLMTraceError::Storage(format!("Failed to update tenant: {e}")))?;
826
827        if result.rows_affected() == 0 {
828            return Err(LLMTraceError::InvalidTenant {
829                tenant_id: tenant.id,
830            });
831        }
832        Ok(())
833    }
834
835    async fn list_tenants(&self) -> Result<Vec<Tenant>> {
836        let rows = sqlx::query("SELECT * FROM tenants ORDER BY created_at DESC")
837            .fetch_all(&self.pool)
838            .await
839            .map_err(|e| LLMTraceError::Storage(format!("Failed to list tenants: {e}")))?;
840
841        rows.iter()
842            .map(|row| {
843                let config_str: String = row.get("config");
844                let config: serde_json::Value = serde_json::from_str(&config_str).map_err(|e| {
845                    LLMTraceError::Storage(format!("Invalid tenant config JSON: {e}"))
846                })?;
847                Ok(Tenant {
848                    id: TenantId(parse_uuid(&row.get::<String, _>("id"))?),
849                    name: row.get("name"),
850                    api_token: row
851                        .get::<Option<String>, _>("api_token")
852                        .unwrap_or_default(),
853                    plan: row.get("plan"),
854                    created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
855                    config,
856                })
857            })
858            .collect()
859    }
860
861    async fn delete_tenant(&self, id: TenantId) -> Result<()> {
862        sqlx::query("DELETE FROM tenants WHERE id = ?1")
863            .bind(id.0.to_string())
864            .execute(&self.pool)
865            .await
866            .map_err(|e| LLMTraceError::Storage(format!("Failed to delete tenant: {e}")))?;
867        Ok(())
868    }
869
870    async fn get_tenant_config(&self, tenant_id: TenantId) -> Result<Option<TenantConfig>> {
871        let row = sqlx::query("SELECT * FROM tenant_configs WHERE tenant_id = ?1")
872            .bind(tenant_id.0.to_string())
873            .fetch_optional(&self.pool)
874            .await
875            .map_err(|e| LLMTraceError::Storage(format!("Failed to get tenant config: {e}")))?;
876
877        let Some(row) = row else {
878            return Ok(None);
879        };
880
881        let thresholds_str: String = row.get("security_thresholds");
882        let flags_str: String = row.get("feature_flags");
883        let monitoring_scope_str: String = row.get("monitoring_scope");
884        let rate_limit_rpm: Option<i32> = row.get("rate_limit_rpm");
885        let monthly_budget: Option<f64> = row.get("monthly_budget");
886
887        Ok(Some(TenantConfig {
888            tenant_id: TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?),
889            security_thresholds: serde_json::from_str(&thresholds_str).map_err(|e| {
890                LLMTraceError::Storage(format!("Invalid security_thresholds JSON: {e}"))
891            })?,
892            feature_flags: serde_json::from_str(&flags_str)
893                .map_err(|e| LLMTraceError::Storage(format!("Invalid feature_flags JSON: {e}")))?,
894            monitoring_scope: monitoring_scope_str.parse().map_err(|e| {
895                LLMTraceError::Storage(format!("Invalid monitoring_scope value: {e}"))
896            })?,
897            rate_limit_rpm: rate_limit_rpm.map(|v| v as u32),
898            monthly_budget,
899        }))
900    }
901
902    async fn upsert_tenant_config(&self, config: &TenantConfig) -> Result<()> {
903        let thresholds_json = serde_json::to_string(&config.security_thresholds)
904            .map_err(|e| LLMTraceError::Storage(format!("serialize thresholds: {e}")))?;
905        let flags_json = serde_json::to_string(&config.feature_flags)
906            .map_err(|e| LLMTraceError::Storage(format!("serialize feature_flags: {e}")))?;
907
908        sqlx::query(
909            "INSERT INTO tenant_configs (tenant_id, security_thresholds, feature_flags, monitoring_scope, rate_limit_rpm, monthly_budget)
910             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
911             ON CONFLICT(tenant_id) DO UPDATE SET
912                security_thresholds = excluded.security_thresholds,
913                feature_flags = excluded.feature_flags,
914                monitoring_scope = excluded.monitoring_scope,
915                rate_limit_rpm = excluded.rate_limit_rpm,
916                monthly_budget = excluded.monthly_budget",
917        )
918        .bind(config.tenant_id.0.to_string())
919        .bind(&thresholds_json)
920        .bind(&flags_json)
921        .bind(config.monitoring_scope.to_string())
922        .bind(config.rate_limit_rpm.map(|v| v as i32))
923        .bind(config.monthly_budget)
924        .execute(&self.pool)
925        .await
926        .map_err(|e| LLMTraceError::Storage(format!("Failed to upsert tenant config: {e}")))?;
927
928        Ok(())
929    }
930
931    async fn record_audit_event(&self, event: &AuditEvent) -> Result<()> {
932        let data_json = serde_json::to_string(&event.data)
933            .map_err(|e| LLMTraceError::Storage(format!("serialize audit data: {e}")))?;
934
935        sqlx::query(
936            "INSERT INTO audit_events (id, tenant_id, event_type, actor, resource, data, timestamp)
937             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
938        )
939        .bind(event.id.to_string())
940        .bind(event.tenant_id.0.to_string())
941        .bind(&event.event_type)
942        .bind(&event.actor)
943        .bind(&event.resource)
944        .bind(&data_json)
945        .bind(event.timestamp.to_rfc3339())
946        .execute(&self.pool)
947        .await
948        .map_err(|e| LLMTraceError::Storage(format!("Failed to record audit event: {e}")))?;
949
950        Ok(())
951    }
952
953    async fn query_audit_events(&self, query: &AuditQuery) -> Result<Vec<AuditEvent>> {
954        let mut qb = QueryBuilder::<Sqlite>::new("SELECT * FROM audit_events WHERE tenant_id = ");
955        qb.push_bind(query.tenant_id.0.to_string());
956
957        if let Some(ref event_type) = query.event_type {
958            qb.push(" AND event_type = ");
959            qb.push_bind(event_type.clone());
960        }
961        if let Some(ref start) = query.start_time {
962            qb.push(" AND timestamp >= ");
963            qb.push_bind(start.to_rfc3339());
964        }
965        if let Some(ref end) = query.end_time {
966            qb.push(" AND timestamp <= ");
967            qb.push_bind(end.to_rfc3339());
968        }
969
970        qb.push(" ORDER BY timestamp DESC");
971
972        if let Some(limit) = query.limit {
973            qb.push(" LIMIT ");
974            qb.push_bind(limit as i64);
975        }
976        if let Some(offset) = query.offset {
977            qb.push(" OFFSET ");
978            qb.push_bind(offset as i64);
979        }
980
981        let rows =
982            qb.build().fetch_all(&self.pool).await.map_err(|e| {
983                LLMTraceError::Storage(format!("Failed to query audit events: {e}"))
984            })?;
985
986        rows.iter()
987            .map(|row| {
988                let data_str: String = row.get("data");
989                let data: serde_json::Value = serde_json::from_str(&data_str).map_err(|e| {
990                    LLMTraceError::Storage(format!("Invalid audit event data JSON: {e}"))
991                })?;
992                Ok(AuditEvent {
993                    id: parse_uuid(&row.get::<String, _>("id"))?,
994                    tenant_id: TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?),
995                    event_type: row.get("event_type"),
996                    actor: row.get("actor"),
997                    resource: row.get("resource"),
998                    data,
999                    timestamp: parse_datetime(&row.get::<String, _>("timestamp"))?,
1000                })
1001            })
1002            .collect()
1003    }
1004
1005    async fn create_api_key(&self, key: &ApiKeyRecord) -> Result<()> {
1006        sqlx::query(
1007            "INSERT INTO api_keys (id, tenant_id, name, key_hash, key_prefix, role, created_at, revoked_at)
1008             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1009        )
1010        .bind(key.id.to_string())
1011        .bind(key.tenant_id.0.to_string())
1012        .bind(&key.name)
1013        .bind(&key.key_hash)
1014        .bind(&key.key_prefix)
1015        .bind(key.role.to_string())
1016        .bind(key.created_at.to_rfc3339())
1017        .bind(key.revoked_at.map(|t| t.to_rfc3339()))
1018        .execute(&self.pool)
1019        .await
1020        .map_err(|e| LLMTraceError::Storage(format!("Failed to create API key: {e}")))?;
1021        Ok(())
1022    }
1023
1024    async fn get_api_key_by_hash(&self, key_hash: &str) -> Result<Option<ApiKeyRecord>> {
1025        let row = sqlx::query("SELECT * FROM api_keys WHERE key_hash = ?1 AND revoked_at IS NULL")
1026            .bind(key_hash)
1027            .fetch_optional(&self.pool)
1028            .await
1029            .map_err(|e| LLMTraceError::Storage(format!("Failed to look up API key: {e}")))?;
1030
1031        let Some(row) = row else {
1032            return Ok(None);
1033        };
1034
1035        Ok(Some(ApiKeyRecord {
1036            id: parse_uuid(&row.get::<String, _>("id"))?,
1037            tenant_id: TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?),
1038            name: row.get("name"),
1039            key_hash: row.get("key_hash"),
1040            key_prefix: row.get("key_prefix"),
1041            role: row
1042                .get::<String, _>("role")
1043                .parse::<ApiKeyRole>()
1044                .map_err(|e| LLMTraceError::Storage(format!("Invalid API key role: {e}")))?,
1045            created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
1046            revoked_at: {
1047                let s: Option<String> = row.get("revoked_at");
1048                s.map(|v| parse_datetime(&v)).transpose()?
1049            },
1050        }))
1051    }
1052
1053    async fn list_api_keys(&self, tenant_id: TenantId) -> Result<Vec<ApiKeyRecord>> {
1054        let rows =
1055            sqlx::query("SELECT * FROM api_keys WHERE tenant_id = ?1 ORDER BY created_at DESC")
1056                .bind(tenant_id.0.to_string())
1057                .fetch_all(&self.pool)
1058                .await
1059                .map_err(|e| LLMTraceError::Storage(format!("Failed to list API keys: {e}")))?;
1060
1061        rows.iter()
1062            .map(|row| {
1063                Ok(ApiKeyRecord {
1064                    id: parse_uuid(&row.get::<String, _>("id"))?,
1065                    tenant_id: TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?),
1066                    name: row.get("name"),
1067                    key_hash: row.get("key_hash"),
1068                    key_prefix: row.get("key_prefix"),
1069                    role: row
1070                        .get::<String, _>("role")
1071                        .parse::<ApiKeyRole>()
1072                        .map_err(|e| {
1073                            LLMTraceError::Storage(format!("Invalid API key role: {e}"))
1074                        })?,
1075                    created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
1076                    revoked_at: {
1077                        let s: Option<String> = row.get("revoked_at");
1078                        s.map(|v| parse_datetime(&v)).transpose()?
1079                    },
1080                })
1081            })
1082            .collect()
1083    }
1084
1085    async fn revoke_api_key(&self, key_id: Uuid) -> Result<bool> {
1086        let result =
1087            sqlx::query("UPDATE api_keys SET revoked_at = ?1 WHERE id = ?2 AND revoked_at IS NULL")
1088                .bind(Utc::now().to_rfc3339())
1089                .bind(key_id.to_string())
1090                .execute(&self.pool)
1091                .await
1092                .map_err(|e| LLMTraceError::Storage(format!("Failed to revoke API key: {e}")))?;
1093
1094        Ok(result.rows_affected() > 0)
1095    }
1096
1097    async fn store_report(&self, report: &llmtrace_core::ComplianceReportRecord) -> Result<()> {
1098        let content_json = report
1099            .content
1100            .as_ref()
1101            .map(serde_json::to_string)
1102            .transpose()
1103            .map_err(|e| LLMTraceError::Storage(format!("serialize report content: {e}")))?;
1104
1105        sqlx::query(
1106            "INSERT OR REPLACE INTO compliance_reports
1107             (id, tenant_id, report_type, status, period_start, period_end, created_at, completed_at, content, error)
1108             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
1109        )
1110        .bind(report.id.to_string())
1111        .bind(report.tenant_id.0.to_string())
1112        .bind(&report.report_type)
1113        .bind(&report.status)
1114        .bind(report.period_start.to_rfc3339())
1115        .bind(report.period_end.to_rfc3339())
1116        .bind(report.created_at.to_rfc3339())
1117        .bind(report.completed_at.map(|t| t.to_rfc3339()))
1118        .bind(content_json.as_deref())
1119        .bind(report.error.as_deref())
1120        .execute(&self.pool)
1121        .await
1122        .map_err(|e| LLMTraceError::Storage(format!("Failed to store compliance report: {e}")))?;
1123
1124        Ok(())
1125    }
1126
1127    async fn get_report(
1128        &self,
1129        report_id: Uuid,
1130    ) -> Result<Option<llmtrace_core::ComplianceReportRecord>> {
1131        let row = sqlx::query("SELECT * FROM compliance_reports WHERE id = ?1")
1132            .bind(report_id.to_string())
1133            .fetch_optional(&self.pool)
1134            .await
1135            .map_err(|e| LLMTraceError::Storage(format!("Failed to get report: {e}")))?;
1136
1137        let Some(row) = row else {
1138            return Ok(None);
1139        };
1140
1141        Ok(Some(report_from_row(&row)?))
1142    }
1143
1144    async fn list_reports(
1145        &self,
1146        query: &llmtrace_core::ReportQuery,
1147    ) -> Result<Vec<llmtrace_core::ComplianceReportRecord>> {
1148        let mut qb =
1149            QueryBuilder::<Sqlite>::new("SELECT * FROM compliance_reports WHERE tenant_id = ");
1150        qb.push_bind(query.tenant_id.0.to_string());
1151        qb.push(" ORDER BY created_at DESC");
1152
1153        if let Some(limit) = query.limit {
1154            qb.push(" LIMIT ");
1155            qb.push_bind(limit as i64);
1156        }
1157        if let Some(offset) = query.offset {
1158            qb.push(" OFFSET ");
1159            qb.push_bind(offset as i64);
1160        }
1161
1162        let rows = qb.build().fetch_all(&self.pool).await.map_err(|e| {
1163            LLMTraceError::Storage(format!("Failed to list compliance reports: {e}"))
1164        })?;
1165
1166        rows.iter().map(report_from_row).collect()
1167    }
1168
1169    async fn health_check(&self) -> Result<()> {
1170        sqlx::query("SELECT 1")
1171            .fetch_one(&self.pool)
1172            .await
1173            .map_err(|e| LLMTraceError::Storage(format!("Metadata health check failed: {e}")))?;
1174        Ok(())
1175    }
1176}
1177
1178// ---------------------------------------------------------------------------
1179// Tests
1180// ---------------------------------------------------------------------------
1181
1182#[cfg(test)]
1183mod tests {
1184    use super::*;
1185    use llmtrace_core::{SecurityFinding, SecuritySeverity, SpanEvent};
1186
1187    /// Create a fresh in-memory [`SqliteTraceRepository`] for testing.
1188    async fn test_storage() -> SqliteTraceRepository {
1189        SqliteTraceRepository::new("sqlite::memory:").await.unwrap()
1190    }
1191
1192    /// Build a minimal [`TraceSpan`] for testing.
1193    fn make_span(
1194        trace_id: Uuid,
1195        tenant_id: TenantId,
1196        provider: LLMProvider,
1197        model: &str,
1198    ) -> TraceSpan {
1199        TraceSpan::new(
1200            trace_id,
1201            tenant_id,
1202            "chat_completion".to_string(),
1203            provider,
1204            model.to_string(),
1205            "Hello, world!".to_string(),
1206        )
1207    }
1208
1209    /// Build a minimal [`TraceEvent`] with one span.
1210    fn make_trace(tenant_id: TenantId) -> TraceEvent {
1211        let trace_id = Uuid::new_v4();
1212        let span = make_span(trace_id, tenant_id, LLMProvider::OpenAI, "gpt-4");
1213        TraceEvent {
1214            trace_id,
1215            tenant_id,
1216            spans: vec![span],
1217            created_at: Utc::now(),
1218        }
1219    }
1220
1221    // -- basic CRUD --------------------------------------------------------
1222
1223    #[tokio::test]
1224    async fn test_store_and_retrieve_trace() {
1225        let storage = test_storage().await;
1226        let tenant = TenantId::new();
1227        let trace = make_trace(tenant);
1228
1229        storage.store_trace(&trace).await.unwrap();
1230
1231        let retrieved = storage
1232            .get_trace(tenant, trace.trace_id)
1233            .await
1234            .unwrap()
1235            .expect("trace should exist");
1236
1237        assert_eq!(retrieved.trace_id, trace.trace_id);
1238        assert_eq!(retrieved.tenant_id, tenant);
1239        assert_eq!(retrieved.spans.len(), 1);
1240        assert_eq!(retrieved.spans[0].model_name, "gpt-4");
1241        assert_eq!(retrieved.spans[0].prompt, "Hello, world!");
1242    }
1243
1244    #[tokio::test]
1245    async fn test_store_span_individually() {
1246        let storage = test_storage().await;
1247        let tenant = TenantId::new();
1248        let trace_id = Uuid::new_v4();
1249        let span = make_span(trace_id, tenant, LLMProvider::Anthropic, "claude-3");
1250
1251        storage.store_span(&span).await.unwrap();
1252
1253        let retrieved = storage
1254            .get_span(tenant, span.span_id)
1255            .await
1256            .unwrap()
1257            .expect("span should exist");
1258
1259        assert_eq!(retrieved.span_id, span.span_id);
1260        assert_eq!(retrieved.provider, LLMProvider::Anthropic);
1261        assert_eq!(retrieved.model_name, "claude-3");
1262
1263        let trace = storage
1264            .get_trace(tenant, trace_id)
1265            .await
1266            .unwrap()
1267            .expect("parent trace row should exist");
1268        assert_eq!(trace.spans.len(), 1);
1269    }
1270
1271    #[tokio::test]
1272    async fn test_get_nonexistent_trace() {
1273        let storage = test_storage().await;
1274        let tenant = TenantId::new();
1275        let result = storage.get_trace(tenant, Uuid::new_v4()).await.unwrap();
1276        assert!(result.is_none());
1277    }
1278
1279    #[tokio::test]
1280    async fn test_get_nonexistent_span() {
1281        let storage = test_storage().await;
1282        let tenant = TenantId::new();
1283        let result = storage.get_span(tenant, Uuid::new_v4()).await.unwrap();
1284        assert!(result.is_none());
1285    }
1286
1287    // -- query filters -----------------------------------------------------
1288
1289    #[tokio::test]
1290    async fn test_query_by_tenant_isolation() {
1291        let storage = test_storage().await;
1292        let tenant_a = TenantId::new();
1293        let tenant_b = TenantId::new();
1294
1295        storage.store_trace(&make_trace(tenant_a)).await.unwrap();
1296        storage.store_trace(&make_trace(tenant_b)).await.unwrap();
1297
1298        let results_a = storage
1299            .query_traces(&TraceQuery::new(tenant_a))
1300            .await
1301            .unwrap();
1302        assert_eq!(results_a.len(), 1);
1303        assert_eq!(results_a[0].tenant_id, tenant_a);
1304
1305        let results_b = storage
1306            .query_traces(&TraceQuery::new(tenant_b))
1307            .await
1308            .unwrap();
1309        assert_eq!(results_b.len(), 1);
1310        assert_eq!(results_b[0].tenant_id, tenant_b);
1311    }
1312
1313    #[tokio::test]
1314    async fn test_query_by_time_range() {
1315        let storage = test_storage().await;
1316        let tenant = TenantId::new();
1317
1318        let old_time = Utc::now() - chrono::Duration::hours(2);
1319        let recent_time = Utc::now();
1320
1321        let trace_id_old = Uuid::new_v4();
1322        let mut span_old = make_span(trace_id_old, tenant, LLMProvider::OpenAI, "gpt-4");
1323        span_old.start_time = old_time;
1324        let old_trace = TraceEvent {
1325            trace_id: trace_id_old,
1326            tenant_id: tenant,
1327            spans: vec![span_old],
1328            created_at: old_time,
1329        };
1330
1331        let trace_id_new = Uuid::new_v4();
1332        let mut span_new = make_span(trace_id_new, tenant, LLMProvider::OpenAI, "gpt-4");
1333        span_new.start_time = recent_time;
1334        let new_trace = TraceEvent {
1335            trace_id: trace_id_new,
1336            tenant_id: tenant,
1337            spans: vec![span_new],
1338            created_at: recent_time,
1339        };
1340
1341        storage.store_trace(&old_trace).await.unwrap();
1342        storage.store_trace(&new_trace).await.unwrap();
1343
1344        let one_hour_ago = Utc::now() - chrono::Duration::hours(1);
1345        let query = TraceQuery::new(tenant).with_time_range(one_hour_ago, Utc::now());
1346        let results = storage.query_traces(&query).await.unwrap();
1347        assert_eq!(results.len(), 1);
1348        assert_eq!(results[0].trace_id, trace_id_new);
1349    }
1350
1351    #[tokio::test]
1352    async fn test_query_by_provider() {
1353        let storage = test_storage().await;
1354        let tenant = TenantId::new();
1355
1356        let tid1 = Uuid::new_v4();
1357        let t1 = TraceEvent {
1358            trace_id: tid1,
1359            tenant_id: tenant,
1360            spans: vec![make_span(tid1, tenant, LLMProvider::OpenAI, "gpt-4")],
1361            created_at: Utc::now(),
1362        };
1363        let tid2 = Uuid::new_v4();
1364        let t2 = TraceEvent {
1365            trace_id: tid2,
1366            tenant_id: tenant,
1367            spans: vec![make_span(tid2, tenant, LLMProvider::Anthropic, "claude-3")],
1368            created_at: Utc::now(),
1369        };
1370
1371        storage.store_trace(&t1).await.unwrap();
1372        storage.store_trace(&t2).await.unwrap();
1373
1374        let query = TraceQuery::new(tenant).with_provider(LLMProvider::Anthropic);
1375        let results = storage.query_traces(&query).await.unwrap();
1376        assert_eq!(results.len(), 1);
1377        assert_eq!(results[0].trace_id, tid2);
1378    }
1379
1380    #[tokio::test]
1381    async fn test_query_by_security_score() {
1382        let storage = test_storage().await;
1383        let tenant = TenantId::new();
1384
1385        let tid = Uuid::new_v4();
1386        let mut span = make_span(tid, tenant, LLMProvider::OpenAI, "gpt-4");
1387        span.add_security_finding(SecurityFinding::new(
1388            SecuritySeverity::Critical,
1389            "prompt_injection".to_string(),
1390            "test finding".to_string(),
1391            0.99,
1392        ));
1393
1394        let trace = TraceEvent {
1395            trace_id: tid,
1396            tenant_id: tenant,
1397            spans: vec![span],
1398            created_at: Utc::now(),
1399        };
1400        storage.store_trace(&trace).await.unwrap();
1401        storage.store_trace(&make_trace(tenant)).await.unwrap();
1402
1403        let query = TraceQuery::new(tenant).with_security_score_range(80, 100);
1404        let results = storage.query_traces(&query).await.unwrap();
1405        assert_eq!(results.len(), 1);
1406        assert_eq!(results[0].trace_id, tid);
1407    }
1408
1409    #[tokio::test]
1410    async fn test_query_with_limit_and_offset() {
1411        let storage = test_storage().await;
1412        let tenant = TenantId::new();
1413
1414        for _ in 0..5 {
1415            storage.store_trace(&make_trace(tenant)).await.unwrap();
1416        }
1417
1418        let query = TraceQuery::new(tenant).with_limit(2);
1419        let page1 = storage.query_traces(&query).await.unwrap();
1420        assert_eq!(page1.len(), 2);
1421
1422        let mut query2 = TraceQuery::new(tenant).with_limit(2);
1423        query2.offset = Some(2);
1424        let page2 = storage.query_traces(&query2).await.unwrap();
1425        assert_eq!(page2.len(), 2);
1426
1427        let ids1: Vec<Uuid> = page1.iter().map(|t| t.trace_id).collect();
1428        let ids2: Vec<Uuid> = page2.iter().map(|t| t.trace_id).collect();
1429        for id in &ids2 {
1430            assert!(!ids1.contains(id));
1431        }
1432    }
1433
1434    #[tokio::test]
1435    async fn test_query_nonexistent_tenant() {
1436        let storage = test_storage().await;
1437        let tenant = TenantId::new();
1438        storage.store_trace(&make_trace(tenant)).await.unwrap();
1439
1440        let other_tenant = TenantId::new();
1441        let results = storage
1442            .query_traces(&TraceQuery::new(other_tenant))
1443            .await
1444            .unwrap();
1445        assert!(results.is_empty());
1446    }
1447
1448    #[tokio::test]
1449    async fn test_query_spans_directly() {
1450        let storage = test_storage().await;
1451        let tenant = TenantId::new();
1452        let trace = make_trace(tenant);
1453        storage.store_trace(&trace).await.unwrap();
1454
1455        let spans = storage.query_spans(&TraceQuery::new(tenant)).await.unwrap();
1456        assert_eq!(spans.len(), 1);
1457        assert_eq!(spans[0].trace_id, trace.trace_id);
1458    }
1459
1460    #[tokio::test]
1461    async fn test_query_spans_by_provider() {
1462        let storage = test_storage().await;
1463        let tenant = TenantId::new();
1464
1465        let tid1 = Uuid::new_v4();
1466        let tid2 = Uuid::new_v4();
1467        let s1 = make_span(tid1, tenant, LLMProvider::OpenAI, "gpt-4");
1468        let s2 = make_span(tid2, tenant, LLMProvider::Anthropic, "claude-3");
1469        storage.store_span(&s1).await.unwrap();
1470        storage.store_span(&s2).await.unwrap();
1471
1472        let query = TraceQuery::new(tenant).with_provider(LLMProvider::OpenAI);
1473        let spans = storage.query_spans(&query).await.unwrap();
1474        assert_eq!(spans.len(), 1);
1475        assert_eq!(spans[0].provider, LLMProvider::OpenAI);
1476    }
1477
1478    #[tokio::test]
1479    async fn test_delete_traces_before() {
1480        let storage = test_storage().await;
1481        let tenant = TenantId::new();
1482
1483        let old_time = Utc::now() - chrono::Duration::hours(2);
1484        let recent_time = Utc::now();
1485
1486        let tid_old = Uuid::new_v4();
1487        let old_trace = TraceEvent {
1488            trace_id: tid_old,
1489            tenant_id: tenant,
1490            spans: vec![make_span(tid_old, tenant, LLMProvider::OpenAI, "gpt-4")],
1491            created_at: old_time,
1492        };
1493        let tid_new = Uuid::new_v4();
1494        let new_trace = TraceEvent {
1495            trace_id: tid_new,
1496            tenant_id: tenant,
1497            spans: vec![make_span(tid_new, tenant, LLMProvider::OpenAI, "gpt-4")],
1498            created_at: recent_time,
1499        };
1500
1501        storage.store_trace(&old_trace).await.unwrap();
1502        storage.store_trace(&new_trace).await.unwrap();
1503
1504        let cutoff = Utc::now() - chrono::Duration::hours(1);
1505        let deleted = storage.delete_traces_before(tenant, cutoff).await.unwrap();
1506        assert_eq!(deleted, 1);
1507
1508        let remaining = storage
1509            .query_traces(&TraceQuery::new(tenant))
1510            .await
1511            .unwrap();
1512        assert_eq!(remaining.len(), 1);
1513        assert_eq!(remaining[0].trace_id, tid_new);
1514
1515        let old_span = storage.get_trace(tenant, tid_old).await.unwrap();
1516        assert!(old_span.is_none());
1517    }
1518
1519    #[tokio::test]
1520    async fn test_get_stats() {
1521        let storage = test_storage().await;
1522        let tenant = TenantId::new();
1523
1524        storage.store_trace(&make_trace(tenant)).await.unwrap();
1525        storage.store_trace(&make_trace(tenant)).await.unwrap();
1526
1527        let stats = storage.get_stats(tenant).await.unwrap();
1528        assert_eq!(stats.total_traces, 2);
1529        assert_eq!(stats.total_spans, 2);
1530        assert!(stats.storage_size_bytes > 0);
1531        assert!(stats.oldest_trace.is_some());
1532        assert!(stats.newest_trace.is_some());
1533    }
1534
1535    #[tokio::test]
1536    async fn test_get_stats_empty_tenant() {
1537        let storage = test_storage().await;
1538        let tenant = TenantId::new();
1539        let stats = storage.get_stats(tenant).await.unwrap();
1540        assert_eq!(stats.total_traces, 0);
1541        assert_eq!(stats.total_spans, 0);
1542        assert_eq!(stats.storage_size_bytes, 0);
1543        assert!(stats.oldest_trace.is_none());
1544    }
1545
1546    #[tokio::test]
1547    async fn test_health_check() {
1548        let storage = test_storage().await;
1549        assert!(storage.health_check().await.is_ok());
1550    }
1551
1552    #[tokio::test]
1553    async fn test_duplicate_store_is_idempotent() {
1554        let storage = test_storage().await;
1555        let tenant = TenantId::new();
1556        let trace = make_trace(tenant);
1557
1558        storage.store_trace(&trace).await.unwrap();
1559        storage.store_trace(&trace).await.unwrap();
1560
1561        let results = storage
1562            .query_traces(&TraceQuery::new(tenant))
1563            .await
1564            .unwrap();
1565        assert_eq!(results.len(), 1);
1566    }
1567
1568    #[tokio::test]
1569    async fn test_store_trace_with_no_spans() {
1570        let storage = test_storage().await;
1571        let tenant = TenantId::new();
1572        let trace = TraceEvent {
1573            trace_id: Uuid::new_v4(),
1574            tenant_id: tenant,
1575            spans: vec![],
1576            created_at: Utc::now(),
1577        };
1578
1579        storage.store_trace(&trace).await.unwrap();
1580
1581        let retrieved = storage
1582            .get_trace(tenant, trace.trace_id)
1583            .await
1584            .unwrap()
1585            .expect("trace row should exist");
1586        assert!(retrieved.spans.is_empty());
1587    }
1588
1589    #[tokio::test]
1590    async fn test_roundtrip_complex_span_fields() {
1591        let storage = test_storage().await;
1592        let tenant = TenantId::new();
1593        let trace_id = Uuid::new_v4();
1594
1595        let mut span = make_span(
1596            trace_id,
1597            tenant,
1598            LLMProvider::Custom("my-llm".to_string()),
1599            "custom-v1",
1600        );
1601        span.response = Some("I am an AI assistant.".to_string());
1602        span.prompt_tokens = Some(10);
1603        span.completion_tokens = Some(20);
1604        span.total_tokens = Some(30);
1605        span.time_to_first_token_ms = Some(42);
1606        span.duration_ms = Some(123);
1607        span.status_code = Some(200);
1608        span.estimated_cost_usd = Some(0.0015);
1609        span.tags.insert("env".to_string(), "test".to_string());
1610        span.add_event(
1611            SpanEvent::new("token_received".to_string(), "first chunk".to_string())
1612                .with_data("index".to_string(), "0".to_string()),
1613        );
1614        span.add_security_finding(SecurityFinding::new(
1615            SecuritySeverity::Medium,
1616            "pii_detected".to_string(),
1617            "email found".to_string(),
1618            0.85,
1619        ));
1620
1621        let trace = TraceEvent {
1622            trace_id,
1623            tenant_id: tenant,
1624            spans: vec![span.clone()],
1625            created_at: Utc::now(),
1626        };
1627
1628        storage.store_trace(&trace).await.unwrap();
1629
1630        let retrieved = storage
1631            .get_span(tenant, span.span_id)
1632            .await
1633            .unwrap()
1634            .expect("span should exist");
1635
1636        assert_eq!(
1637            retrieved.provider,
1638            LLMProvider::Custom("my-llm".to_string())
1639        );
1640        assert_eq!(retrieved.response.as_deref(), Some("I am an AI assistant."));
1641        assert_eq!(retrieved.prompt_tokens, Some(10));
1642        assert_eq!(retrieved.completion_tokens, Some(20));
1643        assert_eq!(retrieved.total_tokens, Some(30));
1644        assert_eq!(retrieved.time_to_first_token_ms, Some(42));
1645        assert_eq!(retrieved.duration_ms, Some(123));
1646        assert_eq!(retrieved.status_code, Some(200));
1647        assert!((retrieved.estimated_cost_usd.unwrap() - 0.0015).abs() < f64::EPSILON);
1648        assert_eq!(retrieved.security_score, Some(60));
1649        assert_eq!(retrieved.security_findings.len(), 1);
1650        assert_eq!(retrieved.security_findings[0].finding_type, "pii_detected");
1651        assert_eq!(retrieved.tags.get("env"), Some(&"test".to_string()));
1652        assert_eq!(retrieved.events.len(), 1);
1653        assert_eq!(retrieved.events[0].event_type, "token_received");
1654    }
1655
1656    #[tokio::test]
1657    async fn test_trace_with_multiple_spans() {
1658        let storage = test_storage().await;
1659        let tenant = TenantId::new();
1660        let trace_id = Uuid::new_v4();
1661
1662        let span1 = make_span(trace_id, tenant, LLMProvider::OpenAI, "gpt-4");
1663        let mut span2 = make_span(trace_id, tenant, LLMProvider::OpenAI, "gpt-4");
1664        span2.parent_span_id = Some(span1.span_id);
1665        span2.operation_name = "embedding".to_string();
1666
1667        let trace = TraceEvent {
1668            trace_id,
1669            tenant_id: tenant,
1670            spans: vec![span1.clone(), span2.clone()],
1671            created_at: Utc::now(),
1672        };
1673
1674        storage.store_trace(&trace).await.unwrap();
1675
1676        let retrieved = storage
1677            .get_trace(tenant, trace_id)
1678            .await
1679            .unwrap()
1680            .expect("trace should exist");
1681        assert_eq!(retrieved.spans.len(), 2);
1682
1683        let child = retrieved
1684            .spans
1685            .iter()
1686            .find(|s| s.parent_span_id.is_some())
1687            .expect("child span should be present");
1688        assert_eq!(child.parent_span_id, Some(span1.span_id));
1689    }
1690
1691    // -- Metadata repository tests -----------------------------------------
1692
1693    async fn test_metadata() -> SqliteMetadataRepository {
1694        SqliteMetadataRepository::new("sqlite::memory:")
1695            .await
1696            .unwrap()
1697    }
1698
1699    #[tokio::test]
1700    async fn test_tenant_crud() {
1701        let repo = test_metadata().await;
1702        let tenant = Tenant {
1703            id: TenantId::new(),
1704            name: "Acme Corp".to_string(),
1705            api_token: "test-token".to_string(),
1706            plan: "pro".to_string(),
1707            created_at: Utc::now(),
1708            config: serde_json::json!({"max_traces": 10000}),
1709        };
1710
1711        repo.create_tenant(&tenant).await.unwrap();
1712
1713        let retrieved = repo
1714            .get_tenant(tenant.id)
1715            .await
1716            .unwrap()
1717            .expect("tenant should exist");
1718        assert_eq!(retrieved.name, "Acme Corp");
1719        assert_eq!(retrieved.plan, "pro");
1720
1721        let mut updated = tenant.clone();
1722        updated.name = "Acme Inc".to_string();
1723        updated.plan = "enterprise".to_string();
1724        repo.update_tenant(&updated).await.unwrap();
1725
1726        let after_update = repo
1727            .get_tenant(tenant.id)
1728            .await
1729            .unwrap()
1730            .expect("tenant should exist");
1731        assert_eq!(after_update.name, "Acme Inc");
1732        assert_eq!(after_update.plan, "enterprise");
1733
1734        let tenants = repo.list_tenants().await.unwrap();
1735        assert_eq!(tenants.len(), 1);
1736
1737        repo.delete_tenant(tenant.id).await.unwrap();
1738        let gone = repo.get_tenant(tenant.id).await.unwrap();
1739        assert!(gone.is_none());
1740    }
1741
1742    #[tokio::test]
1743    async fn test_update_nonexistent_tenant_returns_error() {
1744        let repo = test_metadata().await;
1745        let tenant = Tenant {
1746            id: TenantId::new(),
1747            name: "Ghost".to_string(),
1748            api_token: "ghost-token".to_string(),
1749            plan: "free".to_string(),
1750            created_at: Utc::now(),
1751            config: serde_json::json!({}),
1752        };
1753        let result = repo.update_tenant(&tenant).await;
1754        assert!(result.is_err());
1755    }
1756
1757    #[tokio::test]
1758    async fn test_tenant_config_crud() {
1759        let repo = test_metadata().await;
1760        let tenant_id = TenantId::new();
1761
1762        // Create tenant first
1763        let tenant = Tenant {
1764            id: tenant_id,
1765            name: "Test".to_string(),
1766            api_token: "test-token".to_string(),
1767            plan: "free".to_string(),
1768            created_at: Utc::now(),
1769            config: serde_json::json!({}),
1770        };
1771        repo.create_tenant(&tenant).await.unwrap();
1772
1773        // No config yet
1774        let cfg = repo.get_tenant_config(tenant_id).await.unwrap();
1775        assert!(cfg.is_none());
1776
1777        // Upsert config
1778        let mut thresholds = HashMap::new();
1779        thresholds.insert("alert_min_score".to_string(), 80.0);
1780        let mut flags = HashMap::new();
1781        flags.insert("enable_pii".to_string(), true);
1782
1783        let config = TenantConfig {
1784            tenant_id,
1785            security_thresholds: thresholds,
1786            feature_flags: flags,
1787            monitoring_scope: llmtrace_core::MonitoringScope::Hybrid,
1788            rate_limit_rpm: None,
1789            monthly_budget: None,
1790        };
1791        repo.upsert_tenant_config(&config).await.unwrap();
1792
1793        let retrieved = repo
1794            .get_tenant_config(tenant_id)
1795            .await
1796            .unwrap()
1797            .expect("config should exist");
1798        assert_eq!(
1799            retrieved.security_thresholds.get("alert_min_score"),
1800            Some(&80.0)
1801        );
1802        assert_eq!(retrieved.feature_flags.get("enable_pii"), Some(&true));
1803
1804        // Update config
1805        let mut updated = config.clone();
1806        updated
1807            .feature_flags
1808            .insert("enable_pii".to_string(), false);
1809        repo.upsert_tenant_config(&updated).await.unwrap();
1810
1811        let after = repo
1812            .get_tenant_config(tenant_id)
1813            .await
1814            .unwrap()
1815            .expect("config should exist");
1816        assert_eq!(after.feature_flags.get("enable_pii"), Some(&false));
1817    }
1818
1819    #[tokio::test]
1820    async fn test_audit_events() {
1821        let repo = test_metadata().await;
1822        let tenant_id = TenantId::new();
1823
1824        let event1 = AuditEvent {
1825            id: Uuid::new_v4(),
1826            tenant_id,
1827            event_type: "config_changed".to_string(),
1828            actor: "admin".to_string(),
1829            resource: "tenant_config".to_string(),
1830            data: serde_json::json!({"field": "enable_pii", "old": false, "new": true}),
1831            timestamp: Utc::now() - chrono::Duration::minutes(5),
1832        };
1833        let event2 = AuditEvent {
1834            id: Uuid::new_v4(),
1835            tenant_id,
1836            event_type: "tenant_created".to_string(),
1837            actor: "system".to_string(),
1838            resource: "tenant".to_string(),
1839            data: serde_json::json!({}),
1840            timestamp: Utc::now(),
1841        };
1842
1843        repo.record_audit_event(&event1).await.unwrap();
1844        repo.record_audit_event(&event2).await.unwrap();
1845
1846        // Query all
1847        let all = repo
1848            .query_audit_events(&AuditQuery::new(tenant_id))
1849            .await
1850            .unwrap();
1851        assert_eq!(all.len(), 2);
1852
1853        // Query by type
1854        let config_events = repo
1855            .query_audit_events(
1856                &AuditQuery::new(tenant_id).with_event_type("config_changed".to_string()),
1857            )
1858            .await
1859            .unwrap();
1860        assert_eq!(config_events.len(), 1);
1861        assert_eq!(config_events[0].event_type, "config_changed");
1862        assert_eq!(config_events[0].actor, "admin");
1863
1864        // Query with limit
1865        let limited = repo
1866            .query_audit_events(&AuditQuery::new(tenant_id).with_limit(1))
1867            .await
1868            .unwrap();
1869        assert_eq!(limited.len(), 1);
1870    }
1871
1872    #[tokio::test]
1873    async fn test_metadata_health_check() {
1874        let repo = test_metadata().await;
1875        assert!(repo.health_check().await.is_ok());
1876    }
1877}