// otelite_core/storage.rs

//! Storage abstraction layer for otelite.
//!
//! Defines the `StorageBackend` trait and all associated types so that
//! downstream crates (`otelite-receiver`, `otelite-api`) can depend only on
//! `otelite-core` rather than the concrete SQLite implementation.

use async_trait::async_trait;
use thiserror::Error;

use crate::api::{
    CacheHitRateByModel, CallsSeriesPoint, ConversationCostRow, ConversationDepthStats,
    CostSeriesPoint, ErrorRateByModel, ErrorTypeBreakdown, FinishReasonCount, LatencyStats,
    ModelDriftPair, ModelUsage, RequestParamProfile, RetrievalStats, RetryStats, SessionCostRow,
    SystemUsage, TokenUsageSummary, ToolUsage, TopSpan, TopSpanSort, TruncationRateByModel,
};
use crate::query::QueryPredicate;
use crate::telemetry::log::SeverityLevel;
use crate::telemetry::{LogRecord, Metric, Span};

/// Result type for storage operations, with [`StorageError`] as the error.
pub type Result<T> = std::result::Result<T, StorageError>;

/// Generic storage errors returned by `StorageBackend` implementations.
///
/// All variants carry string payloads so this type has no dependency on any
/// database library. Backend-specific error types should convert to these via
/// a `From` impl.
#[derive(Error, Debug)]
pub enum StorageError {
    /// Storage initialization failed (e.g. opening the store or setting up its schema).
    #[error("Failed to initialize storage: {0}")]
    InitializationError(String),

    /// Write operation failed.
    #[error("Failed to write data: {0}")]
    WriteError(String),

    /// Query operation failed.
    #[error("Failed to query data: {0}")]
    QueryError(String),

    /// Disk is full or insufficient space.
    #[error("Insufficient disk space: {0}")]
    DiskFullError(String),

    /// Storage corruption detected in the underlying data.
    #[error("Storage corruption detected: {0}")]
    CorruptionError(String),

    /// Permission denied by the operating system or backend.
    #[error("Permission denied: {0}")]
    PermissionError(String),

    /// Invalid or inconsistent configuration.
    #[error("Configuration error: {0}")]
    ConfigError(String),

    /// Purge operation failed.
    #[error("Purge operation failed: {0}")]
    PurgeError(String),

    /// Underlying database error (string representation).
    #[error("Database error: {0}")]
    DatabaseError(String),

    /// I/O error (string representation).
    #[error("I/O error: {0}")]
    IoError(String),

    /// Serialization error (string representation).
    #[error("Serialization error: {0}")]
    SerializationError(String),
}

75impl StorageError {
76    pub fn is_recoverable(&self) -> bool {
77        matches!(
78            self,
79            StorageError::WriteError(_) | StorageError::QueryError(_) | StorageError::PurgeError(_)
80        )
81    }
82
83    pub fn is_corruption(&self) -> bool {
84        matches!(self, StorageError::CorruptionError(_))
85    }
86
87    pub fn is_disk_full(&self) -> bool {
88        matches!(self, StorageError::DiskFullError(_))
89    }
90}
91
/// Statistics returned after a `purge_all` operation.
#[derive(Debug, Clone)]
pub struct PurgeAllStats {
    /// Number of log records deleted.
    pub logs_deleted: u64,
    /// Number of spans deleted.
    pub spans_deleted: u64,
    /// Number of metric data points deleted.
    pub metrics_deleted: u64,
}

/// Statistics about stored telemetry data.
#[derive(Debug, Clone)]
pub struct StorageStats {
    /// Total number of stored log records.
    pub log_count: u64,
    /// Total number of stored spans.
    pub span_count: u64,
    /// Total number of stored metric data points.
    pub metric_count: u64,
    /// Oldest record timestamp (nanoseconds since Unix epoch); `None` when no data is stored.
    pub oldest_timestamp: Option<i64>,
    /// Newest record timestamp (nanoseconds since Unix epoch); `None` when no data is stored.
    pub newest_timestamp: Option<i64>,
    /// Size of the backing storage in bytes.
    pub storage_size_bytes: u64,
}

/// Query parameters for filtering telemetry data.
///
/// All fields are optional filters; an unset field applies no restriction.
#[derive(Debug, Clone, Default)]
pub struct QueryParams {
    /// Lower time bound — presumably nanoseconds since Unix epoch, matching
    /// the module's other timestamps; confirm with implementations.
    pub start_time: Option<i64>,
    /// Upper time bound (same unit as `start_time`).
    pub end_time: Option<i64>,
    /// Maximum number of rows to return.
    pub limit: Option<usize>,
    /// Restrict results to a single trace.
    pub trace_id: Option<String>,
    /// Restrict results to a single span.
    pub span_id: Option<String>,
    /// Minimum severity level (meaningful for log queries).
    pub min_severity: Option<SeverityLevel>,
    /// Free-text search term.
    pub search_text: Option<String>,
    /// Structured predicates, applied in addition to the filters above.
    pub predicates: Vec<QueryPredicate>,
}

/// Options for manual data cleanup.
#[derive(Debug, Clone)]
pub struct PurgeOptions {
    /// Purge data older than this timestamp (nanoseconds since Unix epoch);
    /// `None` applies no age cutoff.
    pub older_than: Option<i64>,
    /// Which signal types the purge should touch.
    pub signal_types: Vec<SignalType>,
    /// When true, report what would be deleted without deleting it
    /// (assumed from the name — confirm with backend implementations).
    pub dry_run: bool,
}

/// Signal type discriminator used in purge operations.
///
/// `Hash` is derived (in addition to the existing `Eq`/`Copy`) so callers can
/// use the type as a `HashSet`/`HashMap` key — e.g. to deduplicate
/// `PurgeOptions::signal_types` — without wrapper types.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SignalType {
    /// Log records.
    Logs,
    /// Trace spans.
    Traces,
    /// Metric data points.
    Metrics,
}

/// Pluggable storage backend trait.
///
/// Both `otelite-receiver` (writes) and `otelite-api` (reads) depend only on
/// this trait; neither needs a direct dependency on the SQLite implementation.
#[async_trait]
pub trait StorageBackend: Send + Sync {
    /// Prepare the backend for use (open connections, create schema, etc.).
    /// NOTE(review): assumed to be called once before any other method — confirm with call sites.
    async fn initialize(&mut self) -> Result<()>;
    /// Persist a single log record.
    async fn write_log(&self, log: &LogRecord) -> Result<()>;
    /// Persist a single span.
    async fn write_span(&self, span: &Span) -> Result<()>;
    /// Persist a single metric data point.
    async fn write_metric(&self, metric: &Metric) -> Result<()>;
    /// Query log records matching `params`.
    async fn query_logs(&self, params: &QueryParams) -> Result<Vec<LogRecord>>;
    /// Query spans matching `params`.
    async fn query_spans(&self, params: &QueryParams) -> Result<Vec<Span>>;
    /// Query all spans for the N most-recent distinct traces matching the filters.
    async fn query_spans_for_trace_list(
        &self,
        params: &QueryParams,
        trace_limit: usize,
    ) -> Result<Vec<Span>>;
    /// Query metrics (raw time-series rows, latest first).
    async fn query_metrics(&self, params: &QueryParams) -> Result<Vec<Metric>>;
    /// Query metrics returning the single most-recent data point per unique name.
    async fn query_latest_metrics(&self, params: &QueryParams) -> Result<Vec<Metric>>;
    /// Record counts, timestamp bounds, and on-disk size of stored telemetry.
    async fn stats(&self) -> Result<StorageStats>;
    /// Delete data matching `options`; returns the number of records removed.
    async fn purge(&self, options: &PurgeOptions) -> Result<u64>;
    /// Delete all stored telemetry, returning per-signal deletion counts.
    async fn purge_all(&self) -> Result<PurgeAllStats>;
    /// Flush pending work and release backend resources.
    /// NOTE(review): assumed no further calls are made after `close` — confirm.
    async fn close(&mut self) -> Result<()>;
    /// Return distinct resource attribute keys for the given signal type.
    /// `signal` must be one of `"logs"`, `"spans"`, or `"metrics"`.
    async fn distinct_resource_keys(&self, signal: &str) -> Result<Vec<String>>;
    /// Aggregate token usage over the given time range: an overall summary plus
    /// per-model and per-system breakdowns, optionally filtered by model.
    async fn query_token_usage(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        model: Option<&str>,
    ) -> Result<(TokenUsageSummary, Vec<ModelUsage>, Vec<SystemUsage>)>;

    /// Time-bucketed token usage grouped by model for cost-over-time analysis.
    ///
    /// `bucket_ns` is the bucket size in nanoseconds (e.g. 3_600_000_000_000 for 1h).
    async fn query_cost_series(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        bucket_ns: i64,
        model: Option<&str>,
    ) -> Result<Vec<CostSeriesPoint>>;

    /// Top-N LLM spans ordered by the given sort dimension.
    ///
    /// When `truncated_only` is true, only spans whose finish reason is
    /// `max_tokens` or `length` are returned.
    // NOTE(review): only five parameters besides `self` — this allow may be
    // stale from an earlier signature; consider removing if clippy is quiet.
    #[allow(clippy::too_many_arguments)]
    async fn query_top_spans(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        limit: usize,
        sort_by: TopSpanSort,
        truncated_only: bool,
    ) -> Result<Vec<TopSpan>>;

    /// Top-N sessions by total tokens, suitable for cost enrichment.
    async fn query_top_sessions(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        limit: usize,
    ) -> Result<Vec<SessionCostRow>>;

    /// Top-N conversations (gen_ai.conversation.id) by total tokens.
    async fn query_top_conversations(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        limit: usize,
    ) -> Result<Vec<ConversationCostRow>>;

    /// Finish-reason distribution across LLM spans and Claude Code api_response_body logs.
    async fn query_finish_reasons(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        model: Option<&str>,
    ) -> Result<Vec<FinishReasonCount>>;

    /// Latency (and optional TTFT) percentile statistics per model for LLM spans.
    async fn query_latency_stats(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        model: Option<&str>,
    ) -> Result<Vec<LatencyStats>>;

    /// Error rate by model across LLM spans.
    async fn query_error_rate(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        model: Option<&str>,
    ) -> Result<Vec<ErrorRateByModel>>;

    /// Aggregated tool-execution usage counts and durations, limited to `limit` rows.
    async fn query_tool_usage(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        limit: usize,
    ) -> Result<Vec<ToolUsage>>;

    /// Retry statistics across LLM spans.
    async fn query_retry_stats(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
    ) -> Result<RetryStats>;

    /// Aggregated retrieval / RAG statistics across retriever spans.
    /// `top_queries_limit` caps the number of top queries reported.
    async fn query_retrieval_stats(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        top_queries_limit: usize,
    ) -> Result<RetrievalStats>;

    /// Truncation rate (finish_reason = max_tokens / length) per model.
    async fn query_truncation_rate(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        model: Option<&str>,
    ) -> Result<Vec<TruncationRateByModel>>;

    /// Cache token hit rate per model.
    async fn query_cache_hit_rate(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        model: Option<&str>,
    ) -> Result<Vec<CacheHitRateByModel>>;

    /// Distribution of request parameter settings (temperature, max_tokens).
    async fn query_request_param_profile(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
    ) -> Result<RequestParamProfile>;

    /// Turn-count distribution across conversations with a known conversation_id.
    async fn query_conversation_depth(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
    ) -> Result<ConversationDepthStats>;

    /// LLM call volume per time bucket (parallel to query_cost_series).
    ///
    /// NOTE(review): `bucket_secs` is in seconds while `query_cost_series`
    /// takes `bucket_ns` in nanoseconds — the unit mismatch between the two
    /// series APIs is easy to trip over; consider unifying in a future rev.
    async fn query_calls_series(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        bucket_secs: u64,
    ) -> Result<Vec<CallsSeriesPoint>>;

    /// Per-(model, error_type) breakdown of error spans, bucketed into actionable categories.
    async fn query_error_types(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
        model: Option<&str>,
    ) -> Result<Vec<ErrorTypeBreakdown>>;

    /// All observed (request_model, response_model) pairs with a `differs` flag.
    async fn query_model_drift(
        &self,
        start_time: Option<i64>,
        end_time: Option<i64>,
    ) -> Result<Vec<ModelDriftPair>>;
}