1use crate::StorageConfig;
4use async_trait::async_trait;
5use chrono::Timelike;
6use otelite_core::storage::{
7 PurgeAllStats, PurgeOptions, QueryParams, Result, StorageBackend, StorageError, StorageStats,
8};
9use otelite_core::telemetry::{LogRecord, Metric, Span};
10use parking_lot::Mutex;
11use rusqlite::Connection;
12use std::path::PathBuf;
13use std::sync::Arc;
14
15pub mod purge;
16pub mod reader;
17pub mod schema;
18pub mod writer;
19
20pub struct SqliteBackend {
22 config: StorageConfig,
23 conn: Arc<Mutex<Option<Connection>>>,
24 purge_lock: Arc<purge::PurgeLock>,
25 purge_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
26}
27
28impl SqliteBackend {
29 pub fn new(config: StorageConfig) -> Self {
31 Self {
32 config,
33 conn: Arc::new(Mutex::new(None)),
34 purge_lock: Arc::new(purge::PurgeLock::new()),
35 purge_handle: Arc::new(Mutex::new(None)),
36 }
37 }
38
39 fn db_path(&self) -> PathBuf {
40 if self
41 .config
42 .data_dir
43 .to_string_lossy()
44 .starts_with(":memory:")
45 {
46 self.config.data_dir.clone()
47 } else {
48 self.config.data_dir.join("otelite.db")
49 }
50 }
51}
52
53#[async_trait]
54impl StorageBackend for SqliteBackend {
55 async fn initialize(&mut self) -> Result<()> {
56 let db_path = self.db_path();
57
58 if !db_path.to_string_lossy().starts_with(":memory:") {
59 std::fs::create_dir_all(&self.config.data_dir).map_err(|e| {
60 StorageError::InitializationError(format!("Failed to create data directory: {}", e))
61 })?;
62 }
63
64 let conn = Connection::open(&db_path).map_err(|e| {
65 StorageError::InitializationError(format!("Failed to open database: {}", e))
66 })?;
67
68 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
69 .map_err(|e| {
70 StorageError::InitializationError(format!("Failed to configure SQLite: {}", e))
71 })?;
72
73 schema::initialize_schema(&conn).map_err(StorageError::from)?;
74
75 *self.conn.lock() = Some(conn);
76
77 if self.config.retention_days > 0 {
78 self.start_purge_scheduler(db_path);
79 }
80
81 Ok(())
82 }
83
84 async fn write_log(&self, log: &LogRecord) -> Result<()> {
85 let conn_guard = self.conn.lock();
86 let conn = conn_guard
87 .as_ref()
88 .ok_or_else(|| StorageError::WriteError("Database not initialized".to_string()))?;
89
90 writer::write_log(conn, log).map_err(StorageError::from)
91 }
92
93 async fn write_span(&self, span: &Span) -> Result<()> {
94 let conn_guard = self.conn.lock();
95 let conn = conn_guard
96 .as_ref()
97 .ok_or_else(|| StorageError::WriteError("Database not initialized".to_string()))?;
98
99 writer::write_span(conn, span).map_err(StorageError::from)
100 }
101
102 async fn write_metric(&self, metric: &Metric) -> Result<()> {
103 let conn_guard = self.conn.lock();
104 let conn = conn_guard
105 .as_ref()
106 .ok_or_else(|| StorageError::WriteError("Database not initialized".to_string()))?;
107
108 writer::write_metric(conn, metric).map_err(StorageError::from)
109 }
110
111 async fn query_logs(&self, params: &QueryParams) -> Result<Vec<LogRecord>> {
112 let conn_guard = self.conn.lock();
113 let conn = conn_guard
114 .as_ref()
115 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
116
117 reader::query_logs(conn, params).map_err(StorageError::from)
118 }
119
120 async fn query_spans(&self, params: &QueryParams) -> Result<Vec<Span>> {
121 let conn_guard = self.conn.lock();
122 let conn = conn_guard
123 .as_ref()
124 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
125
126 reader::query_spans(conn, params).map_err(StorageError::from)
127 }
128
129 async fn query_spans_for_trace_list(
130 &self,
131 params: &QueryParams,
132 trace_limit: usize,
133 ) -> Result<Vec<Span>> {
134 let conn_guard = self.conn.lock();
135 let conn = conn_guard
136 .as_ref()
137 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
138
139 reader::query_spans_for_trace_list(conn, params, trace_limit).map_err(StorageError::from)
140 }
141
142 async fn query_metrics(&self, params: &QueryParams) -> Result<Vec<Metric>> {
143 let conn_guard = self.conn.lock();
144 let conn = conn_guard
145 .as_ref()
146 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
147
148 reader::query_metrics(conn, params).map_err(StorageError::from)
149 }
150
151 async fn query_latest_metrics(&self, params: &QueryParams) -> Result<Vec<Metric>> {
152 let conn_guard = self.conn.lock();
153 let conn = conn_guard
154 .as_ref()
155 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
156
157 reader::query_latest_metrics(conn, params).map_err(StorageError::from)
158 }
159
160 async fn stats(&self) -> Result<StorageStats> {
161 let conn_guard = self.conn.lock();
162 let conn = conn_guard
163 .as_ref()
164 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
165
166 reader::get_stats(conn).map_err(StorageError::from)
167 }
168
169 async fn purge(&self, options: &PurgeOptions) -> Result<u64> {
170 let _guard = self
171 .purge_lock
172 .try_lock()
173 .await
174 .map_err(StorageError::from)?;
175
176 let mut conn_guard = self.conn.lock();
177 let conn = conn_guard
178 .as_mut()
179 .ok_or_else(|| StorageError::WriteError("Database not initialized".to_string()))?;
180
181 let cutoff_timestamp = if let Some(older_than) = options.older_than {
182 older_than
183 } else {
184 let cutoff =
185 chrono::Utc::now() - chrono::Duration::days(self.config.retention_days as i64);
186 cutoff.timestamp_nanos_opt().unwrap_or(0)
187 };
188
189 let record = purge::purge_old_data(
190 conn,
191 cutoff_timestamp,
192 10000,
193 &options.signal_types,
194 options.dry_run,
195 )
196 .map_err(StorageError::from)?;
197
198 if !options.dry_run {
199 purge::vacuum(conn).map_err(StorageError::from)?;
200 }
201
202 let total_deleted = record.logs_deleted + record.spans_deleted + record.metrics_deleted;
203 Ok(total_deleted as u64)
204 }
205
206 async fn purge_all(&self) -> Result<PurgeAllStats> {
207 let _guard = self
208 .purge_lock
209 .try_lock()
210 .await
211 .map_err(StorageError::from)?;
212
213 let mut conn_guard = self.conn.lock();
214 let conn = conn_guard
215 .as_mut()
216 .ok_or_else(|| StorageError::WriteError("Database not initialized".to_string()))?;
217
218 let tx = conn
219 .transaction()
220 .map_err(|e| StorageError::WriteError(format!("Failed to start transaction: {}", e)))?;
221
222 let logs_deleted = tx
223 .execute("DELETE FROM logs", [])
224 .map_err(|e| StorageError::WriteError(format!("Failed to delete logs: {}", e)))?
225 as u64;
226 let spans_deleted = tx
227 .execute("DELETE FROM spans", [])
228 .map_err(|e| StorageError::WriteError(format!("Failed to delete spans: {}", e)))?
229 as u64;
230 let metrics_deleted = tx
231 .execute("DELETE FROM metrics", [])
232 .map_err(|e| StorageError::WriteError(format!("Failed to delete metrics: {}", e)))?
233 as u64;
234
235 tx.commit().map_err(|e| {
236 StorageError::WriteError(format!("Failed to commit transaction: {}", e))
237 })?;
238
239 purge::vacuum(conn).map_err(StorageError::from)?;
240
241 Ok(PurgeAllStats {
242 logs_deleted,
243 spans_deleted,
244 metrics_deleted,
245 })
246 }
247
248 async fn distinct_resource_keys(&self, signal: &str) -> Result<Vec<String>> {
249 let conn_guard = self.conn.lock();
250 let conn = conn_guard
251 .as_ref()
252 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
253
254 reader::distinct_resource_keys(conn, signal).map_err(StorageError::from)
255 }
256
257 async fn query_token_usage(
258 &self,
259 start_time: Option<i64>,
260 end_time: Option<i64>,
261 model: Option<&str>,
262 ) -> Result<(
263 otelite_core::api::TokenUsageSummary,
264 Vec<otelite_core::api::ModelUsage>,
265 Vec<otelite_core::api::SystemUsage>,
266 )> {
267 let conn_guard = self.conn.lock();
268 let conn = conn_guard
269 .as_ref()
270 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
271
272 reader::query_token_usage(conn, start_time, end_time, model).map_err(StorageError::from)
273 }
274
275 async fn query_cost_series(
276 &self,
277 start_time: Option<i64>,
278 end_time: Option<i64>,
279 bucket_ns: i64,
280 model: Option<&str>,
281 ) -> Result<Vec<otelite_core::api::CostSeriesPoint>> {
282 let conn_guard = self.conn.lock();
283 let conn = conn_guard
284 .as_ref()
285 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
286
287 reader::query_cost_series(conn, start_time, end_time, bucket_ns, model)
288 .map_err(StorageError::from)
289 }
290
291 async fn query_top_spans(
292 &self,
293 start_time: Option<i64>,
294 end_time: Option<i64>,
295 limit: usize,
296 sort_by: otelite_core::api::TopSpanSort,
297 truncated_only: bool,
298 ) -> Result<Vec<otelite_core::api::TopSpan>> {
299 let conn_guard = self.conn.lock();
300 let conn = conn_guard
301 .as_ref()
302 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
303
304 reader::query_top_spans(conn, start_time, end_time, limit, sort_by, truncated_only)
305 .map_err(StorageError::from)
306 }
307
308 async fn query_top_sessions(
309 &self,
310 start_time: Option<i64>,
311 end_time: Option<i64>,
312 limit: usize,
313 ) -> Result<Vec<otelite_core::api::SessionCostRow>> {
314 let conn_guard = self.conn.lock();
315 let conn = conn_guard
316 .as_ref()
317 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
318
319 reader::query_top_sessions(conn, start_time, end_time, limit).map_err(StorageError::from)
320 }
321
322 async fn query_top_conversations(
323 &self,
324 start_time: Option<i64>,
325 end_time: Option<i64>,
326 limit: usize,
327 ) -> Result<Vec<otelite_core::api::ConversationCostRow>> {
328 let conn_guard = self.conn.lock();
329 let conn = conn_guard
330 .as_ref()
331 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
332
333 reader::query_top_conversations(conn, start_time, end_time, limit)
334 .map_err(StorageError::from)
335 }
336
337 async fn query_finish_reasons(
338 &self,
339 start_time: Option<i64>,
340 end_time: Option<i64>,
341 model: Option<&str>,
342 ) -> Result<Vec<otelite_core::api::FinishReasonCount>> {
343 let conn_guard = self.conn.lock();
344 let conn = conn_guard
345 .as_ref()
346 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
347
348 reader::query_finish_reasons(conn, start_time, end_time, model).map_err(StorageError::from)
349 }
350
351 async fn query_latency_stats(
352 &self,
353 start_time: Option<i64>,
354 end_time: Option<i64>,
355 model: Option<&str>,
356 ) -> Result<Vec<otelite_core::api::LatencyStats>> {
357 let conn_guard = self.conn.lock();
358 let conn = conn_guard
359 .as_ref()
360 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
361
362 reader::query_latency_stats(conn, start_time, end_time, model).map_err(StorageError::from)
363 }
364
365 async fn query_error_rate(
366 &self,
367 start_time: Option<i64>,
368 end_time: Option<i64>,
369 model: Option<&str>,
370 ) -> Result<Vec<otelite_core::api::ErrorRateByModel>> {
371 let conn_guard = self.conn.lock();
372 let conn = conn_guard
373 .as_ref()
374 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
375
376 reader::query_error_rate(conn, start_time, end_time, model).map_err(StorageError::from)
377 }
378
379 async fn query_tool_usage(
380 &self,
381 start_time: Option<i64>,
382 end_time: Option<i64>,
383 limit: usize,
384 ) -> Result<Vec<otelite_core::api::ToolUsage>> {
385 let conn_guard = self.conn.lock();
386 let conn = conn_guard
387 .as_ref()
388 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
389
390 reader::query_tool_usage(conn, start_time, end_time, limit).map_err(StorageError::from)
391 }
392
393 async fn query_retry_stats(
394 &self,
395 start_time: Option<i64>,
396 end_time: Option<i64>,
397 ) -> Result<otelite_core::api::RetryStats> {
398 let conn_guard = self.conn.lock();
399 let conn = conn_guard
400 .as_ref()
401 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
402
403 reader::query_retry_stats(conn, start_time, end_time).map_err(StorageError::from)
404 }
405
406 async fn query_retrieval_stats(
407 &self,
408 start_time: Option<i64>,
409 end_time: Option<i64>,
410 top_queries_limit: usize,
411 ) -> Result<otelite_core::api::RetrievalStats> {
412 let conn_guard = self.conn.lock();
413 let conn = conn_guard
414 .as_ref()
415 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
416
417 reader::query_retrieval_stats(conn, start_time, end_time, top_queries_limit)
418 .map_err(StorageError::from)
419 }
420
421 async fn query_truncation_rate(
422 &self,
423 start_time: Option<i64>,
424 end_time: Option<i64>,
425 model: Option<&str>,
426 ) -> Result<Vec<otelite_core::api::TruncationRateByModel>> {
427 let conn_guard = self.conn.lock();
428 let conn = conn_guard
429 .as_ref()
430 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
431 reader::query_truncation_rate(conn, start_time, end_time, model).map_err(StorageError::from)
432 }
433
434 async fn query_cache_hit_rate(
435 &self,
436 start_time: Option<i64>,
437 end_time: Option<i64>,
438 model: Option<&str>,
439 ) -> Result<Vec<otelite_core::api::CacheHitRateByModel>> {
440 let conn_guard = self.conn.lock();
441 let conn = conn_guard
442 .as_ref()
443 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
444 reader::query_cache_hit_rate(conn, start_time, end_time, model).map_err(StorageError::from)
445 }
446
447 async fn query_request_param_profile(
448 &self,
449 start_time: Option<i64>,
450 end_time: Option<i64>,
451 ) -> Result<otelite_core::api::RequestParamProfile> {
452 let conn_guard = self.conn.lock();
453 let conn = conn_guard
454 .as_ref()
455 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
456 reader::query_request_param_profile(conn, start_time, end_time).map_err(StorageError::from)
457 }
458
459 async fn query_conversation_depth(
460 &self,
461 start_time: Option<i64>,
462 end_time: Option<i64>,
463 ) -> Result<otelite_core::api::ConversationDepthStats> {
464 let conn_guard = self.conn.lock();
465 let conn = conn_guard
466 .as_ref()
467 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
468 reader::query_conversation_depth(conn, start_time, end_time).map_err(StorageError::from)
469 }
470
471 async fn query_calls_series(
472 &self,
473 start_time: Option<i64>,
474 end_time: Option<i64>,
475 bucket_secs: u64,
476 ) -> Result<Vec<otelite_core::api::CallsSeriesPoint>> {
477 let conn_guard = self.conn.lock();
478 let conn = conn_guard
479 .as_ref()
480 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
481 reader::query_calls_series(conn, start_time, end_time, bucket_secs)
482 .map_err(StorageError::from)
483 }
484
485 async fn query_error_types(
486 &self,
487 start_time: Option<i64>,
488 end_time: Option<i64>,
489 model: Option<&str>,
490 ) -> Result<Vec<otelite_core::api::ErrorTypeBreakdown>> {
491 let conn_guard = self.conn.lock();
492 let conn = conn_guard
493 .as_ref()
494 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
495 reader::query_error_types(conn, start_time, end_time, model).map_err(StorageError::from)
496 }
497
498 async fn query_model_drift(
499 &self,
500 start_time: Option<i64>,
501 end_time: Option<i64>,
502 ) -> Result<Vec<otelite_core::api::ModelDriftPair>> {
503 let conn_guard = self.conn.lock();
504 let conn = conn_guard
505 .as_ref()
506 .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
507 reader::query_model_drift(conn, start_time, end_time).map_err(StorageError::from)
508 }
509
510 async fn close(&mut self) -> Result<()> {
511 if let Some(handle) = self.purge_handle.lock().take() {
512 handle.abort();
513 }
514
515 let mut conn_guard = self.conn.lock();
516 if let Some(conn) = conn_guard.take() {
517 conn.close()
518 .map_err(|(_, e)| StorageError::DatabaseError(e.to_string()))?;
519 }
520 Ok(())
521 }
522}
523
524impl SqliteBackend {
525 fn start_purge_scheduler(&self, db_path: PathBuf) {
526 let config = self.config.clone();
527 let purge_lock = self.purge_lock.clone();
528
529 let handle = tokio::spawn(async move {
530 let mut conn = match Connection::open(&db_path) {
532 Ok(c) => c,
533 Err(e) => {
534 tracing::error!("Purge scheduler: failed to open DB connection: {}", e);
535 return;
536 },
537 };
538 if let Err(e) =
539 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
540 {
541 tracing::warn!("Purge scheduler: failed to set WAL mode: {}", e);
542 }
543
544 loop {
545 let now = chrono::Local::now();
546 let next_purge = if now.hour() < 2 {
547 now.date_naive().and_hms_opt(2, 0, 0).unwrap()
548 } else {
549 (now.date_naive() + chrono::Duration::days(1))
550 .and_hms_opt(2, 0, 0)
551 .unwrap()
552 };
553 let next_purge =
554 chrono::TimeZone::from_local_datetime(&chrono::Local, &next_purge).unwrap();
555 let duration = (next_purge - now)
556 .to_std()
557 .unwrap_or(std::time::Duration::from_secs(86400));
558
559 tokio::time::sleep(duration).await;
560
561 if let Ok(_guard) = purge_lock.try_lock().await {
562 let cutoff =
563 chrono::Utc::now() - chrono::Duration::days(config.retention_days as i64);
564 let cutoff_timestamp = cutoff.timestamp_nanos_opt().unwrap_or(0);
565
566 if let Ok(record) = purge::purge_old_data(
567 &mut conn,
568 cutoff_timestamp,
569 10000,
570 &[
571 crate::SignalType::Logs,
572 crate::SignalType::Traces,
573 crate::SignalType::Metrics,
574 ],
575 false,
576 ) {
577 tracing::info!(
578 "Automatic purge completed: {} logs, {} spans, {} metrics deleted",
579 record.logs_deleted,
580 record.spans_deleted,
581 record.metrics_deleted
582 );
583
584 let _ = purge::vacuum(&mut conn);
585 }
586 }
587 }
588 });
589
590 *self.purge_handle.lock() = Some(handle);
591 }
592}
593
594#[cfg(test)]
595mod tests {
596 use super::*;
597 use tempfile::TempDir;
598
599 #[tokio::test]
600 async fn test_sqlite_backend_creation() {
601 let temp_dir = TempDir::new().unwrap();
602 let config = StorageConfig::default().with_data_dir(temp_dir.path().to_path_buf());
603
604 let backend = SqliteBackend::new(config);
605 assert!(backend.conn.lock().is_none());
606 }
607
608 #[tokio::test]
609 async fn test_sqlite_backend_initialization() {
610 let temp_dir = TempDir::new().unwrap();
611 let config = StorageConfig::default().with_data_dir(temp_dir.path().to_path_buf());
612
613 let mut backend = SqliteBackend::new(config);
614 let result = backend.initialize().await;
615 assert!(result.is_ok());
616 assert!(backend.conn.lock().is_some());
617 }
618
619 #[tokio::test]
620 async fn test_stats_returns_counts() {
621 use otelite_core::telemetry::log::SeverityLevel;
622 use std::collections::HashMap;
623
624 let temp_dir = TempDir::new().unwrap();
625 let config = StorageConfig::default().with_data_dir(temp_dir.path().to_path_buf());
626
627 let mut backend = SqliteBackend::new(config);
628 backend.initialize().await.unwrap();
629
630 let log = LogRecord {
631 timestamp: 1000,
632 observed_timestamp: Some(1000),
633 severity: SeverityLevel::Info,
634 severity_text: Some("INFO".to_string()),
635 body: "test log".to_string(),
636 trace_id: None,
637 span_id: None,
638 attributes: HashMap::new(),
639 resource: None,
640 };
641 backend.write_log(&log).await.unwrap();
642
643 let stats = backend.stats().await.unwrap();
644
645 assert_eq!(stats.log_count, 1);
646 assert_eq!(stats.span_count, 0);
647 assert_eq!(stats.metric_count, 0);
648 assert!(stats.storage_size_bytes > 0);
649 }
650}