1use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use llmtrace_core::{
10 AgentAction, ApiKeyRecord, ApiKeyRole, AuditEvent, AuditQuery, LLMProvider, LLMTraceError,
11 MetadataRepository, Result, SecurityFinding, SpanEvent, StorageStats, Tenant, TenantConfig,
12 TenantId, TraceEvent, TraceQuery, TraceRepository, TraceSpan,
13};
14use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
15use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
16use std::collections::HashMap;
17use std::str::FromStr;
18use uuid::Uuid;
19
20pub(crate) async fn open_pool(database_url: &str) -> Result<SqlitePool> {
30 let connect_opts = SqliteConnectOptions::from_str(database_url)
31 .map_err(|e| LLMTraceError::Storage(format!("Invalid database URL: {e}")))?
32 .create_if_missing(true)
33 .journal_mode(SqliteJournalMode::Wal);
34
35 let max_conns: u32 = if database_url.contains(":memory:") {
38 1
39 } else {
40 10
41 };
42
43 sqlx::pool::PoolOptions::<Sqlite>::new()
44 .max_connections(max_conns)
45 .connect_with(connect_opts)
46 .await
47 .map_err(|e| LLMTraceError::Storage(format!("Failed to connect to SQLite: {e}")))
48}
49
50pub(crate) async fn run_migrations(pool: &SqlitePool) -> Result<()> {
56 crate::migration::run_sqlite_migrations(pool).await
57}
58
59fn serialize_provider(provider: &LLMProvider) -> String {
65 serde_json::to_string(provider).expect("LLMProvider serialization cannot fail")
66}
67
68fn deserialize_provider(s: &str) -> Result<LLMProvider> {
70 serde_json::from_str(s)
71 .map_err(|e| LLMTraceError::Storage(format!("Invalid provider value '{s}': {e}")))
72}
73
74fn parse_uuid(s: &str) -> Result<Uuid> {
76 Uuid::parse_str(s).map_err(|e| LLMTraceError::Storage(format!("Invalid UUID '{s}': {e}")))
77}
78
79fn parse_datetime(s: &str) -> Result<DateTime<Utc>> {
81 DateTime::parse_from_rfc3339(s)
82 .map(|dt| dt.with_timezone(&Utc))
83 .map_err(|e| LLMTraceError::Storage(format!("Invalid datetime '{s}': {e}")))
84}
85
86fn span_from_row(row: &sqlx::sqlite::SqliteRow) -> Result<TraceSpan> {
92 let trace_id = parse_uuid(&row.get::<String, _>("trace_id"))?;
93 let span_id = parse_uuid(&row.get::<String, _>("span_id"))?;
94 let parent_span_id = row
95 .get::<Option<String>, _>("parent_span_id")
96 .map(|s| parse_uuid(&s))
97 .transpose()?;
98 let tenant_id = TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?);
99 let operation_name: String = row.get("operation_name");
100 let start_time = parse_datetime(&row.get::<String, _>("start_time"))?;
101 let end_time = row
102 .get::<Option<String>, _>("end_time")
103 .map(|s| parse_datetime(&s))
104 .transpose()?;
105 let provider = deserialize_provider(&row.get::<String, _>("provider"))?;
106 let model_name: String = row.get("model_name");
107 let prompt: String = row.get("prompt");
108 let response: Option<String> = row.get("response");
109 let prompt_tokens = row.get::<Option<i64>, _>("prompt_tokens").map(|v| v as u32);
110 let completion_tokens = row
111 .get::<Option<i64>, _>("completion_tokens")
112 .map(|v| v as u32);
113 let total_tokens = row.get::<Option<i64>, _>("total_tokens").map(|v| v as u32);
114 let time_to_first_token_ms = row
115 .get::<Option<i64>, _>("time_to_first_token_ms")
116 .map(|v| v as u64);
117 let duration_ms = row.get::<Option<i64>, _>("duration_ms").map(|v| v as u64);
118 let status_code = row.get::<Option<i64>, _>("status_code").map(|v| v as u16);
119 let error_message: Option<String> = row.get("error_message");
120 let estimated_cost_usd: Option<f64> = row.get("estimated_cost_usd");
121 let security_score = row.get::<Option<i64>, _>("security_score").map(|v| v as u8);
122
123 let security_findings: Vec<SecurityFinding> = {
124 let raw: String = row.get("security_findings");
125 serde_json::from_str(&raw)
126 .map_err(|e| LLMTraceError::Storage(format!("Invalid security_findings JSON: {e}")))?
127 };
128 let tags: HashMap<String, String> = {
129 let raw: String = row.get("tags");
130 serde_json::from_str(&raw)
131 .map_err(|e| LLMTraceError::Storage(format!("Invalid tags JSON: {e}")))?
132 };
133 let events: Vec<SpanEvent> = {
134 let raw: String = row.get("events");
135 serde_json::from_str(&raw)
136 .map_err(|e| LLMTraceError::Storage(format!("Invalid events JSON: {e}")))?
137 };
138 let agent_actions: Vec<AgentAction> = {
139 let raw: String = row.get("agent_actions");
140 serde_json::from_str(&raw)
141 .map_err(|e| LLMTraceError::Storage(format!("Invalid agent_actions JSON: {e}")))?
142 };
143
144 Ok(TraceSpan {
145 trace_id,
146 span_id,
147 parent_span_id,
148 tenant_id,
149 operation_name,
150 start_time,
151 end_time,
152 provider,
153 model_name,
154 prompt,
155 response,
156 prompt_tokens,
157 completion_tokens,
158 total_tokens,
159 time_to_first_token_ms,
160 duration_ms,
161 status_code,
162 error_message,
163 estimated_cost_usd,
164 security_score,
165 security_findings,
166 tags,
167 events,
168 agent_actions,
169 })
170}
171
172fn report_from_row(row: &sqlx::sqlite::SqliteRow) -> Result<llmtrace_core::ComplianceReportRecord> {
178 let content: Option<serde_json::Value> = row
179 .get::<Option<String>, _>("content")
180 .map(|s| serde_json::from_str(&s))
181 .transpose()
182 .map_err(|e| LLMTraceError::Storage(format!("Invalid report content JSON: {e}")))?;
183
184 Ok(llmtrace_core::ComplianceReportRecord {
185 id: parse_uuid(&row.get::<String, _>("id"))?,
186 tenant_id: TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?),
187 report_type: row.get("report_type"),
188 status: row.get("status"),
189 period_start: parse_datetime(&row.get::<String, _>("period_start"))?,
190 period_end: parse_datetime(&row.get::<String, _>("period_end"))?,
191 created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
192 completed_at: row
193 .get::<Option<String>, _>("completed_at")
194 .map(|s| parse_datetime(&s))
195 .transpose()?,
196 content,
197 error: row.get("error"),
198 })
199}
200
/// SQLite-backed trace store; implements [`TraceRepository`].
pub struct SqliteTraceRepository {
    /// Connection pool that every query of this repository runs against.
    pool: SqlitePool,
}
213
214impl SqliteTraceRepository {
215 pub async fn new(database_url: &str) -> Result<Self> {
226 let pool = open_pool(database_url).await?;
227 run_migrations(&pool).await?;
228 Ok(Self { pool })
229 }
230
231 pub(crate) async fn from_pool(pool: SqlitePool) -> Result<Self> {
233 run_migrations(&pool).await?;
234 Ok(Self { pool })
235 }
236
237 async fn insert_span<'e, E>(&self, executor: E, span: &TraceSpan) -> Result<()>
239 where
240 E: sqlx::Executor<'e, Database = Sqlite>,
241 {
242 let security_findings_json = serde_json::to_string(&span.security_findings)
243 .map_err(|e| LLMTraceError::Storage(format!("serialize security_findings: {e}")))?;
244 let tags_json = serde_json::to_string(&span.tags)
245 .map_err(|e| LLMTraceError::Storage(format!("serialize tags: {e}")))?;
246 let events_json = serde_json::to_string(&span.events)
247 .map_err(|e| LLMTraceError::Storage(format!("serialize events: {e}")))?;
248 let agent_actions_json = serde_json::to_string(&span.agent_actions)
249 .map_err(|e| LLMTraceError::Storage(format!("serialize agent_actions: {e}")))?;
250
251 sqlx::query(
252 "INSERT OR REPLACE INTO spans (
253 span_id, trace_id, parent_span_id, tenant_id, operation_name,
254 start_time, end_time, provider, model_name, prompt,
255 response, prompt_tokens, completion_tokens, total_tokens,
256 time_to_first_token_ms, duration_ms, status_code, error_message,
257 estimated_cost_usd, security_score, security_findings, tags, events,
258 agent_actions
259 ) VALUES (
260 ?1, ?2, ?3, ?4, ?5,
261 ?6, ?7, ?8, ?9, ?10,
262 ?11, ?12, ?13, ?14,
263 ?15, ?16, ?17, ?18,
264 ?19, ?20, ?21, ?22, ?23,
265 ?24
266 )",
267 )
268 .bind(span.span_id.to_string())
269 .bind(span.trace_id.to_string())
270 .bind(span.parent_span_id.map(|id| id.to_string()))
271 .bind(span.tenant_id.0.to_string())
272 .bind(&span.operation_name)
273 .bind(span.start_time.to_rfc3339())
274 .bind(span.end_time.map(|t| t.to_rfc3339()))
275 .bind(serialize_provider(&span.provider))
276 .bind(&span.model_name)
277 .bind(&span.prompt)
278 .bind(span.response.as_deref())
279 .bind(span.prompt_tokens.map(|v| v as i64))
280 .bind(span.completion_tokens.map(|v| v as i64))
281 .bind(span.total_tokens.map(|v| v as i64))
282 .bind(span.time_to_first_token_ms.map(|v| v as i64))
283 .bind(span.duration_ms.map(|v| v as i64))
284 .bind(span.status_code.map(|v| v as i64))
285 .bind(span.error_message.as_deref())
286 .bind(span.estimated_cost_usd)
287 .bind(span.security_score.map(|v| v as i64))
288 .bind(&security_findings_json)
289 .bind(&tags_json)
290 .bind(&events_json)
291 .bind(&agent_actions_json)
292 .execute(executor)
293 .await
294 .map_err(|e| LLMTraceError::Storage(format!("Failed to insert span: {e}")))?;
295
296 Ok(())
297 }
298
299 async fn load_spans_for_traces(
301 &self,
302 tenant_id: &TenantId,
303 trace_ids: &[String],
304 ) -> Result<HashMap<String, Vec<TraceSpan>>> {
305 if trace_ids.is_empty() {
306 return Ok(HashMap::new());
307 }
308
309 let mut qb = QueryBuilder::<Sqlite>::new("SELECT * FROM spans WHERE tenant_id = ");
310 qb.push_bind(tenant_id.0.to_string());
311 qb.push(" AND trace_id IN (");
312 let mut sep = qb.separated(", ");
313 for tid in trace_ids {
314 sep.push_bind(tid.clone());
315 }
316 sep.push_unseparated(") ORDER BY start_time ASC");
317
318 let rows = qb
319 .build()
320 .fetch_all(&self.pool)
321 .await
322 .map_err(|e| LLMTraceError::Storage(format!("Failed to load spans: {e}")))?;
323
324 let mut grouped: HashMap<String, Vec<TraceSpan>> = HashMap::new();
325 for row in &rows {
326 let span = span_from_row(row)?;
327 grouped
328 .entry(span.trace_id.to_string())
329 .or_default()
330 .push(span);
331 }
332 Ok(grouped)
333 }
334
335 fn build_trace_id_query<'args>(&self, query: &TraceQuery) -> QueryBuilder<'args, Sqlite> {
337 let mut qb = QueryBuilder::<Sqlite>::new(
338 "SELECT DISTINCT s.trace_id FROM spans s \
339 JOIN traces t ON s.trace_id = t.trace_id AND s.tenant_id = t.tenant_id \
340 WHERE s.tenant_id = ",
341 );
342 qb.push_bind(query.tenant_id.0.to_string());
343
344 if let Some(ref trace_id) = query.trace_id {
345 qb.push(" AND s.trace_id = ");
346 qb.push_bind(trace_id.to_string());
347 }
348 if let Some(ref start) = query.start_time {
349 qb.push(" AND s.start_time >= ");
350 qb.push_bind(start.to_rfc3339());
351 }
352 if let Some(ref end) = query.end_time {
353 qb.push(" AND s.start_time <= ");
354 qb.push_bind(end.to_rfc3339());
355 }
356 if let Some(ref provider) = query.provider {
357 qb.push(" AND s.provider = ");
358 qb.push_bind(serialize_provider(provider));
359 }
360 if let Some(ref model) = query.model_name {
361 qb.push(" AND s.model_name = ");
362 qb.push_bind(model.clone());
363 }
364 if let Some(ref op) = query.operation_name {
365 qb.push(" AND s.operation_name = ");
366 qb.push_bind(op.clone());
367 }
368 if let Some(min) = query.min_security_score {
369 qb.push(" AND s.security_score >= ");
370 qb.push_bind(min as i64);
371 }
372 if let Some(max) = query.max_security_score {
373 qb.push(" AND s.security_score <= ");
374 qb.push_bind(max as i64);
375 }
376
377 qb.push(" ORDER BY t.created_at DESC");
378
379 if let Some(limit) = query.limit {
380 qb.push(" LIMIT ");
381 qb.push_bind(limit as i64);
382 }
383 if let Some(offset) = query.offset {
384 qb.push(" OFFSET ");
385 qb.push_bind(offset as i64);
386 }
387
388 qb
389 }
390
391 fn build_span_query<'args>(&self, query: &TraceQuery) -> QueryBuilder<'args, Sqlite> {
393 let mut qb = QueryBuilder::<Sqlite>::new("SELECT * FROM spans WHERE tenant_id = ");
394 qb.push_bind(query.tenant_id.0.to_string());
395
396 if let Some(ref trace_id) = query.trace_id {
397 qb.push(" AND trace_id = ");
398 qb.push_bind(trace_id.to_string());
399 }
400 if let Some(ref start) = query.start_time {
401 qb.push(" AND start_time >= ");
402 qb.push_bind(start.to_rfc3339());
403 }
404 if let Some(ref end) = query.end_time {
405 qb.push(" AND start_time <= ");
406 qb.push_bind(end.to_rfc3339());
407 }
408 if let Some(ref provider) = query.provider {
409 qb.push(" AND provider = ");
410 qb.push_bind(serialize_provider(provider));
411 }
412 if let Some(ref model) = query.model_name {
413 qb.push(" AND model_name = ");
414 qb.push_bind(model.clone());
415 }
416 if let Some(ref op) = query.operation_name {
417 qb.push(" AND operation_name = ");
418 qb.push_bind(op.clone());
419 }
420 if let Some(min) = query.min_security_score {
421 qb.push(" AND security_score >= ");
422 qb.push_bind(min as i64);
423 }
424 if let Some(max) = query.max_security_score {
425 qb.push(" AND security_score <= ");
426 qb.push_bind(max as i64);
427 }
428
429 qb.push(" ORDER BY start_time DESC");
430
431 if let Some(limit) = query.limit {
432 qb.push(" LIMIT ");
433 qb.push_bind(limit as i64);
434 }
435 if let Some(offset) = query.offset {
436 qb.push(" OFFSET ");
437 qb.push_bind(offset as i64);
438 }
439
440 qb
441 }
442}
443
444#[async_trait]
445impl TraceRepository for SqliteTraceRepository {
446 async fn store_trace(&self, trace: &TraceEvent) -> Result<()> {
447 let mut tx = self
448 .pool
449 .begin()
450 .await
451 .map_err(|e| LLMTraceError::Storage(format!("Failed to begin transaction: {e}")))?;
452
453 sqlx::query(
454 "INSERT OR REPLACE INTO traces (trace_id, tenant_id, created_at) VALUES (?1, ?2, ?3)",
455 )
456 .bind(trace.trace_id.to_string())
457 .bind(trace.tenant_id.0.to_string())
458 .bind(trace.created_at.to_rfc3339())
459 .execute(&mut *tx)
460 .await
461 .map_err(|e| LLMTraceError::Storage(format!("Failed to insert trace: {e}")))?;
462
463 for span in &trace.spans {
464 self.insert_span(&mut *tx, span).await?;
465 }
466
467 tx.commit()
468 .await
469 .map_err(|e| LLMTraceError::Storage(format!("Failed to commit transaction: {e}")))?;
470
471 Ok(())
472 }
473
474 async fn store_span(&self, span: &TraceSpan) -> Result<()> {
475 let mut tx = self
476 .pool
477 .begin()
478 .await
479 .map_err(|e| LLMTraceError::Storage(format!("Failed to begin transaction: {e}")))?;
480
481 sqlx::query(
482 "INSERT OR IGNORE INTO traces (trace_id, tenant_id, created_at) VALUES (?1, ?2, ?3)",
483 )
484 .bind(span.trace_id.to_string())
485 .bind(span.tenant_id.0.to_string())
486 .bind(span.start_time.to_rfc3339())
487 .execute(&mut *tx)
488 .await
489 .map_err(|e| LLMTraceError::Storage(format!("Failed to ensure trace row: {e}")))?;
490
491 self.insert_span(&mut *tx, span).await?;
492
493 tx.commit()
494 .await
495 .map_err(|e| LLMTraceError::Storage(format!("Failed to commit transaction: {e}")))?;
496
497 Ok(())
498 }
499
500 async fn query_traces(&self, query: &TraceQuery) -> Result<Vec<TraceEvent>> {
501 let mut qb = self.build_trace_id_query(query);
502 let rows = qb
503 .build()
504 .fetch_all(&self.pool)
505 .await
506 .map_err(|e| LLMTraceError::Storage(format!("Failed to query trace IDs: {e}")))?;
507
508 let trace_ids: Vec<String> = rows.iter().map(|r| r.get("trace_id")).collect();
509 if trace_ids.is_empty() {
510 return Ok(Vec::new());
511 }
512
513 let mut meta_qb = QueryBuilder::<Sqlite>::new("SELECT * FROM traces WHERE tenant_id = ");
514 meta_qb.push_bind(query.tenant_id.0.to_string());
515 meta_qb.push(" AND trace_id IN (");
516 let mut sep = meta_qb.separated(", ");
517 for tid in &trace_ids {
518 sep.push_bind(tid.clone());
519 }
520 sep.push_unseparated(") ORDER BY created_at DESC");
521
522 let meta_rows =
523 meta_qb.build().fetch_all(&self.pool).await.map_err(|e| {
524 LLMTraceError::Storage(format!("Failed to load trace metadata: {e}"))
525 })?;
526
527 let spans_map = self
528 .load_spans_for_traces(&query.tenant_id, &trace_ids)
529 .await?;
530
531 let mut results = Vec::with_capacity(meta_rows.len());
532 for row in &meta_rows {
533 let tid_str: String = row.get("trace_id");
534 let trace_id = parse_uuid(&tid_str)?;
535 let tenant_id = TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?);
536 let created_at = parse_datetime(&row.get::<String, _>("created_at"))?;
537 let spans = spans_map.get(&tid_str).cloned().unwrap_or_default();
538
539 results.push(TraceEvent {
540 trace_id,
541 tenant_id,
542 spans,
543 created_at,
544 });
545 }
546
547 Ok(results)
548 }
549
550 async fn query_spans(&self, query: &TraceQuery) -> Result<Vec<TraceSpan>> {
551 let mut qb = self.build_span_query(query);
552 let rows = qb
553 .build()
554 .fetch_all(&self.pool)
555 .await
556 .map_err(|e| LLMTraceError::Storage(format!("Failed to query spans: {e}")))?;
557
558 rows.iter().map(span_from_row).collect()
559 }
560
561 async fn get_trace(&self, tenant_id: TenantId, trace_id: Uuid) -> Result<Option<TraceEvent>> {
562 let row = sqlx::query("SELECT * FROM traces WHERE tenant_id = ?1 AND trace_id = ?2")
563 .bind(tenant_id.0.to_string())
564 .bind(trace_id.to_string())
565 .fetch_optional(&self.pool)
566 .await
567 .map_err(|e| LLMTraceError::Storage(format!("Failed to get trace: {e}")))?;
568
569 let Some(row) = row else {
570 return Ok(None);
571 };
572
573 let created_at = parse_datetime(&row.get::<String, _>("created_at"))?;
574
575 let span_rows = sqlx::query(
576 "SELECT * FROM spans WHERE tenant_id = ?1 AND trace_id = ?2 ORDER BY start_time ASC",
577 )
578 .bind(tenant_id.0.to_string())
579 .bind(trace_id.to_string())
580 .fetch_all(&self.pool)
581 .await
582 .map_err(|e| LLMTraceError::Storage(format!("Failed to load trace spans: {e}")))?;
583
584 let spans: Vec<TraceSpan> = span_rows.iter().map(span_from_row).collect::<Result<_>>()?;
585
586 Ok(Some(TraceEvent {
587 trace_id,
588 tenant_id,
589 spans,
590 created_at,
591 }))
592 }
593
594 async fn get_span(&self, tenant_id: TenantId, span_id: Uuid) -> Result<Option<TraceSpan>> {
595 let row = sqlx::query("SELECT * FROM spans WHERE tenant_id = ?1 AND span_id = ?2")
596 .bind(tenant_id.0.to_string())
597 .bind(span_id.to_string())
598 .fetch_optional(&self.pool)
599 .await
600 .map_err(|e| LLMTraceError::Storage(format!("Failed to get span: {e}")))?;
601
602 match row {
603 Some(ref r) => Ok(Some(span_from_row(r)?)),
604 None => Ok(None),
605 }
606 }
607
608 async fn delete_traces_before(
609 &self,
610 tenant_id: TenantId,
611 before: DateTime<Utc>,
612 ) -> Result<u64> {
613 let mut tx = self
614 .pool
615 .begin()
616 .await
617 .map_err(|e| LLMTraceError::Storage(format!("Failed to begin transaction: {e}")))?;
618
619 let before_str = before.to_rfc3339();
620 let tid = tenant_id.0.to_string();
621
622 sqlx::query(
623 "DELETE FROM spans WHERE tenant_id = ?1 AND trace_id IN \
624 (SELECT trace_id FROM traces WHERE tenant_id = ?2 AND created_at < ?3)",
625 )
626 .bind(&tid)
627 .bind(&tid)
628 .bind(&before_str)
629 .execute(&mut *tx)
630 .await
631 .map_err(|e| LLMTraceError::Storage(format!("Failed to delete spans: {e}")))?;
632
633 let result = sqlx::query("DELETE FROM traces WHERE tenant_id = ?1 AND created_at < ?2")
634 .bind(&tid)
635 .bind(&before_str)
636 .execute(&mut *tx)
637 .await
638 .map_err(|e| LLMTraceError::Storage(format!("Failed to delete traces: {e}")))?;
639
640 tx.commit()
641 .await
642 .map_err(|e| LLMTraceError::Storage(format!("Failed to commit delete: {e}")))?;
643
644 Ok(result.rows_affected())
645 }
646
647 async fn get_stats(&self, tenant_id: TenantId) -> Result<StorageStats> {
648 let tid = tenant_id.0.to_string();
649
650 let trace_count: i64 =
651 sqlx::query("SELECT COUNT(*) as cnt FROM traces WHERE tenant_id = ?1")
652 .bind(&tid)
653 .fetch_one(&self.pool)
654 .await
655 .map_err(|e| LLMTraceError::Storage(format!("Failed to count traces: {e}")))?
656 .get("cnt");
657
658 let span_count: i64 = sqlx::query("SELECT COUNT(*) as cnt FROM spans WHERE tenant_id = ?1")
659 .bind(&tid)
660 .fetch_one(&self.pool)
661 .await
662 .map_err(|e| LLMTraceError::Storage(format!("Failed to count spans: {e}")))?
663 .get("cnt");
664
665 let size_row = sqlx::query(
666 "SELECT COALESCE(SUM(LENGTH(prompt) + COALESCE(LENGTH(response), 0)), 0) as sz \
667 FROM spans WHERE tenant_id = ?1",
668 )
669 .bind(&tid)
670 .fetch_one(&self.pool)
671 .await
672 .map_err(|e| LLMTraceError::Storage(format!("Failed to calculate size: {e}")))?;
673 let storage_size_bytes: i64 = size_row.get("sz");
674
675 let time_row = sqlx::query(
676 "SELECT MIN(created_at) as oldest, MAX(created_at) as newest \
677 FROM traces WHERE tenant_id = ?1",
678 )
679 .bind(&tid)
680 .fetch_one(&self.pool)
681 .await
682 .map_err(|e| LLMTraceError::Storage(format!("Failed to get time range: {e}")))?;
683
684 let oldest_trace = time_row
685 .get::<Option<String>, _>("oldest")
686 .map(|s| parse_datetime(&s))
687 .transpose()?;
688 let newest_trace = time_row
689 .get::<Option<String>, _>("newest")
690 .map(|s| parse_datetime(&s))
691 .transpose()?;
692
693 Ok(StorageStats {
694 total_traces: trace_count as u64,
695 total_spans: span_count as u64,
696 storage_size_bytes: storage_size_bytes as u64,
697 oldest_trace,
698 newest_trace,
699 })
700 }
701
702 async fn health_check(&self) -> Result<()> {
703 sqlx::query("SELECT 1")
704 .fetch_one(&self.pool)
705 .await
706 .map_err(|e| LLMTraceError::Storage(format!("Health check failed: {e}")))?;
707 Ok(())
708 }
709}
710
711pub struct SqliteMetadataRepository {
717 pool: SqlitePool,
718}
719
720impl SqliteMetadataRepository {
721 pub async fn new(database_url: &str) -> Result<Self> {
723 let pool = open_pool(database_url).await?;
724 run_migrations(&pool).await?;
725 Ok(Self { pool })
726 }
727
728 pub(crate) async fn from_pool(pool: SqlitePool) -> Result<Self> {
730 run_migrations(&pool).await?;
731 Ok(Self { pool })
732 }
733}
734
735#[async_trait]
736impl MetadataRepository for SqliteMetadataRepository {
737 async fn create_tenant(&self, tenant: &Tenant) -> Result<()> {
738 let config_json = serde_json::to_string(&tenant.config)
739 .map_err(|e| LLMTraceError::Storage(format!("serialize tenant config: {e}")))?;
740
741 sqlx::query(
742 "INSERT INTO tenants (id, name, api_token, plan, created_at, config) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
743 )
744 .bind(tenant.id.0.to_string())
745 .bind(&tenant.name)
746 .bind(&tenant.api_token)
747 .bind(&tenant.plan)
748 .bind(tenant.created_at.to_rfc3339())
749 .bind(&config_json)
750 .execute(&self.pool)
751 .await
752 .map_err(|e| LLMTraceError::Storage(format!("Failed to create tenant: {e}")))?;
753
754 Ok(())
755 }
756
757 async fn get_tenant(&self, id: TenantId) -> Result<Option<Tenant>> {
758 let row = sqlx::query("SELECT * FROM tenants WHERE id = ?1")
759 .bind(id.0.to_string())
760 .fetch_optional(&self.pool)
761 .await
762 .map_err(|e| LLMTraceError::Storage(format!("Failed to get tenant: {e}")))?;
763
764 let Some(row) = row else {
765 return Ok(None);
766 };
767
768 let config_str: String = row.get("config");
769 let config: serde_json::Value = serde_json::from_str(&config_str)
770 .map_err(|e| LLMTraceError::Storage(format!("Invalid tenant config JSON: {e}")))?;
771
772 Ok(Some(Tenant {
773 id: TenantId(parse_uuid(&row.get::<String, _>("id"))?),
774 name: row.get("name"),
775 api_token: row
776 .get::<Option<String>, _>("api_token")
777 .unwrap_or_default(),
778 plan: row.get("plan"),
779 created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
780 config,
781 }))
782 }
783
784 async fn get_tenant_by_token(&self, token: &str) -> Result<Option<Tenant>> {
785 let row = sqlx::query("SELECT * FROM tenants WHERE api_token = ?1")
786 .bind(token)
787 .fetch_optional(&self.pool)
788 .await
789 .map_err(|e| LLMTraceError::Storage(format!("Failed to get tenant by token: {e}")))?;
790
791 let Some(row) = row else {
792 return Ok(None);
793 };
794
795 let config_str: String = row.get("config");
796 let config: serde_json::Value = serde_json::from_str(&config_str)
797 .map_err(|e| LLMTraceError::Storage(format!("Invalid tenant config JSON: {e}")))?;
798
799 Ok(Some(Tenant {
800 id: TenantId(parse_uuid(&row.get::<String, _>("id"))?),
801 name: row.get("name"),
802 api_token: row
803 .get::<Option<String>, _>("api_token")
804 .unwrap_or_default(),
805 plan: row.get("plan"),
806 created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
807 config,
808 }))
809 }
810
811 async fn update_tenant(&self, tenant: &Tenant) -> Result<()> {
812 let config_json = serde_json::to_string(&tenant.config)
813 .map_err(|e| LLMTraceError::Storage(format!("serialize tenant config: {e}")))?;
814
815 let result = sqlx::query(
816 "UPDATE tenants SET name = ?1, plan = ?2, config = ?3, api_token = ?4 WHERE id = ?5",
817 )
818 .bind(&tenant.name)
819 .bind(&tenant.plan)
820 .bind(&config_json)
821 .bind(&tenant.api_token)
822 .bind(tenant.id.0.to_string())
823 .execute(&self.pool)
824 .await
825 .map_err(|e| LLMTraceError::Storage(format!("Failed to update tenant: {e}")))?;
826
827 if result.rows_affected() == 0 {
828 return Err(LLMTraceError::InvalidTenant {
829 tenant_id: tenant.id,
830 });
831 }
832 Ok(())
833 }
834
835 async fn list_tenants(&self) -> Result<Vec<Tenant>> {
836 let rows = sqlx::query("SELECT * FROM tenants ORDER BY created_at DESC")
837 .fetch_all(&self.pool)
838 .await
839 .map_err(|e| LLMTraceError::Storage(format!("Failed to list tenants: {e}")))?;
840
841 rows.iter()
842 .map(|row| {
843 let config_str: String = row.get("config");
844 let config: serde_json::Value = serde_json::from_str(&config_str).map_err(|e| {
845 LLMTraceError::Storage(format!("Invalid tenant config JSON: {e}"))
846 })?;
847 Ok(Tenant {
848 id: TenantId(parse_uuid(&row.get::<String, _>("id"))?),
849 name: row.get("name"),
850 api_token: row
851 .get::<Option<String>, _>("api_token")
852 .unwrap_or_default(),
853 plan: row.get("plan"),
854 created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
855 config,
856 })
857 })
858 .collect()
859 }
860
861 async fn delete_tenant(&self, id: TenantId) -> Result<()> {
862 sqlx::query("DELETE FROM tenants WHERE id = ?1")
863 .bind(id.0.to_string())
864 .execute(&self.pool)
865 .await
866 .map_err(|e| LLMTraceError::Storage(format!("Failed to delete tenant: {e}")))?;
867 Ok(())
868 }
869
870 async fn get_tenant_config(&self, tenant_id: TenantId) -> Result<Option<TenantConfig>> {
871 let row = sqlx::query("SELECT * FROM tenant_configs WHERE tenant_id = ?1")
872 .bind(tenant_id.0.to_string())
873 .fetch_optional(&self.pool)
874 .await
875 .map_err(|e| LLMTraceError::Storage(format!("Failed to get tenant config: {e}")))?;
876
877 let Some(row) = row else {
878 return Ok(None);
879 };
880
881 let thresholds_str: String = row.get("security_thresholds");
882 let flags_str: String = row.get("feature_flags");
883 let monitoring_scope_str: String = row.get("monitoring_scope");
884 let rate_limit_rpm: Option<i32> = row.get("rate_limit_rpm");
885 let monthly_budget: Option<f64> = row.get("monthly_budget");
886
887 Ok(Some(TenantConfig {
888 tenant_id: TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?),
889 security_thresholds: serde_json::from_str(&thresholds_str).map_err(|e| {
890 LLMTraceError::Storage(format!("Invalid security_thresholds JSON: {e}"))
891 })?,
892 feature_flags: serde_json::from_str(&flags_str)
893 .map_err(|e| LLMTraceError::Storage(format!("Invalid feature_flags JSON: {e}")))?,
894 monitoring_scope: monitoring_scope_str.parse().map_err(|e| {
895 LLMTraceError::Storage(format!("Invalid monitoring_scope value: {e}"))
896 })?,
897 rate_limit_rpm: rate_limit_rpm.map(|v| v as u32),
898 monthly_budget,
899 }))
900 }
901
902 async fn upsert_tenant_config(&self, config: &TenantConfig) -> Result<()> {
903 let thresholds_json = serde_json::to_string(&config.security_thresholds)
904 .map_err(|e| LLMTraceError::Storage(format!("serialize thresholds: {e}")))?;
905 let flags_json = serde_json::to_string(&config.feature_flags)
906 .map_err(|e| LLMTraceError::Storage(format!("serialize feature_flags: {e}")))?;
907
908 sqlx::query(
909 "INSERT INTO tenant_configs (tenant_id, security_thresholds, feature_flags, monitoring_scope, rate_limit_rpm, monthly_budget)
910 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
911 ON CONFLICT(tenant_id) DO UPDATE SET
912 security_thresholds = excluded.security_thresholds,
913 feature_flags = excluded.feature_flags,
914 monitoring_scope = excluded.monitoring_scope,
915 rate_limit_rpm = excluded.rate_limit_rpm,
916 monthly_budget = excluded.monthly_budget",
917 )
918 .bind(config.tenant_id.0.to_string())
919 .bind(&thresholds_json)
920 .bind(&flags_json)
921 .bind(config.monitoring_scope.to_string())
922 .bind(config.rate_limit_rpm.map(|v| v as i32))
923 .bind(config.monthly_budget)
924 .execute(&self.pool)
925 .await
926 .map_err(|e| LLMTraceError::Storage(format!("Failed to upsert tenant config: {e}")))?;
927
928 Ok(())
929 }
930
931 async fn record_audit_event(&self, event: &AuditEvent) -> Result<()> {
932 let data_json = serde_json::to_string(&event.data)
933 .map_err(|e| LLMTraceError::Storage(format!("serialize audit data: {e}")))?;
934
935 sqlx::query(
936 "INSERT INTO audit_events (id, tenant_id, event_type, actor, resource, data, timestamp)
937 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
938 )
939 .bind(event.id.to_string())
940 .bind(event.tenant_id.0.to_string())
941 .bind(&event.event_type)
942 .bind(&event.actor)
943 .bind(&event.resource)
944 .bind(&data_json)
945 .bind(event.timestamp.to_rfc3339())
946 .execute(&self.pool)
947 .await
948 .map_err(|e| LLMTraceError::Storage(format!("Failed to record audit event: {e}")))?;
949
950 Ok(())
951 }
952
953 async fn query_audit_events(&self, query: &AuditQuery) -> Result<Vec<AuditEvent>> {
954 let mut qb = QueryBuilder::<Sqlite>::new("SELECT * FROM audit_events WHERE tenant_id = ");
955 qb.push_bind(query.tenant_id.0.to_string());
956
957 if let Some(ref event_type) = query.event_type {
958 qb.push(" AND event_type = ");
959 qb.push_bind(event_type.clone());
960 }
961 if let Some(ref start) = query.start_time {
962 qb.push(" AND timestamp >= ");
963 qb.push_bind(start.to_rfc3339());
964 }
965 if let Some(ref end) = query.end_time {
966 qb.push(" AND timestamp <= ");
967 qb.push_bind(end.to_rfc3339());
968 }
969
970 qb.push(" ORDER BY timestamp DESC");
971
972 if let Some(limit) = query.limit {
973 qb.push(" LIMIT ");
974 qb.push_bind(limit as i64);
975 }
976 if let Some(offset) = query.offset {
977 qb.push(" OFFSET ");
978 qb.push_bind(offset as i64);
979 }
980
981 let rows =
982 qb.build().fetch_all(&self.pool).await.map_err(|e| {
983 LLMTraceError::Storage(format!("Failed to query audit events: {e}"))
984 })?;
985
986 rows.iter()
987 .map(|row| {
988 let data_str: String = row.get("data");
989 let data: serde_json::Value = serde_json::from_str(&data_str).map_err(|e| {
990 LLMTraceError::Storage(format!("Invalid audit event data JSON: {e}"))
991 })?;
992 Ok(AuditEvent {
993 id: parse_uuid(&row.get::<String, _>("id"))?,
994 tenant_id: TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?),
995 event_type: row.get("event_type"),
996 actor: row.get("actor"),
997 resource: row.get("resource"),
998 data,
999 timestamp: parse_datetime(&row.get::<String, _>("timestamp"))?,
1000 })
1001 })
1002 .collect()
1003 }
1004
1005 async fn create_api_key(&self, key: &ApiKeyRecord) -> Result<()> {
1006 sqlx::query(
1007 "INSERT INTO api_keys (id, tenant_id, name, key_hash, key_prefix, role, created_at, revoked_at)
1008 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1009 )
1010 .bind(key.id.to_string())
1011 .bind(key.tenant_id.0.to_string())
1012 .bind(&key.name)
1013 .bind(&key.key_hash)
1014 .bind(&key.key_prefix)
1015 .bind(key.role.to_string())
1016 .bind(key.created_at.to_rfc3339())
1017 .bind(key.revoked_at.map(|t| t.to_rfc3339()))
1018 .execute(&self.pool)
1019 .await
1020 .map_err(|e| LLMTraceError::Storage(format!("Failed to create API key: {e}")))?;
1021 Ok(())
1022 }
1023
1024 async fn get_api_key_by_hash(&self, key_hash: &str) -> Result<Option<ApiKeyRecord>> {
1025 let row = sqlx::query("SELECT * FROM api_keys WHERE key_hash = ?1 AND revoked_at IS NULL")
1026 .bind(key_hash)
1027 .fetch_optional(&self.pool)
1028 .await
1029 .map_err(|e| LLMTraceError::Storage(format!("Failed to look up API key: {e}")))?;
1030
1031 let Some(row) = row else {
1032 return Ok(None);
1033 };
1034
1035 Ok(Some(ApiKeyRecord {
1036 id: parse_uuid(&row.get::<String, _>("id"))?,
1037 tenant_id: TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?),
1038 name: row.get("name"),
1039 key_hash: row.get("key_hash"),
1040 key_prefix: row.get("key_prefix"),
1041 role: row
1042 .get::<String, _>("role")
1043 .parse::<ApiKeyRole>()
1044 .map_err(|e| LLMTraceError::Storage(format!("Invalid API key role: {e}")))?,
1045 created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
1046 revoked_at: {
1047 let s: Option<String> = row.get("revoked_at");
1048 s.map(|v| parse_datetime(&v)).transpose()?
1049 },
1050 }))
1051 }
1052
1053 async fn list_api_keys(&self, tenant_id: TenantId) -> Result<Vec<ApiKeyRecord>> {
1054 let rows =
1055 sqlx::query("SELECT * FROM api_keys WHERE tenant_id = ?1 ORDER BY created_at DESC")
1056 .bind(tenant_id.0.to_string())
1057 .fetch_all(&self.pool)
1058 .await
1059 .map_err(|e| LLMTraceError::Storage(format!("Failed to list API keys: {e}")))?;
1060
1061 rows.iter()
1062 .map(|row| {
1063 Ok(ApiKeyRecord {
1064 id: parse_uuid(&row.get::<String, _>("id"))?,
1065 tenant_id: TenantId(parse_uuid(&row.get::<String, _>("tenant_id"))?),
1066 name: row.get("name"),
1067 key_hash: row.get("key_hash"),
1068 key_prefix: row.get("key_prefix"),
1069 role: row
1070 .get::<String, _>("role")
1071 .parse::<ApiKeyRole>()
1072 .map_err(|e| {
1073 LLMTraceError::Storage(format!("Invalid API key role: {e}"))
1074 })?,
1075 created_at: parse_datetime(&row.get::<String, _>("created_at"))?,
1076 revoked_at: {
1077 let s: Option<String> = row.get("revoked_at");
1078 s.map(|v| parse_datetime(&v)).transpose()?
1079 },
1080 })
1081 })
1082 .collect()
1083 }
1084
1085 async fn revoke_api_key(&self, key_id: Uuid) -> Result<bool> {
1086 let result =
1087 sqlx::query("UPDATE api_keys SET revoked_at = ?1 WHERE id = ?2 AND revoked_at IS NULL")
1088 .bind(Utc::now().to_rfc3339())
1089 .bind(key_id.to_string())
1090 .execute(&self.pool)
1091 .await
1092 .map_err(|e| LLMTraceError::Storage(format!("Failed to revoke API key: {e}")))?;
1093
1094 Ok(result.rows_affected() > 0)
1095 }
1096
1097 async fn store_report(&self, report: &llmtrace_core::ComplianceReportRecord) -> Result<()> {
1098 let content_json = report
1099 .content
1100 .as_ref()
1101 .map(serde_json::to_string)
1102 .transpose()
1103 .map_err(|e| LLMTraceError::Storage(format!("serialize report content: {e}")))?;
1104
1105 sqlx::query(
1106 "INSERT OR REPLACE INTO compliance_reports
1107 (id, tenant_id, report_type, status, period_start, period_end, created_at, completed_at, content, error)
1108 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
1109 )
1110 .bind(report.id.to_string())
1111 .bind(report.tenant_id.0.to_string())
1112 .bind(&report.report_type)
1113 .bind(&report.status)
1114 .bind(report.period_start.to_rfc3339())
1115 .bind(report.period_end.to_rfc3339())
1116 .bind(report.created_at.to_rfc3339())
1117 .bind(report.completed_at.map(|t| t.to_rfc3339()))
1118 .bind(content_json.as_deref())
1119 .bind(report.error.as_deref())
1120 .execute(&self.pool)
1121 .await
1122 .map_err(|e| LLMTraceError::Storage(format!("Failed to store compliance report: {e}")))?;
1123
1124 Ok(())
1125 }
1126
1127 async fn get_report(
1128 &self,
1129 report_id: Uuid,
1130 ) -> Result<Option<llmtrace_core::ComplianceReportRecord>> {
1131 let row = sqlx::query("SELECT * FROM compliance_reports WHERE id = ?1")
1132 .bind(report_id.to_string())
1133 .fetch_optional(&self.pool)
1134 .await
1135 .map_err(|e| LLMTraceError::Storage(format!("Failed to get report: {e}")))?;
1136
1137 let Some(row) = row else {
1138 return Ok(None);
1139 };
1140
1141 Ok(Some(report_from_row(&row)?))
1142 }
1143
1144 async fn list_reports(
1145 &self,
1146 query: &llmtrace_core::ReportQuery,
1147 ) -> Result<Vec<llmtrace_core::ComplianceReportRecord>> {
1148 let mut qb =
1149 QueryBuilder::<Sqlite>::new("SELECT * FROM compliance_reports WHERE tenant_id = ");
1150 qb.push_bind(query.tenant_id.0.to_string());
1151 qb.push(" ORDER BY created_at DESC");
1152
1153 if let Some(limit) = query.limit {
1154 qb.push(" LIMIT ");
1155 qb.push_bind(limit as i64);
1156 }
1157 if let Some(offset) = query.offset {
1158 qb.push(" OFFSET ");
1159 qb.push_bind(offset as i64);
1160 }
1161
1162 let rows = qb.build().fetch_all(&self.pool).await.map_err(|e| {
1163 LLMTraceError::Storage(format!("Failed to list compliance reports: {e}"))
1164 })?;
1165
1166 rows.iter().map(report_from_row).collect()
1167 }
1168
1169 async fn health_check(&self) -> Result<()> {
1170 sqlx::query("SELECT 1")
1171 .fetch_one(&self.pool)
1172 .await
1173 .map_err(|e| LLMTraceError::Storage(format!("Metadata health check failed: {e}")))?;
1174 Ok(())
1175 }
1176}
1177
#[cfg(test)]
mod tests {
    //! Tests for the SQLite trace and metadata repositories. Every test opens
    //! its own repository on a `sqlite::memory:` URL.

    use super::*;
    use llmtrace_core::{SecurityFinding, SecuritySeverity, SpanEvent};

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Opens a trace repository on `sqlite::memory:`, panicking on failure.
    async fn test_storage() -> SqliteTraceRepository {
        let opened = SqliteTraceRepository::new("sqlite::memory:").await;
        opened.unwrap()
    }

    /// Builds a `chat_completion` span with the prompt `Hello, world!` for the
    /// given trace, tenant, provider and model.
    fn make_span(
        trace_id: Uuid,
        tenant_id: TenantId,
        provider: LLMProvider,
        model: &str,
    ) -> TraceSpan {
        let operation = String::from("chat_completion");
        let prompt = String::from("Hello, world!");
        TraceSpan::new(
            trace_id,
            tenant_id,
            operation,
            provider,
            model.to_owned(),
            prompt,
        )
    }

    /// Assembles a `TraceEvent` from its four fields.
    fn trace_of(
        trace_id: Uuid,
        tenant_id: TenantId,
        spans: Vec<TraceSpan>,
        created_at: DateTime<Utc>,
    ) -> TraceEvent {
        TraceEvent {
            trace_id,
            tenant_id,
            spans,
            created_at,
        }
    }

    /// Builds a trace with a fresh id holding one OpenAI `gpt-4` span,
    /// stamped with the current time.
    fn make_trace(tenant_id: TenantId) -> TraceEvent {
        let trace_id = Uuid::new_v4();
        let only_span = make_span(trace_id, tenant_id, LLMProvider::OpenAI, "gpt-4");
        trace_of(trace_id, tenant_id, vec![only_span], Utc::now())
    }

    /// Builds a `Tenant` created "now" from the given textual fields and config.
    fn tenant_fixture(
        id: TenantId,
        name: &str,
        api_token: &str,
        plan: &str,
        config: serde_json::Value,
    ) -> Tenant {
        Tenant {
            id,
            name: name.to_owned(),
            api_token: api_token.to_owned(),
            plan: plan.to_owned(),
            created_at: Utc::now(),
            config,
        }
    }

    // ------------------------------------------------------------------
    // Trace repository
    // ------------------------------------------------------------------

    /// A stored trace can be fetched by id with its span intact.
    #[tokio::test]
    async fn test_store_and_retrieve_trace() {
        let repo = test_storage().await;
        let tenant = TenantId::new();
        let original = make_trace(tenant);
        repo.store_trace(&original).await.unwrap();

        let fetched = repo.get_trace(tenant, original.trace_id).await.unwrap();
        let fetched = fetched.expect("trace should exist");

        assert_eq!(fetched.trace_id, original.trace_id);
        assert_eq!(fetched.tenant_id, tenant);
        assert_eq!(fetched.spans.len(), 1);
        let span = &fetched.spans[0];
        assert_eq!(span.model_name, "gpt-4");
        assert_eq!(span.prompt, "Hello, world!");
    }

    /// A span stored on its own is retrievable, and its trace can then be
    /// fetched with exactly that one span.
    #[tokio::test]
    async fn test_store_span_individually() {
        let repo = test_storage().await;
        let tenant = TenantId::new();
        let trace_id = Uuid::new_v4();
        let lone_span = make_span(trace_id, tenant, LLMProvider::Anthropic, "claude-3");
        repo.store_span(&lone_span).await.unwrap();

        let fetched_span = repo.get_span(tenant, lone_span.span_id).await.unwrap();
        let fetched_span = fetched_span.expect("span should exist");
        assert_eq!(fetched_span.span_id, lone_span.span_id);
        assert_eq!(fetched_span.provider, LLMProvider::Anthropic);
        assert_eq!(fetched_span.model_name, "claude-3");

        let parent = repo.get_trace(tenant, trace_id).await.unwrap();
        let parent = parent.expect("parent trace row should exist");
        assert_eq!(parent.spans.len(), 1);
    }

    /// Looking up an unknown trace id yields `None`.
    #[tokio::test]
    async fn test_get_nonexistent_trace() {
        let repo = test_storage().await;
        let tenant = TenantId::new();
        let missing = repo.get_trace(tenant, Uuid::new_v4()).await.unwrap();
        assert!(missing.is_none());
    }

    /// Looking up an unknown span id yields `None`.
    #[tokio::test]
    async fn test_get_nonexistent_span() {
        let repo = test_storage().await;
        let tenant = TenantId::new();
        let missing = repo.get_span(tenant, Uuid::new_v4()).await.unwrap();
        assert!(missing.is_none());
    }

    /// Each tenant's query returns only that tenant's trace.
    #[tokio::test]
    async fn test_query_by_tenant_isolation() {
        let repo = test_storage().await;
        let tenant_a = TenantId::new();
        let tenant_b = TenantId::new();

        for tenant in [tenant_a, tenant_b] {
            repo.store_trace(&make_trace(tenant)).await.unwrap();
        }

        for tenant in [tenant_a, tenant_b] {
            let visible = repo.query_traces(&TraceQuery::new(tenant)).await.unwrap();
            assert_eq!(visible.len(), 1);
            assert_eq!(visible[0].tenant_id, tenant);
        }
    }

    /// A one-hour window matches the recent trace but not the two-hour-old one.
    #[tokio::test]
    async fn test_query_by_time_range() {
        let repo = test_storage().await;
        let tenant = TenantId::new();

        let two_hours_ago = Utc::now() - chrono::Duration::hours(2);
        let just_now = Utc::now();

        // Builds a single-span trace whose span start and creation time are `at`.
        let trace_started_at = |at: DateTime<Utc>| {
            let trace_id = Uuid::new_v4();
            let mut span = make_span(trace_id, tenant, LLMProvider::OpenAI, "gpt-4");
            span.start_time = at;
            trace_of(trace_id, tenant, vec![span], at)
        };
        let stale = trace_started_at(two_hours_ago);
        let fresh = trace_started_at(just_now);

        repo.store_trace(&stale).await.unwrap();
        repo.store_trace(&fresh).await.unwrap();

        let window_start = Utc::now() - chrono::Duration::hours(1);
        let windowed = TraceQuery::new(tenant).with_time_range(window_start, Utc::now());
        let hits = repo.query_traces(&windowed).await.unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].trace_id, fresh.trace_id);
    }

    /// Filtering traces by provider returns only the Anthropic trace.
    #[tokio::test]
    async fn test_query_by_provider() {
        let repo = test_storage().await;
        let tenant = TenantId::new();

        let openai_id = Uuid::new_v4();
        let openai_trace = trace_of(
            openai_id,
            tenant,
            vec![make_span(openai_id, tenant, LLMProvider::OpenAI, "gpt-4")],
            Utc::now(),
        );
        let anthropic_id = Uuid::new_v4();
        let anthropic_trace = trace_of(
            anthropic_id,
            tenant,
            vec![make_span(
                anthropic_id,
                tenant,
                LLMProvider::Anthropic,
                "claude-3",
            )],
            Utc::now(),
        );

        repo.store_trace(&openai_trace).await.unwrap();
        repo.store_trace(&anthropic_trace).await.unwrap();

        let only_anthropic = TraceQuery::new(tenant).with_provider(LLMProvider::Anthropic);
        let hits = repo.query_traces(&only_anthropic).await.unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].trace_id, anthropic_id);
    }

    /// A score range of 80..=100 matches only the trace carrying a critical finding.
    #[tokio::test]
    async fn test_query_by_security_score() {
        let repo = test_storage().await;
        let tenant = TenantId::new();

        let flagged_id = Uuid::new_v4();
        let mut flagged_span = make_span(flagged_id, tenant, LLMProvider::OpenAI, "gpt-4");
        let finding = SecurityFinding::new(
            SecuritySeverity::Critical,
            "prompt_injection".to_string(),
            "test finding".to_string(),
            0.99,
        );
        flagged_span.add_security_finding(finding);

        let flagged = trace_of(flagged_id, tenant, vec![flagged_span], Utc::now());
        repo.store_trace(&flagged).await.unwrap();
        repo.store_trace(&make_trace(tenant)).await.unwrap();

        let high_scores = TraceQuery::new(tenant).with_security_score_range(80, 100);
        let hits = repo.query_traces(&high_scores).await.unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].trace_id, flagged_id);
    }

    /// Two pages of size two, taken at offsets 0 and 2, share no trace ids.
    #[tokio::test]
    async fn test_query_with_limit_and_offset() {
        let repo = test_storage().await;
        let tenant = TenantId::new();

        for _ in 0..5 {
            repo.store_trace(&make_trace(tenant)).await.unwrap();
        }

        let first_query = TraceQuery::new(tenant).with_limit(2);
        let first_page = repo.query_traces(&first_query).await.unwrap();
        assert_eq!(first_page.len(), 2);

        let mut second_query = TraceQuery::new(tenant).with_limit(2);
        second_query.offset = Some(2);
        let second_page = repo.query_traces(&second_query).await.unwrap();
        assert_eq!(second_page.len(), 2);

        let first_ids: Vec<Uuid> = first_page.iter().map(|t| t.trace_id).collect();
        let second_ids: Vec<Uuid> = second_page.iter().map(|t| t.trace_id).collect();
        for id in &second_ids {
            assert!(!first_ids.contains(id));
        }
    }

    /// Querying a tenant that stored nothing returns an empty list.
    #[tokio::test]
    async fn test_query_nonexistent_tenant() {
        let repo = test_storage().await;
        let owner = TenantId::new();
        repo.store_trace(&make_trace(owner)).await.unwrap();

        let stranger = TenantId::new();
        let hits = repo.query_traces(&TraceQuery::new(stranger)).await.unwrap();
        assert!(hits.is_empty());
    }

    /// `query_spans` returns the span of a stored trace.
    #[tokio::test]
    async fn test_query_spans_directly() {
        let repo = test_storage().await;
        let tenant = TenantId::new();
        let stored = make_trace(tenant);
        repo.store_trace(&stored).await.unwrap();

        let found = repo.query_spans(&TraceQuery::new(tenant)).await.unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].trace_id, stored.trace_id);
    }

    /// Filtering spans by provider returns only the OpenAI span.
    #[tokio::test]
    async fn test_query_spans_by_provider() {
        let repo = test_storage().await;
        let tenant = TenantId::new();

        let openai_trace_id = Uuid::new_v4();
        let anthropic_trace_id = Uuid::new_v4();
        let openai_span = make_span(openai_trace_id, tenant, LLMProvider::OpenAI, "gpt-4");
        let anthropic_span = make_span(
            anthropic_trace_id,
            tenant,
            LLMProvider::Anthropic,
            "claude-3",
        );
        repo.store_span(&openai_span).await.unwrap();
        repo.store_span(&anthropic_span).await.unwrap();

        let only_openai = TraceQuery::new(tenant).with_provider(LLMProvider::OpenAI);
        let found = repo.query_spans(&only_openai).await.unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].provider, LLMProvider::OpenAI);
    }

    /// Deleting with a one-hour cutoff removes the two-hour-old trace and
    /// keeps the recent one.
    #[tokio::test]
    async fn test_delete_traces_before() {
        let repo = test_storage().await;
        let tenant = TenantId::new();

        let two_hours_ago = Utc::now() - chrono::Duration::hours(2);
        let just_now = Utc::now();

        let stale_id = Uuid::new_v4();
        let stale = trace_of(
            stale_id,
            tenant,
            vec![make_span(stale_id, tenant, LLMProvider::OpenAI, "gpt-4")],
            two_hours_ago,
        );
        let fresh_id = Uuid::new_v4();
        let fresh = trace_of(
            fresh_id,
            tenant,
            vec![make_span(fresh_id, tenant, LLMProvider::OpenAI, "gpt-4")],
            just_now,
        );

        repo.store_trace(&stale).await.unwrap();
        repo.store_trace(&fresh).await.unwrap();

        let cutoff = Utc::now() - chrono::Duration::hours(1);
        let removed = repo.delete_traces_before(tenant, cutoff).await.unwrap();
        assert_eq!(removed, 1);

        let survivors = repo.query_traces(&TraceQuery::new(tenant)).await.unwrap();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].trace_id, fresh_id);

        let stale_lookup = repo.get_trace(tenant, stale_id).await.unwrap();
        assert!(stale_lookup.is_none());
    }

    /// Stats for a tenant with two single-span traces report both counts,
    /// a non-zero size and both trace timestamps.
    #[tokio::test]
    async fn test_get_stats() {
        let repo = test_storage().await;
        let tenant = TenantId::new();

        for _ in 0..2 {
            repo.store_trace(&make_trace(tenant)).await.unwrap();
        }

        let stats = repo.get_stats(tenant).await.unwrap();
        assert_eq!(stats.total_traces, 2);
        assert_eq!(stats.total_spans, 2);
        assert!(stats.storage_size_bytes > 0);
        assert!(stats.oldest_trace.is_some());
        assert!(stats.newest_trace.is_some());
    }

    /// Stats for a tenant without data are all zero with no oldest trace.
    #[tokio::test]
    async fn test_get_stats_empty_tenant() {
        let repo = test_storage().await;
        let stats = repo.get_stats(TenantId::new()).await.unwrap();
        assert_eq!(stats.total_traces, 0);
        assert_eq!(stats.total_spans, 0);
        assert_eq!(stats.storage_size_bytes, 0);
        assert!(stats.oldest_trace.is_none());
    }

    /// The trace repository health check succeeds on a fresh database.
    #[tokio::test]
    async fn test_health_check() {
        let repo = test_storage().await;
        let status = repo.health_check().await;
        assert!(status.is_ok());
    }

    /// Storing the same trace twice leaves a single trace visible.
    #[tokio::test]
    async fn test_duplicate_store_is_idempotent() {
        let repo = test_storage().await;
        let tenant = TenantId::new();
        let trace = make_trace(tenant);

        for _ in 0..2 {
            repo.store_trace(&trace).await.unwrap();
        }

        let visible = repo.query_traces(&TraceQuery::new(tenant)).await.unwrap();
        assert_eq!(visible.len(), 1);
    }

    /// A trace without spans is stored and read back with an empty span list.
    #[tokio::test]
    async fn test_store_trace_with_no_spans() {
        let repo = test_storage().await;
        let tenant = TenantId::new();
        let empty = trace_of(Uuid::new_v4(), tenant, Vec::new(), Utc::now());

        repo.store_trace(&empty).await.unwrap();

        let fetched = repo.get_trace(tenant, empty.trace_id).await.unwrap();
        let fetched = fetched.expect("trace row should exist");
        assert!(fetched.spans.is_empty());
    }

    /// Optional span fields, tags, events and findings survive a round trip.
    #[tokio::test]
    async fn test_roundtrip_complex_span_fields() {
        let repo = test_storage().await;
        let tenant = TenantId::new();
        let trace_id = Uuid::new_v4();

        let custom_provider = LLMProvider::Custom("my-llm".to_string());
        let mut rich = make_span(trace_id, tenant, custom_provider, "custom-v1");
        rich.response = Some("I am an AI assistant.".to_string());
        rich.prompt_tokens = Some(10);
        rich.completion_tokens = Some(20);
        rich.total_tokens = Some(30);
        rich.time_to_first_token_ms = Some(42);
        rich.duration_ms = Some(123);
        rich.status_code = Some(200);
        rich.estimated_cost_usd = Some(0.0015);
        rich.tags.insert("env".to_string(), "test".to_string());

        let first_chunk = SpanEvent::new("token_received".to_string(), "first chunk".to_string())
            .with_data("index".to_string(), "0".to_string());
        rich.add_event(first_chunk);

        let pii = SecurityFinding::new(
            SecuritySeverity::Medium,
            "pii_detected".to_string(),
            "email found".to_string(),
            0.85,
        );
        rich.add_security_finding(pii);

        // Remember the id before the span is moved into the trace.
        let span_id = rich.span_id;
        let trace = trace_of(trace_id, tenant, vec![rich], Utc::now());
        repo.store_trace(&trace).await.unwrap();

        let back = repo.get_span(tenant, span_id).await.unwrap();
        let back = back.expect("span should exist");

        assert_eq!(back.provider, LLMProvider::Custom("my-llm".to_string()));
        assert_eq!(back.response.as_deref(), Some("I am an AI assistant."));
        assert_eq!(back.prompt_tokens, Some(10));
        assert_eq!(back.completion_tokens, Some(20));
        assert_eq!(back.total_tokens, Some(30));
        assert_eq!(back.time_to_first_token_ms, Some(42));
        assert_eq!(back.duration_ms, Some(123));
        assert_eq!(back.status_code, Some(200));
        assert!((back.estimated_cost_usd.unwrap() - 0.0015).abs() < f64::EPSILON);
        assert_eq!(back.security_score, Some(60));
        assert_eq!(back.security_findings.len(), 1);
        assert_eq!(back.security_findings[0].finding_type, "pii_detected");
        assert_eq!(back.tags.get("env"), Some(&"test".to_string()));
        assert_eq!(back.events.len(), 1);
        assert_eq!(back.events[0].event_type, "token_received");
    }

    /// A two-span trace keeps both spans and the child's parent link.
    #[tokio::test]
    async fn test_trace_with_multiple_spans() {
        let repo = test_storage().await;
        let tenant = TenantId::new();
        let trace_id = Uuid::new_v4();

        let root = make_span(trace_id, tenant, LLMProvider::OpenAI, "gpt-4");
        let root_id = root.span_id;
        let mut leaf = make_span(trace_id, tenant, LLMProvider::OpenAI, "gpt-4");
        leaf.parent_span_id = Some(root_id);
        leaf.operation_name = "embedding".to_string();

        let trace = trace_of(trace_id, tenant, vec![root, leaf], Utc::now());
        repo.store_trace(&trace).await.unwrap();

        let fetched = repo.get_trace(tenant, trace_id).await.unwrap();
        let fetched = fetched.expect("trace should exist");
        assert_eq!(fetched.spans.len(), 2);

        let child = fetched
            .spans
            .iter()
            .find(|s| s.parent_span_id.is_some())
            .expect("child span should be present");
        assert_eq!(child.parent_span_id, Some(root_id));
    }

    // ------------------------------------------------------------------
    // Metadata repository
    // ------------------------------------------------------------------

    /// Opens a metadata repository on `sqlite::memory:`, panicking on failure.
    async fn test_metadata() -> SqliteMetadataRepository {
        let opened = SqliteMetadataRepository::new("sqlite::memory:").await;
        opened.unwrap()
    }

    /// Create, read, update, list and delete a tenant.
    #[tokio::test]
    async fn test_tenant_crud() {
        let repo = test_metadata().await;
        let created = tenant_fixture(
            TenantId::new(),
            "Acme Corp",
            "test-token",
            "pro",
            serde_json::json!({"max_traces": 10000}),
        );
        let id = created.id;

        repo.create_tenant(&created).await.unwrap();

        let stored = repo.get_tenant(id).await.unwrap();
        let stored = stored.expect("tenant should exist");
        assert_eq!(stored.name, "Acme Corp");
        assert_eq!(stored.plan, "pro");

        // Same tenant with a new name and plan; all other fields are carried over.
        let renamed = Tenant {
            name: "Acme Inc".to_string(),
            plan: "enterprise".to_string(),
            ..created
        };
        repo.update_tenant(&renamed).await.unwrap();

        let refreshed = repo.get_tenant(id).await.unwrap();
        let refreshed = refreshed.expect("tenant should exist");
        assert_eq!(refreshed.name, "Acme Inc");
        assert_eq!(refreshed.plan, "enterprise");

        let everyone = repo.list_tenants().await.unwrap();
        assert_eq!(everyone.len(), 1);

        repo.delete_tenant(id).await.unwrap();
        let after_delete = repo.get_tenant(id).await.unwrap();
        assert!(after_delete.is_none());
    }

    /// Updating a tenant that was never created is reported as an error.
    #[tokio::test]
    async fn test_update_nonexistent_tenant_returns_error() {
        let repo = test_metadata().await;
        let ghost = tenant_fixture(
            TenantId::new(),
            "Ghost",
            "ghost-token",
            "free",
            serde_json::json!({}),
        );
        let outcome = repo.update_tenant(&ghost).await;
        assert!(outcome.is_err());
    }

    /// Tenant config is absent at first, can be upserted, and a second
    /// upsert overwrites the stored values.
    #[tokio::test]
    async fn test_tenant_config_crud() {
        let repo = test_metadata().await;
        let tenant_id = TenantId::new();

        // The test creates the tenant before touching its config.
        let owner = tenant_fixture(
            tenant_id,
            "Test",
            "test-token",
            "free",
            serde_json::json!({}),
        );
        repo.create_tenant(&owner).await.unwrap();

        let before = repo.get_tenant_config(tenant_id).await.unwrap();
        assert!(before.is_none());

        let config = TenantConfig {
            tenant_id,
            security_thresholds: HashMap::from([("alert_min_score".to_string(), 80.0)]),
            feature_flags: HashMap::from([("enable_pii".to_string(), true)]),
            monitoring_scope: llmtrace_core::MonitoringScope::Hybrid,
            rate_limit_rpm: None,
            monthly_budget: None,
        };
        repo.upsert_tenant_config(&config).await.unwrap();

        let stored = repo.get_tenant_config(tenant_id).await.unwrap();
        let stored = stored.expect("config should exist");
        assert_eq!(
            stored.security_thresholds.get("alert_min_score"),
            Some(&80.0)
        );
        assert_eq!(stored.feature_flags.get("enable_pii"), Some(&true));

        let mut toggled = config.clone();
        toggled
            .feature_flags
            .insert("enable_pii".to_string(), false);
        repo.upsert_tenant_config(&toggled).await.unwrap();

        let overwritten = repo.get_tenant_config(tenant_id).await.unwrap();
        let overwritten = overwritten.expect("config should exist");
        assert_eq!(overwritten.feature_flags.get("enable_pii"), Some(&false));
    }

    /// Audit events can be recorded and then queried unfiltered, by event
    /// type, and with a limit.
    #[tokio::test]
    async fn test_audit_events() {
        let repo = test_metadata().await;
        let tenant_id = TenantId::new();

        let config_changed = AuditEvent {
            id: Uuid::new_v4(),
            tenant_id,
            event_type: "config_changed".into(),
            actor: "admin".into(),
            resource: "tenant_config".into(),
            data: serde_json::json!({"field": "enable_pii", "old": false, "new": true}),
            timestamp: Utc::now() - chrono::Duration::minutes(5),
        };
        let tenant_created = AuditEvent {
            id: Uuid::new_v4(),
            tenant_id,
            event_type: "tenant_created".into(),
            actor: "system".into(),
            resource: "tenant".into(),
            data: serde_json::json!({}),
            timestamp: Utc::now(),
        };

        for event in [&config_changed, &tenant_created] {
            repo.record_audit_event(event).await.unwrap();
        }

        // No filter: both events come back.
        let unfiltered = AuditQuery::new(tenant_id);
        let everything = repo.query_audit_events(&unfiltered).await.unwrap();
        assert_eq!(everything.len(), 2);

        // Event-type filter: only the config change matches.
        let by_type = AuditQuery::new(tenant_id).with_event_type("config_changed".to_string());
        let config_events = repo.query_audit_events(&by_type).await.unwrap();
        assert_eq!(config_events.len(), 1);
        assert_eq!(config_events[0].event_type, "config_changed");
        assert_eq!(config_events[0].actor, "admin");

        // Limit of one: a single event is returned.
        let capped = AuditQuery::new(tenant_id).with_limit(1);
        let limited = repo.query_audit_events(&capped).await.unwrap();
        assert_eq!(limited.len(), 1);
    }

    /// The metadata repository health check succeeds on a fresh database.
    #[tokio::test]
    async fn test_metadata_health_check() {
        let repo = test_metadata().await;
        let status = repo.health_check().await;
        assert!(status.is_ok());
    }
}