Skip to main content

otelite_storage/sqlite/
mod.rs

1//! SQLite backend implementation
2
3use crate::StorageConfig;
4use async_trait::async_trait;
5use chrono::Timelike;
6use otelite_core::storage::{
7    PurgeAllStats, PurgeOptions, QueryParams, Result, StorageBackend, StorageError, StorageStats,
8};
9use otelite_core::telemetry::{LogRecord, Metric, Span};
10use parking_lot::Mutex;
11use rusqlite::Connection;
12use std::path::PathBuf;
13use std::sync::Arc;
14
15pub mod purge;
16pub mod reader;
17pub mod schema;
18pub mod writer;
19
20/// SQLite storage backend
21pub struct SqliteBackend {
22    config: StorageConfig,
23    conn: Arc<Mutex<Option<Connection>>>,
24    purge_lock: Arc<purge::PurgeLock>,
25    purge_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
26}
27
28impl SqliteBackend {
29    /// Create a new SQLite backend with the given configuration
30    pub fn new(config: StorageConfig) -> Self {
31        Self {
32            config,
33            conn: Arc::new(Mutex::new(None)),
34            purge_lock: Arc::new(purge::PurgeLock::new()),
35            purge_handle: Arc::new(Mutex::new(None)),
36        }
37    }
38
39    fn db_path(&self) -> PathBuf {
40        if self
41            .config
42            .data_dir
43            .to_string_lossy()
44            .starts_with(":memory:")
45        {
46            self.config.data_dir.clone()
47        } else {
48            self.config.data_dir.join("otelite.db")
49        }
50    }
51}
52
53#[async_trait]
54impl StorageBackend for SqliteBackend {
55    async fn initialize(&mut self) -> Result<()> {
56        let db_path = self.db_path();
57
58        if !db_path.to_string_lossy().starts_with(":memory:") {
59            std::fs::create_dir_all(&self.config.data_dir).map_err(|e| {
60                StorageError::InitializationError(format!("Failed to create data directory: {}", e))
61            })?;
62        }
63
64        let conn = Connection::open(&db_path).map_err(|e| {
65            StorageError::InitializationError(format!("Failed to open database: {}", e))
66        })?;
67
68        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
69            .map_err(|e| {
70                StorageError::InitializationError(format!("Failed to configure SQLite: {}", e))
71            })?;
72
73        schema::initialize_schema(&conn).map_err(StorageError::from)?;
74
75        *self.conn.lock() = Some(conn);
76
77        if self.config.retention_days > 0 {
78            self.start_purge_scheduler(db_path);
79        }
80
81        Ok(())
82    }
83
84    async fn write_log(&self, log: &LogRecord) -> Result<()> {
85        let conn_guard = self.conn.lock();
86        let conn = conn_guard
87            .as_ref()
88            .ok_or_else(|| StorageError::WriteError("Database not initialized".to_string()))?;
89
90        writer::write_log(conn, log).map_err(StorageError::from)
91    }
92
93    async fn write_span(&self, span: &Span) -> Result<()> {
94        let conn_guard = self.conn.lock();
95        let conn = conn_guard
96            .as_ref()
97            .ok_or_else(|| StorageError::WriteError("Database not initialized".to_string()))?;
98
99        writer::write_span(conn, span).map_err(StorageError::from)
100    }
101
102    async fn write_metric(&self, metric: &Metric) -> Result<()> {
103        let conn_guard = self.conn.lock();
104        let conn = conn_guard
105            .as_ref()
106            .ok_or_else(|| StorageError::WriteError("Database not initialized".to_string()))?;
107
108        writer::write_metric(conn, metric).map_err(StorageError::from)
109    }
110
111    async fn query_logs(&self, params: &QueryParams) -> Result<Vec<LogRecord>> {
112        let conn_guard = self.conn.lock();
113        let conn = conn_guard
114            .as_ref()
115            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
116
117        reader::query_logs(conn, params).map_err(StorageError::from)
118    }
119
120    async fn query_spans(&self, params: &QueryParams) -> Result<Vec<Span>> {
121        let conn_guard = self.conn.lock();
122        let conn = conn_guard
123            .as_ref()
124            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
125
126        reader::query_spans(conn, params).map_err(StorageError::from)
127    }
128
129    async fn query_spans_for_trace_list(
130        &self,
131        params: &QueryParams,
132        trace_limit: usize,
133    ) -> Result<Vec<Span>> {
134        let conn_guard = self.conn.lock();
135        let conn = conn_guard
136            .as_ref()
137            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
138
139        reader::query_spans_for_trace_list(conn, params, trace_limit).map_err(StorageError::from)
140    }
141
142    async fn query_metrics(&self, params: &QueryParams) -> Result<Vec<Metric>> {
143        let conn_guard = self.conn.lock();
144        let conn = conn_guard
145            .as_ref()
146            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
147
148        reader::query_metrics(conn, params).map_err(StorageError::from)
149    }
150
151    async fn query_latest_metrics(&self, params: &QueryParams) -> Result<Vec<Metric>> {
152        let conn_guard = self.conn.lock();
153        let conn = conn_guard
154            .as_ref()
155            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
156
157        reader::query_latest_metrics(conn, params).map_err(StorageError::from)
158    }
159
160    async fn stats(&self) -> Result<StorageStats> {
161        let conn_guard = self.conn.lock();
162        let conn = conn_guard
163            .as_ref()
164            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
165
166        reader::get_stats(conn).map_err(StorageError::from)
167    }
168
169    async fn purge(&self, options: &PurgeOptions) -> Result<u64> {
170        let _guard = self
171            .purge_lock
172            .try_lock()
173            .await
174            .map_err(StorageError::from)?;
175
176        let mut conn_guard = self.conn.lock();
177        let conn = conn_guard
178            .as_mut()
179            .ok_or_else(|| StorageError::WriteError("Database not initialized".to_string()))?;
180
181        let cutoff_timestamp = if let Some(older_than) = options.older_than {
182            older_than
183        } else {
184            let cutoff =
185                chrono::Utc::now() - chrono::Duration::days(self.config.retention_days as i64);
186            cutoff.timestamp_nanos_opt().unwrap_or(0)
187        };
188
189        let record = purge::purge_old_data(
190            conn,
191            cutoff_timestamp,
192            10000,
193            &options.signal_types,
194            options.dry_run,
195        )
196        .map_err(StorageError::from)?;
197
198        if !options.dry_run {
199            purge::vacuum(conn).map_err(StorageError::from)?;
200        }
201
202        let total_deleted = record.logs_deleted + record.spans_deleted + record.metrics_deleted;
203        Ok(total_deleted as u64)
204    }
205
206    async fn purge_all(&self) -> Result<PurgeAllStats> {
207        let _guard = self
208            .purge_lock
209            .try_lock()
210            .await
211            .map_err(StorageError::from)?;
212
213        let mut conn_guard = self.conn.lock();
214        let conn = conn_guard
215            .as_mut()
216            .ok_or_else(|| StorageError::WriteError("Database not initialized".to_string()))?;
217
218        let tx = conn
219            .transaction()
220            .map_err(|e| StorageError::WriteError(format!("Failed to start transaction: {}", e)))?;
221
222        let logs_deleted = tx
223            .execute("DELETE FROM logs", [])
224            .map_err(|e| StorageError::WriteError(format!("Failed to delete logs: {}", e)))?
225            as u64;
226        let spans_deleted = tx
227            .execute("DELETE FROM spans", [])
228            .map_err(|e| StorageError::WriteError(format!("Failed to delete spans: {}", e)))?
229            as u64;
230        let metrics_deleted = tx
231            .execute("DELETE FROM metrics", [])
232            .map_err(|e| StorageError::WriteError(format!("Failed to delete metrics: {}", e)))?
233            as u64;
234
235        tx.commit().map_err(|e| {
236            StorageError::WriteError(format!("Failed to commit transaction: {}", e))
237        })?;
238
239        purge::vacuum(conn).map_err(StorageError::from)?;
240
241        Ok(PurgeAllStats {
242            logs_deleted,
243            spans_deleted,
244            metrics_deleted,
245        })
246    }
247
248    async fn distinct_resource_keys(&self, signal: &str) -> Result<Vec<String>> {
249        let conn_guard = self.conn.lock();
250        let conn = conn_guard
251            .as_ref()
252            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
253
254        reader::distinct_resource_keys(conn, signal).map_err(StorageError::from)
255    }
256
257    async fn query_token_usage(
258        &self,
259        start_time: Option<i64>,
260        end_time: Option<i64>,
261        model: Option<&str>,
262    ) -> Result<(
263        otelite_core::api::TokenUsageSummary,
264        Vec<otelite_core::api::ModelUsage>,
265        Vec<otelite_core::api::SystemUsage>,
266    )> {
267        let conn_guard = self.conn.lock();
268        let conn = conn_guard
269            .as_ref()
270            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
271
272        reader::query_token_usage(conn, start_time, end_time, model).map_err(StorageError::from)
273    }
274
275    async fn query_cost_series(
276        &self,
277        start_time: Option<i64>,
278        end_time: Option<i64>,
279        bucket_ns: i64,
280        model: Option<&str>,
281    ) -> Result<Vec<otelite_core::api::CostSeriesPoint>> {
282        let conn_guard = self.conn.lock();
283        let conn = conn_guard
284            .as_ref()
285            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
286
287        reader::query_cost_series(conn, start_time, end_time, bucket_ns, model)
288            .map_err(StorageError::from)
289    }
290
291    async fn query_top_spans(
292        &self,
293        start_time: Option<i64>,
294        end_time: Option<i64>,
295        limit: usize,
296        sort_by: otelite_core::api::TopSpanSort,
297        truncated_only: bool,
298    ) -> Result<Vec<otelite_core::api::TopSpan>> {
299        let conn_guard = self.conn.lock();
300        let conn = conn_guard
301            .as_ref()
302            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
303
304        reader::query_top_spans(conn, start_time, end_time, limit, sort_by, truncated_only)
305            .map_err(StorageError::from)
306    }
307
308    async fn query_top_sessions(
309        &self,
310        start_time: Option<i64>,
311        end_time: Option<i64>,
312        limit: usize,
313    ) -> Result<Vec<otelite_core::api::SessionCostRow>> {
314        let conn_guard = self.conn.lock();
315        let conn = conn_guard
316            .as_ref()
317            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
318
319        reader::query_top_sessions(conn, start_time, end_time, limit).map_err(StorageError::from)
320    }
321
322    async fn query_top_conversations(
323        &self,
324        start_time: Option<i64>,
325        end_time: Option<i64>,
326        limit: usize,
327    ) -> Result<Vec<otelite_core::api::ConversationCostRow>> {
328        let conn_guard = self.conn.lock();
329        let conn = conn_guard
330            .as_ref()
331            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
332
333        reader::query_top_conversations(conn, start_time, end_time, limit)
334            .map_err(StorageError::from)
335    }
336
337    async fn query_finish_reasons(
338        &self,
339        start_time: Option<i64>,
340        end_time: Option<i64>,
341        model: Option<&str>,
342    ) -> Result<Vec<otelite_core::api::FinishReasonCount>> {
343        let conn_guard = self.conn.lock();
344        let conn = conn_guard
345            .as_ref()
346            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
347
348        reader::query_finish_reasons(conn, start_time, end_time, model).map_err(StorageError::from)
349    }
350
351    async fn query_latency_stats(
352        &self,
353        start_time: Option<i64>,
354        end_time: Option<i64>,
355        model: Option<&str>,
356    ) -> Result<Vec<otelite_core::api::LatencyStats>> {
357        let conn_guard = self.conn.lock();
358        let conn = conn_guard
359            .as_ref()
360            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
361
362        reader::query_latency_stats(conn, start_time, end_time, model).map_err(StorageError::from)
363    }
364
365    async fn query_error_rate(
366        &self,
367        start_time: Option<i64>,
368        end_time: Option<i64>,
369        model: Option<&str>,
370    ) -> Result<Vec<otelite_core::api::ErrorRateByModel>> {
371        let conn_guard = self.conn.lock();
372        let conn = conn_guard
373            .as_ref()
374            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
375
376        reader::query_error_rate(conn, start_time, end_time, model).map_err(StorageError::from)
377    }
378
379    async fn query_tool_usage(
380        &self,
381        start_time: Option<i64>,
382        end_time: Option<i64>,
383        limit: usize,
384    ) -> Result<Vec<otelite_core::api::ToolUsage>> {
385        let conn_guard = self.conn.lock();
386        let conn = conn_guard
387            .as_ref()
388            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
389
390        reader::query_tool_usage(conn, start_time, end_time, limit).map_err(StorageError::from)
391    }
392
393    async fn query_retry_stats(
394        &self,
395        start_time: Option<i64>,
396        end_time: Option<i64>,
397    ) -> Result<otelite_core::api::RetryStats> {
398        let conn_guard = self.conn.lock();
399        let conn = conn_guard
400            .as_ref()
401            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
402
403        reader::query_retry_stats(conn, start_time, end_time).map_err(StorageError::from)
404    }
405
406    async fn query_retrieval_stats(
407        &self,
408        start_time: Option<i64>,
409        end_time: Option<i64>,
410        top_queries_limit: usize,
411    ) -> Result<otelite_core::api::RetrievalStats> {
412        let conn_guard = self.conn.lock();
413        let conn = conn_guard
414            .as_ref()
415            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
416
417        reader::query_retrieval_stats(conn, start_time, end_time, top_queries_limit)
418            .map_err(StorageError::from)
419    }
420
421    async fn query_truncation_rate(
422        &self,
423        start_time: Option<i64>,
424        end_time: Option<i64>,
425        model: Option<&str>,
426    ) -> Result<Vec<otelite_core::api::TruncationRateByModel>> {
427        let conn_guard = self.conn.lock();
428        let conn = conn_guard
429            .as_ref()
430            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
431        reader::query_truncation_rate(conn, start_time, end_time, model).map_err(StorageError::from)
432    }
433
434    async fn query_cache_hit_rate(
435        &self,
436        start_time: Option<i64>,
437        end_time: Option<i64>,
438        model: Option<&str>,
439    ) -> Result<Vec<otelite_core::api::CacheHitRateByModel>> {
440        let conn_guard = self.conn.lock();
441        let conn = conn_guard
442            .as_ref()
443            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
444        reader::query_cache_hit_rate(conn, start_time, end_time, model).map_err(StorageError::from)
445    }
446
447    async fn query_request_param_profile(
448        &self,
449        start_time: Option<i64>,
450        end_time: Option<i64>,
451    ) -> Result<otelite_core::api::RequestParamProfile> {
452        let conn_guard = self.conn.lock();
453        let conn = conn_guard
454            .as_ref()
455            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
456        reader::query_request_param_profile(conn, start_time, end_time).map_err(StorageError::from)
457    }
458
459    async fn query_conversation_depth(
460        &self,
461        start_time: Option<i64>,
462        end_time: Option<i64>,
463    ) -> Result<otelite_core::api::ConversationDepthStats> {
464        let conn_guard = self.conn.lock();
465        let conn = conn_guard
466            .as_ref()
467            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
468        reader::query_conversation_depth(conn, start_time, end_time).map_err(StorageError::from)
469    }
470
471    async fn query_calls_series(
472        &self,
473        start_time: Option<i64>,
474        end_time: Option<i64>,
475        bucket_secs: u64,
476    ) -> Result<Vec<otelite_core::api::CallsSeriesPoint>> {
477        let conn_guard = self.conn.lock();
478        let conn = conn_guard
479            .as_ref()
480            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
481        reader::query_calls_series(conn, start_time, end_time, bucket_secs)
482            .map_err(StorageError::from)
483    }
484
485    async fn query_error_types(
486        &self,
487        start_time: Option<i64>,
488        end_time: Option<i64>,
489        model: Option<&str>,
490    ) -> Result<Vec<otelite_core::api::ErrorTypeBreakdown>> {
491        let conn_guard = self.conn.lock();
492        let conn = conn_guard
493            .as_ref()
494            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
495        reader::query_error_types(conn, start_time, end_time, model).map_err(StorageError::from)
496    }
497
498    async fn query_model_drift(
499        &self,
500        start_time: Option<i64>,
501        end_time: Option<i64>,
502    ) -> Result<Vec<otelite_core::api::ModelDriftPair>> {
503        let conn_guard = self.conn.lock();
504        let conn = conn_guard
505            .as_ref()
506            .ok_or_else(|| StorageError::QueryError("Database not initialized".to_string()))?;
507        reader::query_model_drift(conn, start_time, end_time).map_err(StorageError::from)
508    }
509
510    async fn close(&mut self) -> Result<()> {
511        if let Some(handle) = self.purge_handle.lock().take() {
512            handle.abort();
513        }
514
515        let mut conn_guard = self.conn.lock();
516        if let Some(conn) = conn_guard.take() {
517            conn.close()
518                .map_err(|(_, e)| StorageError::DatabaseError(e.to_string()))?;
519        }
520        Ok(())
521    }
522}
523
524impl SqliteBackend {
525    fn start_purge_scheduler(&self, db_path: PathBuf) {
526        let config = self.config.clone();
527        let purge_lock = self.purge_lock.clone();
528
529        let handle = tokio::spawn(async move {
530            // Dedicated connection: purge never competes with the main conn mutex.
531            let mut conn = match Connection::open(&db_path) {
532                Ok(c) => c,
533                Err(e) => {
534                    tracing::error!("Purge scheduler: failed to open DB connection: {}", e);
535                    return;
536                },
537            };
538            if let Err(e) =
539                conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
540            {
541                tracing::warn!("Purge scheduler: failed to set WAL mode: {}", e);
542            }
543
544            loop {
545                let now = chrono::Local::now();
546                let next_purge = if now.hour() < 2 {
547                    now.date_naive().and_hms_opt(2, 0, 0).unwrap()
548                } else {
549                    (now.date_naive() + chrono::Duration::days(1))
550                        .and_hms_opt(2, 0, 0)
551                        .unwrap()
552                };
553                let next_purge =
554                    chrono::TimeZone::from_local_datetime(&chrono::Local, &next_purge).unwrap();
555                let duration = (next_purge - now)
556                    .to_std()
557                    .unwrap_or(std::time::Duration::from_secs(86400));
558
559                tokio::time::sleep(duration).await;
560
561                if let Ok(_guard) = purge_lock.try_lock().await {
562                    let cutoff =
563                        chrono::Utc::now() - chrono::Duration::days(config.retention_days as i64);
564                    let cutoff_timestamp = cutoff.timestamp_nanos_opt().unwrap_or(0);
565
566                    if let Ok(record) = purge::purge_old_data(
567                        &mut conn,
568                        cutoff_timestamp,
569                        10000,
570                        &[
571                            crate::SignalType::Logs,
572                            crate::SignalType::Traces,
573                            crate::SignalType::Metrics,
574                        ],
575                        false,
576                    ) {
577                        tracing::info!(
578                            "Automatic purge completed: {} logs, {} spans, {} metrics deleted",
579                            record.logs_deleted,
580                            record.spans_deleted,
581                            record.metrics_deleted
582                        );
583
584                        let _ = purge::vacuum(&mut conn);
585                    }
586                }
587            }
588        });
589
590        *self.purge_handle.lock() = Some(handle);
591    }
592}
593
#[cfg(test)]
mod tests {
    use super::*;
    use otelite_core::telemetry::log::SeverityLevel;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Returns an uninitialized backend rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned as well and must be kept alive while the
    /// backend is in use; dropping it removes the directory.
    fn temp_backend() -> (TempDir, SqliteBackend) {
        let temp_dir = TempDir::new().unwrap();
        let config = StorageConfig::default().with_data_dir(temp_dir.path().to_path_buf());
        let backend = SqliteBackend::new(config);
        (temp_dir, backend)
    }

    /// A minimal INFO log record with no trace context and no resource.
    fn sample_log() -> LogRecord {
        LogRecord {
            timestamp: 1000,
            observed_timestamp: Some(1000),
            severity: SeverityLevel::Info,
            severity_text: Some("INFO".to_string()),
            body: "test log".to_string(),
            trace_id: None,
            span_id: None,
            attributes: HashMap::new(),
            resource: None,
        }
    }

    #[test]
    fn test_sqlite_backend_creation() {
        let (_temp_dir, backend) = temp_backend();
        assert!(backend.conn.lock().is_none());
    }

    #[tokio::test]
    async fn test_sqlite_backend_initialization() {
        let (_temp_dir, mut backend) = temp_backend();

        let result = backend.initialize().await;
        assert!(result.is_ok());
        assert!(backend.conn.lock().is_some());
    }

    #[tokio::test]
    async fn test_stats_returns_counts() {
        let (_temp_dir, mut backend) = temp_backend();
        backend.initialize().await.unwrap();

        backend.write_log(&sample_log()).await.unwrap();

        let stats = backend.stats().await.unwrap();

        assert_eq!(stats.log_count, 1);
        assert_eq!(stats.span_count, 0);
        assert_eq!(stats.metric_count, 0);
        assert!(stats.storage_size_bytes > 0);
    }

    #[tokio::test]
    async fn test_operations_before_initialize_fail() {
        let (_temp_dir, backend) = temp_backend();

        // Writes and reads report distinct error variants when no connection is open.
        assert!(matches!(
            backend.write_log(&sample_log()).await,
            Err(StorageError::WriteError(_))
        ));
        assert!(matches!(
            backend.stats().await,
            Err(StorageError::QueryError(_))
        ));
    }

    #[tokio::test]
    async fn test_purge_all_then_close() {
        let (_temp_dir, mut backend) = temp_backend();
        backend.initialize().await.unwrap();
        backend.write_log(&sample_log()).await.unwrap();

        let purged = backend.purge_all().await.unwrap();
        assert_eq!(purged.logs_deleted, 1);
        assert_eq!(purged.spans_deleted, 0);
        assert_eq!(purged.metrics_deleted, 0);
        assert_eq!(backend.stats().await.unwrap().log_count, 0);

        backend.close().await.unwrap();
        assert!(backend.conn.lock().is_none());
        assert!(backend.purge_handle.lock().is_none());
        assert!(matches!(
            backend.write_log(&sample_log()).await,
            Err(StorageError::WriteError(_))
        ));
    }
}