Skip to main content

otelite_core/
storage.rs

//! Storage abstraction layer for otelite.
//!
//! Defines the `StorageBackend` trait and all associated types so that
//! downstream crates (`otelite-receiver`, `otelite-api`) can depend only on
//! `otelite-core` rather than on the concrete SQLite implementation.
6
7use async_trait::async_trait;
8use thiserror::Error;
9
10use crate::api::{
11    CostSeriesPoint, ErrorRateByModel, FinishReasonCount, LatencyStats, ModelUsage, RetryStats,
12    SystemUsage, TokenUsageSummary, ToolUsage, TopSpan,
13};
14use crate::query::QueryPredicate;
15use crate::telemetry::log::SeverityLevel;
16use crate::telemetry::{LogRecord, Metric, Span};
17
18/// Result type for storage operations.
19pub type Result<T> = std::result::Result<T, StorageError>;
20
/// Backend-agnostic failure type reported by every `StorageBackend` method.
///
/// Each variant holds only a human-readable `String` message, which keeps this
/// crate free of any database-library dependency. Concrete backends are
/// expected to translate their native errors into these variants through
/// `From` implementations.
#[derive(Debug, Error)]
pub enum StorageError {
    /// The backend could not be set up.
    #[error("Failed to initialize storage: {0}")]
    InitializationError(String),

    /// Persisting data failed.
    #[error("Failed to write data: {0}")]
    WriteError(String),

    /// Reading data back failed.
    #[error("Failed to query data: {0}")]
    QueryError(String),

    /// The disk is full or lacks enough free space.
    #[error("Insufficient disk space: {0}")]
    DiskFullError(String),

    /// The stored data was found to be corrupted.
    #[error("Storage corruption detected: {0}")]
    CorruptionError(String),

    /// The process lacks permission to access the storage.
    #[error("Permission denied: {0}")]
    PermissionError(String),

    /// The storage configuration is invalid.
    #[error("Configuration error: {0}")]
    ConfigError(String),

    /// A purge (data cleanup) did not complete.
    #[error("Purge operation failed: {0}")]
    PurgeError(String),

    /// Error raised by the underlying database, rendered as text.
    #[error("Database error: {0}")]
    DatabaseError(String),

    /// I/O failure, rendered as text.
    #[error("I/O error: {0}")]
    IoError(String),

    /// (De)serialization failure, rendered as text.
    #[error("Serialization error: {0}")]
    SerializationError(String),
}
72
73impl StorageError {
74    pub fn is_recoverable(&self) -> bool {
75        matches!(
76            self,
77            StorageError::WriteError(_) | StorageError::QueryError(_) | StorageError::PurgeError(_)
78        )
79    }
80
81    pub fn is_corruption(&self) -> bool {
82        matches!(self, StorageError::CorruptionError(_))
83    }
84
85    pub fn is_disk_full(&self) -> bool {
86        matches!(self, StorageError::DiskFullError(_))
87    }
88}
89
/// Statistics returned after a `purge_all` operation.
///
/// `Default` yields all-zero counts, and `PartialEq`/`Eq` allow results to be
/// compared directly (e.g. in tests).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PurgeAllStats {
    /// Number of log records deleted.
    pub logs_deleted: u64,
    /// Number of spans deleted.
    pub spans_deleted: u64,
    /// Number of metric records deleted.
    pub metrics_deleted: u64,
}
97
/// Statistics about stored telemetry data.
///
/// `Default` describes an empty store: zero counts, no timestamps, and zero
/// size. `PartialEq`/`Eq` allow snapshots to be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StorageStats {
    /// Number of stored log records.
    pub log_count: u64,
    /// Number of stored spans.
    pub span_count: u64,
    /// Number of stored metric records.
    pub metric_count: u64,
    /// Oldest record timestamp (nanoseconds since Unix epoch).
    pub oldest_timestamp: Option<i64>,
    /// Newest record timestamp (nanoseconds since Unix epoch).
    pub newest_timestamp: Option<i64>,
    /// Size of the storage, in bytes.
    pub storage_size_bytes: u64,
}
110
111/// Query parameters for filtering telemetry data.
112#[derive(Debug, Clone, Default)]
113pub struct QueryParams {
114    pub start_time: Option<i64>,
115    pub end_time: Option<i64>,
116    pub limit: Option<usize>,
117    pub trace_id: Option<String>,
118    pub span_id: Option<String>,
119    pub min_severity: Option<SeverityLevel>,
120    pub search_text: Option<String>,
121    pub predicates: Vec<QueryPredicate>,
122}
123
124/// Options for manual data cleanup.
125#[derive(Debug, Clone)]
126pub struct PurgeOptions {
127    /// Purge data older than this timestamp (nanoseconds since Unix epoch).
128    pub older_than: Option<i64>,
129    pub signal_types: Vec<SignalType>,
130    pub dry_run: bool,
131}
132
/// Signal type discriminator used in purge operations.
///
/// `Hash` is derived alongside `Eq` so signal types can be deduplicated or
/// used as keys in `HashSet`/`HashMap`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SignalType {
    /// Log records.
    Logs,
    /// Trace spans.
    Traces,
    /// Metrics.
    Metrics,
}
140
141/// Pluggable storage backend trait.
142///
143/// Both `otelite-receiver` (writes) and `otelite-api` (reads) depend only on
144/// this trait; neither needs a direct dependency on the SQLite implementation.
145#[async_trait]
146pub trait StorageBackend: Send + Sync {
147    async fn initialize(&mut self) -> Result<()>;
148    async fn write_log(&self, log: &LogRecord) -> Result<()>;
149    async fn write_span(&self, span: &Span) -> Result<()>;
150    async fn write_metric(&self, metric: &Metric) -> Result<()>;
151    async fn query_logs(&self, params: &QueryParams) -> Result<Vec<LogRecord>>;
152    async fn query_spans(&self, params: &QueryParams) -> Result<Vec<Span>>;
153    /// Query all spans for the N most-recent distinct traces matching the filters.
154    async fn query_spans_for_trace_list(
155        &self,
156        params: &QueryParams,
157        trace_limit: usize,
158    ) -> Result<Vec<Span>>;
159    /// Query metrics (raw time-series rows, latest first).
160    async fn query_metrics(&self, params: &QueryParams) -> Result<Vec<Metric>>;
161    /// Query metrics returning the single most-recent data point per unique name.
162    async fn query_latest_metrics(&self, params: &QueryParams) -> Result<Vec<Metric>>;
163    async fn stats(&self) -> Result<StorageStats>;
164    async fn purge(&self, options: &PurgeOptions) -> Result<u64>;
165    async fn purge_all(&self) -> Result<PurgeAllStats>;
166    async fn close(&mut self) -> Result<()>;
167    /// Return distinct resource attribute keys for the given signal type.
168    /// `signal` must be one of `"logs"`, `"spans"`, or `"metrics"`.
169    async fn distinct_resource_keys(&self, signal: &str) -> Result<Vec<String>>;
170    async fn query_token_usage(
171        &self,
172        start_time: Option<i64>,
173        end_time: Option<i64>,
174    ) -> Result<(TokenUsageSummary, Vec<ModelUsage>, Vec<SystemUsage>)>;
175
176    /// Time-bucketed token usage grouped by model for cost-over-time analysis.
177    ///
178    /// `bucket_ns` is the bucket size in nanoseconds (e.g. 3_600_000_000_000 for 1h).
179    async fn query_cost_series(
180        &self,
181        start_time: Option<i64>,
182        end_time: Option<i64>,
183        bucket_ns: i64,
184    ) -> Result<Vec<CostSeriesPoint>>;
185
186    /// Top-N most expensive LLM spans by total tokens.
187    async fn query_top_spans(
188        &self,
189        start_time: Option<i64>,
190        end_time: Option<i64>,
191        limit: usize,
192    ) -> Result<Vec<TopSpan>>;
193
194    /// Finish-reason distribution across LLM spans and Claude Code api_response_body logs.
195    async fn query_finish_reasons(
196        &self,
197        start_time: Option<i64>,
198        end_time: Option<i64>,
199    ) -> Result<Vec<FinishReasonCount>>;
200
201    /// Latency (and optional TTFT) percentile statistics per model for LLM spans.
202    async fn query_latency_stats(
203        &self,
204        start_time: Option<i64>,
205        end_time: Option<i64>,
206    ) -> Result<Vec<LatencyStats>>;
207
208    /// Error rate by model across LLM spans.
209    async fn query_error_rate(
210        &self,
211        start_time: Option<i64>,
212        end_time: Option<i64>,
213    ) -> Result<Vec<ErrorRateByModel>>;
214
215    /// Aggregated tool-execution usage counts and durations.
216    async fn query_tool_usage(
217        &self,
218        start_time: Option<i64>,
219        end_time: Option<i64>,
220        limit: usize,
221    ) -> Result<Vec<ToolUsage>>;
222
223    /// Retry statistics across LLM spans.
224    async fn query_retry_stats(
225        &self,
226        start_time: Option<i64>,
227        end_time: Option<i64>,
228    ) -> Result<RetryStats>;
229}