use crate::error::{Result, StorageError};
use crate::{QueryParams, StorageStats};
use otelite_core::query::{Operator, QueryPredicate, QueryValue};
use otelite_core::telemetry::log::SeverityLevel;
use otelite_core::telemetry::trace::{SpanKind, SpanStatus, StatusCode};
use otelite_core::telemetry::{LogRecord, Metric, Span};
use rusqlite::{Connection, Row};
/// Query log records matching the filters in `params`.
///
/// Builds a parameterized SQL statement from the optional time range,
/// trace/span ids, minimum severity, full-text search text, and structured
/// predicates; results are ordered by `timestamp` descending and optionally
/// limited.
///
/// # Errors
/// Returns `StorageError::QueryError` if the statement cannot be prepared
/// or executed, or if a row fails to parse.
pub fn query_logs(conn: &Connection, params: &QueryParams) -> Result<Vec<LogRecord>> {
    // "WHERE 1=1" lets every optional filter below append uniformly with " AND ...".
    let mut query = String::from("SELECT * FROM logs WHERE 1=1");
    let mut sql_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    if let Some(start) = params.start_time {
        query.push_str(" AND timestamp >= ?");
        sql_params.push(Box::new(start));
    }
    if let Some(end) = params.end_time {
        query.push_str(" AND timestamp <= ?");
        sql_params.push(Box::new(end));
    }
    if let Some(ref trace_id) = params.trace_id {
        query.push_str(" AND trace_id = ?");
        sql_params.push(Box::new(trace_id.clone()));
    }
    if let Some(ref span_id) = params.span_id {
        query.push_str(" AND span_id = ?");
        sql_params.push(Box::new(span_id.clone()));
    }
    if let Some(min_severity) = params.min_severity {
        query.push_str(" AND severity_number >= ?");
        sql_params.push(Box::new(min_severity.to_i32()));
    }
    if let Some(ref search) = params.search_text {
        // Full-text search is delegated to the logs_fts virtual table.
        query.push_str(" AND id IN (SELECT rowid FROM logs_fts WHERE body MATCH ?)");
        sql_params.push(Box::new(search.clone()));
    }
    // Fix: was `¶ms.predicates`, a mojibake corruption of `&params.predicates`
    // that does not compile.
    append_predicates("logs", &params.predicates, &mut query, &mut sql_params)?;
    query.push_str(" ORDER BY timestamp DESC");
    if let Some(limit) = params.limit {
        query.push_str(" LIMIT ?");
        sql_params.push(Box::new(limit as i64));
    }
    let mut stmt = conn
        .prepare(&query)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let param_refs: Vec<&dyn rusqlite::ToSql> = sql_params.iter().map(|p| p.as_ref()).collect();
    let logs = stmt
        .query_map(param_refs.as_slice(), parse_log_row)
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse results: {}", e)))?;
    Ok(logs)
}
/// Query spans matching the filters in `params`.
///
/// Supports an optional time window (compared against the span's own
/// `start_time`/`end_time`), a trace id filter, and structured predicates;
/// results are ordered by `start_time` descending and optionally limited.
///
/// # Errors
/// Returns `StorageError::QueryError` if the statement cannot be prepared
/// or executed, or if a row fails to parse.
pub fn query_spans(conn: &Connection, params: &QueryParams) -> Result<Vec<Span>> {
    // "WHERE 1=1" lets each optional filter append uniformly with " AND ...".
    let mut query = String::from("SELECT * FROM spans WHERE 1=1");
    let mut sql_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    if let Some(start) = params.start_time {
        query.push_str(" AND start_time >= ?");
        sql_params.push(Box::new(start));
    }
    if let Some(end) = params.end_time {
        query.push_str(" AND end_time <= ?");
        sql_params.push(Box::new(end));
    }
    if let Some(ref trace_id) = params.trace_id {
        query.push_str(" AND trace_id = ?");
        sql_params.push(Box::new(trace_id.clone()));
    }
    // Fix: was `¶ms.predicates`, a mojibake corruption of `&params.predicates`
    // that does not compile.
    append_predicates("spans", &params.predicates, &mut query, &mut sql_params)?;
    query.push_str(" ORDER BY start_time DESC");
    if let Some(limit) = params.limit {
        query.push_str(" LIMIT ?");
        sql_params.push(Box::new(limit as i64));
    }
    let mut stmt = conn
        .prepare(&query)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let param_refs: Vec<&dyn rusqlite::ToSql> = sql_params.iter().map(|p| p.as_ref()).collect();
    let spans = stmt
        .query_map(param_refs.as_slice(), parse_span_row)
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse results: {}", e)))?;
    Ok(spans)
}
/// Fetch all spans belonging to the most recently active traces.
///
/// A subquery first selects up to `trace_limit` trace ids ordered by their
/// latest span start time (within the optional time/trace filters of
/// `params`); the outer query then returns every span for those traces,
/// newest first.
///
/// # Errors
/// Returns `StorageError::QueryError` if the statement cannot be prepared
/// or executed, or if a row fails to parse.
pub fn query_spans_for_trace_list(
    conn: &Connection,
    params: &QueryParams,
    trace_limit: usize,
) -> Result<Vec<Span>> {
    let mut bound: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    // Inner query: pick the trace ids, ordered by most recent activity.
    let mut trace_select = String::from("SELECT trace_id FROM spans WHERE 1=1");
    if let Some(start) = params.start_time {
        trace_select.push_str(" AND start_time >= ?");
        bound.push(Box::new(start));
    }
    if let Some(end) = params.end_time {
        trace_select.push_str(" AND end_time <= ?");
        bound.push(Box::new(end));
    }
    if let Some(ref trace_id) = params.trace_id {
        trace_select.push_str(" AND trace_id = ?");
        bound.push(Box::new(trace_id.clone()));
    }
    trace_select.push_str(" GROUP BY trace_id ORDER BY MAX(start_time) DESC LIMIT ?");
    bound.push(Box::new(trace_limit as i64));
    // Outer query: every span in the selected traces.
    let query = format!(
        "SELECT * FROM spans WHERE trace_id IN ({}) ORDER BY start_time DESC",
        trace_select
    );
    let mut stmt = conn
        .prepare(&query)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let refs: Vec<&dyn rusqlite::ToSql> = bound.iter().map(|b| b.as_ref()).collect();
    let rows = stmt
        .query_map(refs.as_slice(), parse_span_row)
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?;
    let mut spans = Vec::new();
    for row in rows {
        let span = row
            .map_err(|e| StorageError::QueryError(format!("Failed to parse results: {}", e)))?;
        spans.push(span);
    }
    Ok(spans)
}
/// Query metric data points matching the filters in `params`.
///
/// Supports an optional timestamp window and structured predicates; results
/// are ordered by `timestamp` descending and optionally limited.
///
/// # Errors
/// Returns `StorageError::QueryError` if the statement cannot be prepared
/// or executed, or if a row fails to parse.
pub fn query_metrics(conn: &Connection, params: &QueryParams) -> Result<Vec<Metric>> {
    // "WHERE 1=1" lets each optional filter append uniformly with " AND ...".
    let mut query = String::from("SELECT * FROM metrics WHERE 1=1");
    let mut sql_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    if let Some(start) = params.start_time {
        query.push_str(" AND timestamp >= ?");
        sql_params.push(Box::new(start));
    }
    if let Some(end) = params.end_time {
        query.push_str(" AND timestamp <= ?");
        sql_params.push(Box::new(end));
    }
    // Fix: was `¶ms.predicates`, a mojibake corruption of `&params.predicates`
    // that does not compile.
    append_predicates("metrics", &params.predicates, &mut query, &mut sql_params)?;
    query.push_str(" ORDER BY timestamp DESC");
    if let Some(limit) = params.limit {
        query.push_str(" LIMIT ?");
        sql_params.push(Box::new(limit as i64));
    }
    let mut stmt = conn
        .prepare(&query)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let param_refs: Vec<&dyn rusqlite::ToSql> = sql_params.iter().map(|p| p.as_ref()).collect();
    let metrics = stmt
        .query_map(param_refs.as_slice(), parse_metric_row)
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse results: {}", e)))?;
    Ok(metrics)
}
/// Query the most recent data point for each metric name.
///
/// The inner subquery relies on SQLite's documented bare-column behavior:
/// in an aggregate query using `MAX(timestamp)`, bare columns take their
/// values from the row holding the maximum, so
/// `GROUP BY name HAVING timestamp = MAX(timestamp)` selects each name's
/// latest rowid. Time filters, predicates, and an optional limit are then
/// applied on top, ordered by metric name.
///
/// # Errors
/// Returns `StorageError::QueryError` if the statement cannot be prepared
/// or executed, or if a row fails to parse.
pub fn query_latest_metrics(conn: &Connection, params: &QueryParams) -> Result<Vec<Metric>> {
    // Trailing "AND 1=1" lets each optional filter append uniformly.
    let mut query = String::from(
        "SELECT * FROM metrics WHERE rowid IN (\
        SELECT rowid FROM metrics GROUP BY name HAVING timestamp = MAX(timestamp)\
        ) AND 1=1",
    );
    let mut sql_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    if let Some(start) = params.start_time {
        query.push_str(" AND timestamp >= ?");
        sql_params.push(Box::new(start));
    }
    if let Some(end) = params.end_time {
        query.push_str(" AND timestamp <= ?");
        sql_params.push(Box::new(end));
    }
    // Fix: was `¶ms.predicates`, a mojibake corruption of `&params.predicates`
    // that does not compile.
    append_predicates("metrics", &params.predicates, &mut query, &mut sql_params)?;
    query.push_str(" ORDER BY name ASC");
    if let Some(limit) = params.limit {
        query.push_str(" LIMIT ?");
        sql_params.push(Box::new(limit as i64));
    }
    let mut stmt = conn
        .prepare(&query)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let param_refs: Vec<&dyn rusqlite::ToSql> = sql_params.iter().map(|p| p.as_ref()).collect();
    let metrics = stmt
        .query_map(param_refs.as_slice(), parse_metric_row)
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse results: {}", e)))?;
    Ok(metrics)
}
/// Gather aggregate storage statistics across the logs, spans, and metrics
/// tables.
///
/// Row counts are required and fail the call on error; the timestamp bounds
/// and database size are best-effort (`.ok()` / `unwrap_or` degrade errors
/// to `None` or a default instead of failing).
///
/// # Errors
/// Returns `StorageError::QueryError` if any of the COUNT queries fails.
pub fn get_stats(conn: &Connection) -> Result<StorageStats> {
let log_count: i64 = conn
.query_row("SELECT COUNT(*) FROM logs", [], |row| row.get(0))
.map_err(|e| StorageError::QueryError(format!("Failed to count logs: {}", e)))?;
let span_count: i64 = conn
.query_row("SELECT COUNT(*) FROM spans", [], |row| row.get(0))
.map_err(|e| StorageError::QueryError(format!("Failed to count spans: {}", e)))?;
let metric_count: i64 = conn
.query_row("SELECT COUNT(*) FROM metrics", [], |row| row.get(0))
.map_err(|e| StorageError::QueryError(format!("Failed to count metrics: {}", e)))?;
// Oldest event across all signals; `.ok()` maps any failure — including the
// NULL MIN() produced when every table is empty — to None.
let oldest_timestamp: Option<i64> = conn
.query_row(
"SELECT MIN(timestamp) FROM (
SELECT timestamp FROM logs
UNION ALL SELECT start_time as timestamp FROM spans
UNION ALL SELECT timestamp FROM metrics
)",
[],
|row| row.get(0),
)
.ok();
// Newest event; note spans contribute end_time here (start_time above).
let newest_timestamp: Option<i64> = conn
.query_row(
"SELECT MAX(timestamp) FROM (
SELECT timestamp FROM logs
UNION ALL SELECT end_time as timestamp FROM spans
UNION ALL SELECT timestamp FROM metrics
)",
[],
|row| row.get(0),
)
.ok();
// Approximate on-disk size from the page count × page size pragmas.
let page_count: i64 = conn
.query_row("PRAGMA page_count", [], |row| row.get(0))
.unwrap_or(0);
let page_size: i64 = conn
.query_row("PRAGMA page_size", [], |row| row.get(0))
.unwrap_or(4096);
let total_size_bytes = page_count * page_size;
Ok(StorageStats {
log_count: log_count as u64,
span_count: span_count as u64,
metric_count: metric_count as u64,
oldest_timestamp,
newest_timestamp,
storage_size_bytes: total_size_bytes as u64,
})
}
/// Append every structured predicate to `query` as an " AND <clause>",
/// pushing any bound values onto `sql_params` in placeholder order.
///
/// # Errors
/// Propagates the first `StorageError::QueryError` from predicate
/// translation.
fn append_predicates(
    signal_type: &str,
    predicates: &[QueryPredicate],
    query: &mut String,
    sql_params: &mut Vec<Box<dyn rusqlite::ToSql>>,
) -> Result<()> {
    predicates.iter().try_for_each(|predicate| {
        let clause = predicate_to_sql(signal_type, predicate, sql_params)?;
        query.push_str(" AND ");
        query.push_str(&clause);
        Ok(())
    })
}
/// Translate one structured predicate into a SQL boolean clause, pushing
/// the bound value onto `sql_params` in placeholder order.
///
/// # Errors
/// Returns `StorageError::QueryError` for unsupported combinations: a
/// non-duration value on the span `duration` field, or `contains` applied
/// to a non-string value.
fn predicate_to_sql(
signal_type: &str,
predicate: &QueryPredicate,
sql_params: &mut Vec<Box<dyn rusqlite::ToSql>>,
) -> Result<String> {
// Computed up front for the generic arms; the special-cased arms below
// build their own SQL and ignore these.
let lhs = field_to_sql(signal_type, &predicate.field)?;
let operator = sql_operator(&predicate.operator);
// NOTE: arm order matters — the `duration` and `contains` special cases
// must come before the generic value arms.
match (&predicate.field[..], &predicate.operator, &predicate.value) {
// Span duration is derived from the stored start/end columns.
("duration", op, QueryValue::Duration(value)) if signal_type == "spans" => {
sql_params.push(Box::new(*value as i64));
Ok(format!("((end_time - start_time) {} ?)", sql_operator(op)))
},
("duration", _, _) if signal_type == "spans" => Err(StorageError::QueryError(
"Structured query field 'duration' for spans requires a duration value like 500ms"
.to_string(),
)),
// `contains` becomes a LIKE with %-wildcard padding; strings only.
(_, Operator::Contains, QueryValue::String(value)) => {
sql_params.push(Box::new(format!("%{}%", value)));
Ok(format!("{} LIKE ?", lhs))
},
(_, Operator::Contains, _) => Err(StorageError::QueryError(format!(
"Structured query operator 'contains' for field '{}' requires a quoted string value",
predicate.field
))),
(_, _, QueryValue::String(value)) => {
sql_params.push(Box::new(value.clone()));
Ok(format!("{} {} ?", lhs, operator))
},
(_, _, QueryValue::Number(value)) => {
sql_params.push(Box::new(*value));
Ok(format!("{} {} ?", lhs, operator))
},
// Duration values on other fields are bound as plain integers.
(_, _, QueryValue::Duration(value)) => {
sql_params.push(Box::new(*value as i64));
Ok(format!("{} {} ?", lhs, operator))
},
}
}
/// Resolve a structured-query field name to a SQL expression for the given
/// signal table.
///
/// Resolution order: known top-level columns for the signal, then an
/// explicit `attributes.` or `resource.` prefix, and finally a fallback
/// that treats the bare field name as an attribute key.
fn field_to_sql(signal_type: &str, field: &str) -> Result<String> {
    // Top-level columns recognized per signal table.
    let direct_column = match signal_type {
        "logs" => match field {
            "timestamp" => Some("timestamp"),
            "trace_id" => Some("trace_id"),
            "span_id" => Some("span_id"),
            "severity" | "severity_number" => Some("severity_number"),
            "body" => Some("body"),
            _ => None,
        },
        "spans" => match field {
            "trace_id" => Some("trace_id"),
            "span_id" => Some("span_id"),
            "parent_span_id" => Some("parent_span_id"),
            "name" => Some("name"),
            "kind" => Some("kind"),
            "start_time" => Some("start_time"),
            "end_time" => Some("end_time"),
            _ => None,
        },
        "metrics" => match field {
            "name" => Some("name"),
            "description" => Some("description"),
            "unit" => Some("unit"),
            "timestamp" => Some("timestamp"),
            _ => None,
        },
        _ => None,
    };
    if let Some(column) = direct_column {
        return Ok(column.to_string());
    }
    if let Some(attribute_field) = field.strip_prefix("attributes.") {
        return Ok(format!(
            "json_extract(attributes, '{}')",
            json_path_for_key(attribute_field)
        ));
    }
    if let Some(resource_field) = field.strip_prefix("resource.") {
        return Ok(format!(
            "json_extract(resource, '$.attributes{}')",
            json_key_accessor(resource_field)
        ));
    }
    // Unprefixed, unrecognized fields are assumed to be attribute keys.
    Ok(format!(
        "json_extract(attributes, '{}')",
        json_path_for_key(field)
    ))
}
/// Build a JSON path addressing a single top-level key, quoting the key so
/// dotted names like `gen_ai.system` are treated as one key rather than a
/// nested path.
fn json_path_for_key(field: &str) -> String {
    let mut path = String::with_capacity(field.len() + 4);
    path.push_str("$.\"");
    path.push_str(field);
    path.push('"');
    path
}
/// Build a quoted key accessor segment (e.g. `."service.name"`) for
/// appending to an existing JSON path.
fn json_key_accessor(field: &str) -> String {
    [".\"", field, "\""].concat()
}
/// Map a structured-query comparison operator onto its SQL token.
///
/// `Contains` maps to LIKE; the %-wildcard padding of its operand is
/// handled by the caller (`predicate_to_sql`).
fn sql_operator(operator: &Operator) -> &'static str {
match operator {
Operator::Equal => "=",
Operator::NotEqual => "!=",
Operator::GreaterThan => ">",
Operator::LessThan => "<",
Operator::GreaterThanOrEqual => ">=",
Operator::LessThanOrEqual => "<=",
Operator::Contains => "LIKE",
}
}
/// Convert one `logs` table row into a `LogRecord`.
///
/// JSON columns that fail to deserialize degrade gracefully: `attributes`
/// falls back to its default and `resource` to `None`; unknown severity
/// numbers fall back to `Info`. Missing/mistyped columns still fail the row.
fn parse_log_row(row: &Row) -> rusqlite::Result<LogRecord> {
    let attributes = {
        let raw: String = row.get("attributes")?;
        serde_json::from_str(&raw).unwrap_or_default()
    };
    let resource = {
        let raw: String = row.get("resource")?;
        serde_json::from_str(&raw).ok()
    };
    let severity = {
        let number: i32 = row.get("severity_number")?;
        SeverityLevel::from_i32(number).unwrap_or(SeverityLevel::Info)
    };
    Ok(LogRecord {
        timestamp: row.get("timestamp")?,
        observed_timestamp: row.get("observed_timestamp")?,
        trace_id: row.get("trace_id")?,
        span_id: row.get("span_id")?,
        severity,
        severity_text: row.get("severity_text")?,
        body: row.get("body")?,
        attributes,
        resource,
    })
}
/// Convert one `spans` table row into a `Span`.
///
/// JSON columns that fail to deserialize degrade gracefully (`attributes`
/// and `events` to defaults, `resource` to `None`); unknown enum
/// discriminants fall back to `SpanKind::Internal` / `StatusCode::Unset`.
/// Missing or mistyped columns still fail the row.
fn parse_span_row(row: &Row) -> rusqlite::Result<Span> {
let attributes_json: String = row.get("attributes")?;
let attributes = serde_json::from_str(&attributes_json).unwrap_or_default();
let events_json: String = row.get("events")?;
let events = serde_json::from_str(&events_json).unwrap_or_default();
let resource_json: String = row.get("resource")?;
let resource = serde_json::from_str(&resource_json).ok();
// Integer discriminants stored in the table; tolerate unknown values.
let kind_num: i32 = row.get("kind")?;
let kind = SpanKind::from_i32(kind_num).unwrap_or(SpanKind::Internal);
let status_code_num: i32 = row.get("status_code")?;
let status_code = StatusCode::from_i32(status_code_num).unwrap_or(StatusCode::Unset);
let status = SpanStatus {
code: status_code,
message: row.get("status_message")?,
};
Ok(Span {
trace_id: row.get("trace_id")?,
span_id: row.get("span_id")?,
parent_span_id: row.get("parent_span_id")?,
name: row.get("name")?,
kind,
start_time: row.get("start_time")?,
end_time: row.get("end_time")?,
attributes,
events,
status,
resource,
})
}
/// Convert one `metrics` table row into a `Metric`.
///
/// The `metric_type` discriminant selects which value column to read:
/// 0 = Gauge (`value_double`), 1 = Counter (`value_int`),
/// 2 = Histogram (`value_histogram` JSON), 3 = Summary (`value_summary`
/// JSON). Unknown discriminants fall back to `Gauge(0.0)`, and malformed
/// JSON payloads degrade to empty histogram/summary data.
fn parse_metric_row(row: &Row) -> rusqlite::Result<Metric> {
use otelite_core::telemetry::metric::MetricType;
let attributes_json: String = row.get("attributes")?;
let attributes = serde_json::from_str(&attributes_json).unwrap_or_default();
let resource_json: String = row.get("resource")?;
let resource = serde_json::from_str(&resource_json).ok();
let metric_type_int: i32 = row.get("metric_type")?;
let metric_type = match metric_type_int {
// 0: gauge stored as a double.
0 => {
let value: f64 = row.get("value_double")?;
MetricType::Gauge(value)
},
// 1: counter stored as an integer; widened to u64.
1 => {
let value: i64 = row.get("value_int")?;
MetricType::Counter(value as u64)
},
// 2: histogram stored as a JSON (count, sum, buckets) tuple.
2 => {
let histogram_json: String = row.get("value_histogram")?;
let (count, sum, buckets) =
serde_json::from_str(&histogram_json).unwrap_or((0, 0.0, Vec::new()));
MetricType::Histogram {
count,
sum,
buckets,
}
},
// 3: summary stored as a JSON (count, sum, quantiles) tuple.
3 => {
let summary_json: String = row.get("value_summary")?;
let (count, sum, quantiles) =
serde_json::from_str(&summary_json).unwrap_or((0, 0.0, Vec::new()));
MetricType::Summary {
count,
sum,
quantiles,
}
},
// Unknown discriminant: degrade to a zero gauge rather than failing.
_ => MetricType::Gauge(0.0),
};
Ok(Metric {
name: row.get("name")?,
description: row.get("description")?,
unit: row.get("unit")?,
metric_type,
timestamp: row.get("timestamp")?,
attributes,
resource,
})
}
/// Precomputed SQL expression fragments used by the token/cost queries.
///
/// Each field is a SQLite expression over the spans `attributes` JSON
/// column, COALESCE-ing across several generations of GenAI/LLM attribute
/// naming conventions.
struct TokenExprs {
/// Input/prompt token count, cast to INTEGER.
input: &'static str,
/// Output/completion token count, cast to INTEGER.
output: &'static str,
/// Cache-creation input token count, cast to INTEGER.
cache_creation: &'static str,
/// Cache-read input token count, cast to INTEGER.
cache_read: &'static str,
/// Model name (request model checked before response model).
model: &'static str,
/// Provider/system name.
system: &'static str,
/// Boolean predicate matching spans that carry LLM-call attributes.
llm_span_guard: &'static str,
}
/// Build the shared SQL fragments for the token-usage queries.
///
/// Each COALESCE chain checks attribute names from the current `gen_ai.*`
/// semantic conventions down through legacy `llm.*` names to bare keys, so
/// the first populated value wins.
fn token_exprs() -> TokenExprs {
TokenExprs {
// Prompt/input token count across convention generations.
input: "COALESCE(\
CAST(json_extract(attributes, '$.\"gen_ai.usage.input_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"gen_ai.usage.prompt_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"llm.usage.prompt_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"llm.token_count.prompt\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"prompt_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"input_tokens\"') AS INTEGER))",
// Completion/output token count across convention generations.
output: "COALESCE(\
CAST(json_extract(attributes, '$.\"gen_ai.usage.output_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"gen_ai.usage.completion_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"llm.usage.completion_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"llm.token_count.completion\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"completion_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"output_tokens\"') AS INTEGER))",
// Prompt-cache creation token count.
cache_creation: "COALESCE(\
CAST(json_extract(attributes, '$.\"gen_ai.usage.cache_creation.input_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"gen_ai.usage.cache_creation_input_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"cache_creation_input_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"cache_creation_tokens\"') AS INTEGER))",
// Prompt-cache read token count.
cache_read: "COALESCE(\
CAST(json_extract(attributes, '$.\"gen_ai.usage.cache_read.input_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"gen_ai.usage.cache_read_input_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"cache_read_input_tokens\"') AS INTEGER), \
CAST(json_extract(attributes, '$.\"cache_read_tokens\"') AS INTEGER))",
// Model name; request model preferred over response model.
model: "COALESCE(\
json_extract(attributes, '$.\"gen_ai.request.model\"'), \
json_extract(attributes, '$.\"gen_ai.response.model\"'), \
json_extract(attributes, '$.\"llm.request.model\"'), \
json_extract(attributes, '$.\"llm.model_name\"'), \
json_extract(attributes, '$.\"model\"'))",
// Provider/system name.
system: "COALESCE(\
json_extract(attributes, '$.\"gen_ai.provider.name\"'), \
json_extract(attributes, '$.\"gen_ai.system\"'), \
json_extract(attributes, '$.\"llm.system\"'), \
json_extract(attributes, '$.\"llm.vendor\"'))",
// A span counts as an LLM call if any system/vendor/model marker is set.
llm_span_guard: "(json_extract(attributes, '$.\"gen_ai.system\"') IS NOT NULL \
OR json_extract(attributes, '$.\"gen_ai.provider.name\"') IS NOT NULL \
OR json_extract(attributes, '$.\"llm.system\"') IS NOT NULL \
OR json_extract(attributes, '$.\"llm.vendor\"') IS NOT NULL \
OR json_extract(attributes, '$.\"llm.request.model\"') IS NOT NULL)",
}
}
/// Aggregate LLM token usage from span attributes within an optional time
/// window.
///
/// Runs three queries over LLM-call spans (identified by the
/// `llm_span_guard` predicate): an overall summary, a per-model breakdown,
/// and a per-system breakdown. The same WHERE clause and bound parameters
/// are reused for all three.
///
/// # Errors
/// Returns `StorageError::QueryError` if any of the three queries fails to
/// prepare, execute, or parse.
pub fn query_token_usage(
conn: &Connection,
start_time: Option<i64>,
end_time: Option<i64>,
) -> Result<(
otelite_core::api::TokenUsageSummary,
Vec<otelite_core::api::ModelUsage>,
Vec<otelite_core::api::SystemUsage>,
)> {
let exprs = token_exprs();
// Only spans that look like LLM calls; time filters appended below.
let mut where_clause = format!("WHERE {}", exprs.llm_span_guard);
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(start) = start_time {
where_clause.push_str(" AND start_time >= ?");
params.push(Box::new(start));
}
if let Some(end) = end_time {
where_clause.push_str(" AND end_time <= ?");
params.push(Box::new(end));
}
let input_expr = exprs.input;
let output_expr = exprs.output;
let cache_creation_expr = exprs.cache_creation;
let cache_read_expr = exprs.cache_read;
// Query 1: overall totals across all matching spans.
let summary_query = format!(
"SELECT
COALESCE(SUM({input_expr}), 0) as total_input,
COALESCE(SUM({output_expr}), 0) as total_output,
COUNT(*) as total_requests,
COALESCE(SUM({cache_creation_expr}), 0) as cache_creation,
COALESCE(SUM({cache_read_expr}), 0) as cache_read
FROM spans
{where_clause}"
);
let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let summary = conn
.query_row(&summary_query, param_refs.as_slice(), |row| {
Ok(otelite_core::api::TokenUsageSummary {
total_input_tokens: row.get::<_, i64>(0)? as u64,
total_output_tokens: row.get::<_, i64>(1)? as u64,
total_requests: row.get::<_, i64>(2)? as usize,
total_cache_creation_tokens: row.get::<_, i64>(3)? as u64,
total_cache_read_tokens: row.get::<_, i64>(4)? as u64,
})
})
.map_err(|e| StorageError::QueryError(format!("Failed to query token summary: {}", e)))?;
let model_expr = exprs.model;
// Query 2: totals grouped by model; spans with no model attribute are
// dropped by the HAVING clause, ordered by combined token volume.
let model_query = format!(
"SELECT
{model_expr} as model,
COALESCE(SUM({input_expr}), 0) as input_tokens,
COALESCE(SUM({output_expr}), 0) as output_tokens,
COUNT(*) as requests
FROM spans
{where_clause}
GROUP BY model
HAVING model IS NOT NULL
ORDER BY input_tokens + output_tokens DESC"
);
let mut stmt = conn
.prepare(&model_query)
.map_err(|e| StorageError::QueryError(format!("Failed to prepare model query: {}", e)))?;
let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let by_model = stmt
.query_map(param_refs.as_slice(), |row| {
Ok(otelite_core::api::ModelUsage {
model: row.get(0)?,
input_tokens: row.get::<_, i64>(1)? as u64,
output_tokens: row.get::<_, i64>(2)? as u64,
requests: row.get::<_, i64>(3)? as usize,
})
})
.map_err(|e| StorageError::QueryError(format!("Failed to execute model query: {}", e)))?
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| StorageError::QueryError(format!("Failed to parse model results: {}", e)))?;
let system_expr = exprs.system;
// Query 3: same breakdown grouped by provider/system.
let system_query = format!(
"SELECT
{system_expr} as system,
COALESCE(SUM({input_expr}), 0) as input_tokens,
COALESCE(SUM({output_expr}), 0) as output_tokens,
COUNT(*) as requests
FROM spans
{where_clause}
GROUP BY system
HAVING system IS NOT NULL
ORDER BY input_tokens + output_tokens DESC"
);
let mut stmt = conn
.prepare(&system_query)
.map_err(|e| StorageError::QueryError(format!("Failed to prepare system query: {}", e)))?;
let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let by_system = stmt
.query_map(param_refs.as_slice(), |row| {
Ok(otelite_core::api::SystemUsage {
system: row.get(0)?,
input_tokens: row.get::<_, i64>(1)? as u64,
output_tokens: row.get::<_, i64>(2)? as u64,
requests: row.get::<_, i64>(3)? as usize,
})
})
.map_err(|e| StorageError::QueryError(format!("Failed to execute system query: {}", e)))?
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| StorageError::QueryError(format!("Failed to parse system results: {}", e)))?;
Ok((summary, by_model, by_system))
}
/// Time-bucketed, per-model token totals for LLM-call spans.
///
/// Span start times are bucketed into `bucket_ns`-sized windows via integer
/// division (`(start_time / ?) * ?`), then grouped by bucket and model and
/// returned in ascending bucket order.
///
/// # Errors
/// Returns `StorageError::QueryError` if `bucket_ns` is not positive
/// (division by zero guard) or the query fails to prepare, execute, or
/// parse.
pub fn query_cost_series(
conn: &Connection,
start_time: Option<i64>,
end_time: Option<i64>,
bucket_ns: i64,
) -> Result<Vec<otelite_core::api::CostSeriesPoint>> {
if bucket_ns <= 0 {
return Err(StorageError::QueryError(format!(
"bucket_ns must be positive, got {}",
bucket_ns
)));
}
let exprs = token_exprs();
// Only spans that look like LLM calls; time filters appended below.
let mut where_clause = format!("WHERE {}", exprs.llm_span_guard);
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(start) = start_time {
where_clause.push_str(" AND start_time >= ?");
params.push(Box::new(start));
}
if let Some(end) = end_time {
where_clause.push_str(" AND end_time <= ?");
params.push(Box::new(end));
}
let sql = format!(
"SELECT
(start_time / ?) * ? as bucket,
{model} as model,
COALESCE(SUM({input}), 0),
COALESCE(SUM({output}), 0),
COALESCE(SUM({cache_creation}), 0),
COALESCE(SUM({cache_read}), 0),
COUNT(*) as requests
FROM spans
{where_clause}
GROUP BY bucket, model
ORDER BY bucket ASC",
model = exprs.model,
input = exprs.input,
output = exprs.output,
cache_creation = exprs.cache_creation,
cache_read = exprs.cache_read,
);
// The first two placeholders in the SELECT are `bucket_ns`, so it must be
// bound twice BEFORE the where-clause parameters.
let mut all_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::with_capacity(params.len() + 2);
all_params.push(Box::new(bucket_ns));
all_params.push(Box::new(bucket_ns));
all_params.extend(params);
let param_refs: Vec<&dyn rusqlite::ToSql> = all_params.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql).map_err(|e| {
StorageError::QueryError(format!("Failed to prepare cost_series query: {}", e))
})?;
let rows = stmt
.query_map(param_refs.as_slice(), |row| {
Ok(otelite_core::api::CostSeriesPoint {
timestamp: row.get::<_, i64>(0)?,
model: row.get::<_, Option<String>>(1)?,
input_tokens: row.get::<_, i64>(2)? as u64,
output_tokens: row.get::<_, i64>(3)? as u64,
cache_creation_tokens: row.get::<_, i64>(4)? as u64,
cache_read_tokens: row.get::<_, i64>(5)? as u64,
requests: row.get::<_, i64>(6)? as usize,
})
})
.map_err(|e| {
StorageError::QueryError(format!("Failed to execute cost_series query: {}", e))
})?
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| {
StorageError::QueryError(format!("Failed to parse cost_series results: {}", e))
})?;
Ok(rows)
}
/// List the LLM-call spans with the highest total token usage.
///
/// Total tokens = input + output + cache-creation + cache-read; results are
/// ordered by that total descending and capped at `limit`.
///
/// # Errors
/// Returns `StorageError::QueryError` if the query fails to prepare,
/// execute, or parse.
pub fn query_top_spans(
conn: &Connection,
start_time: Option<i64>,
end_time: Option<i64>,
limit: usize,
) -> Result<Vec<otelite_core::api::TopSpan>> {
let exprs = token_exprs();
// Only spans that look like LLM calls; time filters appended below.
let mut where_clause = format!("WHERE {}", exprs.llm_span_guard);
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(start) = start_time {
where_clause.push_str(" AND start_time >= ?");
params.push(Box::new(start));
}
if let Some(end) = end_time {
where_clause.push_str(" AND end_time <= ?");
params.push(Box::new(end));
}
let sql = format!(
"SELECT
trace_id,
span_id,
start_time,
(end_time - start_time) as duration,
{model} as model,
{system} as system,
json_extract(attributes, '$.\"session.id\"') as session_id,
json_extract(attributes, '$.\"prompt.id\"') as prompt_id,
COALESCE({input}, 0) as input_tokens,
COALESCE({output}, 0) as output_tokens,
COALESCE({cache_creation}, 0) as cache_creation_tokens,
COALESCE({cache_read}, 0) as cache_read_tokens,
COALESCE({input}, 0) + COALESCE({output}, 0) + COALESCE({cache_creation}, 0) + COALESCE({cache_read}, 0) as total_tokens
FROM spans
{where_clause}
ORDER BY total_tokens DESC
LIMIT ?",
model = exprs.model,
system = exprs.system,
input = exprs.input,
output = exprs.output,
cache_creation = exprs.cache_creation,
cache_read = exprs.cache_read,
);
// The LIMIT placeholder comes last in the SQL, so its parameter is pushed
// after the where-clause parameters.
params.push(Box::new(limit as i64));
let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql).map_err(|e| {
StorageError::QueryError(format!("Failed to prepare top_spans query: {}", e))
})?;
let rows = stmt
.query_map(param_refs.as_slice(), |row| {
Ok(otelite_core::api::TopSpan {
trace_id: row.get(0)?,
span_id: row.get(1)?,
start_time: row.get::<_, i64>(2)?,
duration: row.get::<_, i64>(3)?,
model: row.get::<_, Option<String>>(4)?,
system: row.get::<_, Option<String>>(5)?,
session_id: row.get::<_, Option<String>>(6)?,
prompt_id: row.get::<_, Option<String>>(7)?,
input_tokens: row.get::<_, i64>(8)? as u64,
output_tokens: row.get::<_, i64>(9)? as u64,
cache_creation_tokens: row.get::<_, i64>(10)? as u64,
cache_read_tokens: row.get::<_, i64>(11)? as u64,
total_tokens: row.get::<_, i64>(12)? as u64,
})
})
.map_err(|e| StorageError::QueryError(format!("Failed to execute top_spans query: {}", e)))?
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| {
StorageError::QueryError(format!("Failed to parse top_spans results: {}", e))
})?;
Ok(rows)
}
/// Count finish/stop reasons across spans and Claude Code response logs.
///
/// The CTE unions three sources: the singular span attribute
/// `gen_ai.response.finish_reason`, the plural array attribute
/// `gen_ai.response.finish_reasons` (expanded via `json_each`), and the
/// `stop_reason` field inside `claude_code.api_response_body` log events.
///
/// Parameter ordering is subtle: `{spans_time_filter}` is interpolated
/// TWICE into the SQL (once per span subquery) and `{logs_time_filter}`
/// once, so the time bounds must be pushed in three matching groups below.
///
/// # Errors
/// Returns `StorageError::QueryError` if the query fails to prepare,
/// execute, or parse.
pub fn query_finish_reasons(
conn: &Connection,
start_time: Option<i64>,
end_time: Option<i64>,
) -> Result<Vec<otelite_core::api::FinishReasonCount>> {
let mut spans_time_filter = String::new();
let mut logs_time_filter = String::new();
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
// Group 1: placeholders for the first spans subquery.
if let Some(start) = start_time {
spans_time_filter.push_str(" AND start_time >= ?");
params.push(Box::new(start));
}
if let Some(end) = end_time {
spans_time_filter.push_str(" AND end_time <= ?");
params.push(Box::new(end));
}
// Group 2: the same filter string appears a second time in the SQL, so the
// same values are bound again (the strings themselves are not re-appended).
if let Some(start) = start_time {
params.push(Box::new(start));
}
if let Some(end) = end_time {
params.push(Box::new(end));
}
// Group 3: placeholders for the logs subquery.
if let Some(start) = start_time {
logs_time_filter.push_str(" AND timestamp >= ?");
params.push(Box::new(start));
}
if let Some(end) = end_time {
logs_time_filter.push_str(" AND timestamp <= ?");
params.push(Box::new(end));
}
let sql = format!(
"WITH reasons AS (
SELECT json_extract(attributes, '$.\"gen_ai.response.finish_reason\"') AS reason
FROM spans
WHERE json_extract(attributes, '$.\"gen_ai.response.finish_reason\"') IS NOT NULL
{spans_time_filter}
UNION ALL
SELECT je.value AS reason
FROM spans, json_each(json_extract(attributes, '$.\"gen_ai.response.finish_reasons\"')) je
WHERE json_extract(attributes, '$.\"gen_ai.response.finish_reasons\"') IS NOT NULL
{spans_time_filter}
UNION ALL
SELECT json_extract(json_extract(attributes, '$.body'), '$.stop_reason') AS reason
FROM logs
WHERE body = 'claude_code.api_response_body'
AND json_extract(json_extract(attributes, '$.body'), '$.stop_reason') IS NOT NULL
{logs_time_filter}
)
SELECT reason, COUNT(*) as cnt
FROM reasons
WHERE reason IS NOT NULL
GROUP BY reason
ORDER BY cnt DESC"
);
let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql).map_err(|e| {
StorageError::QueryError(format!("Failed to prepare finish_reasons query: {}", e))
})?;
let rows = stmt
.query_map(param_refs.as_slice(), |row| {
Ok(otelite_core::api::FinishReasonCount {
reason: row.get::<_, String>(0)?,
count: row.get::<_, i64>(1)? as usize,
})
})
.map_err(|e| {
StorageError::QueryError(format!("Failed to execute finish_reasons query: {}", e))
})?
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| {
StorageError::QueryError(format!("Failed to parse finish_reasons results: {}", e))
})?;
Ok(rows)
}
/// List up to 50 distinct resource-attribute keys observed in the given
/// signal table ("logs", "spans", or "metrics").
///
/// Rows whose key extraction fails are silently skipped.
///
/// # Errors
/// Returns `StorageError::QueryError` for an unknown signal type or if the
/// statement cannot be prepared or executed.
pub fn distinct_resource_keys(conn: &Connection, signal: &str) -> Result<Vec<String>> {
    // Whitelist the table name: `signal` is interpolated into the SQL text
    // below, so anything outside the known set is rejected up front.
    let table = match signal {
        "logs" | "spans" | "metrics" => signal,
        other => {
            return Err(StorageError::QueryError(format!(
                "Unknown signal type: {}",
                other
            )));
        },
    };
    let sql = format!(
        "SELECT DISTINCT je.key \
        FROM {table}, json_each(json_extract({table}.resource, '$.attributes')) je \
        WHERE {table}.resource IS NOT NULL \
        AND json_extract({table}.resource, '$.attributes') IS NOT NULL \
        LIMIT 50"
    );
    let mut stmt = conn
        .prepare(&sql)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let keys = stmt
        .query_map([], |row| row.get::<_, String>(0))
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?
        .flatten()
        .collect();
    Ok(keys)
}
/// Unit tests covering query construction, structured-predicate
/// translation, and end-to-end filtering against an in-memory database.
#[cfg(test)]
mod tests {
use super::*;
use crate::sqlite::schema;
// Fresh in-memory SQLite database with the full schema applied.
fn setup_test_db() -> Connection {
let conn = Connection::open_in_memory().unwrap();
schema::initialize_schema(&conn).unwrap();
conn
}
#[test]
fn test_query_logs_empty() {
let conn = setup_test_db();
let params = QueryParams::default();
// Fix: was `¶ms`, a mojibake corruption of `&params` that does not compile.
let logs = query_logs(&conn, &params).unwrap();
assert_eq!(logs.len(), 0);
}
#[test]
fn test_get_stats_empty() {
let conn = setup_test_db();
let stats = get_stats(&conn).unwrap();
assert_eq!(stats.log_count, 0);
assert_eq!(stats.span_count, 0);
assert_eq!(stats.metric_count, 0);
}
#[test]
fn test_field_to_sql_for_attribute_field() {
let sql = field_to_sql("logs", "gen_ai.system").unwrap();
assert_eq!(sql, "json_extract(attributes, '$.\"gen_ai.system\"')");
}
#[test]
fn test_field_to_sql_for_explicit_attribute_prefix() {
let sql = field_to_sql("logs", "attributes.http.method").unwrap();
assert_eq!(sql, "json_extract(attributes, '$.\"http.method\"')");
}
#[test]
fn test_field_to_sql_for_resource_prefix() {
let sql = field_to_sql("logs", "resource.service.name").unwrap();
assert_eq!(
sql,
"json_extract(resource, '$.attributes.\"service.name\"')"
);
}
#[test]
fn test_json_key_accessor_quotes_dotted_keys() {
assert_eq!(json_key_accessor("service.name"), ".\"service.name\"");
}
#[test]
fn test_predicate_to_sql_for_attribute_equality() {
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
let sql = predicate_to_sql(
"logs",
&QueryPredicate {
field: "gen_ai.system".to_string(),
operator: Operator::Equal,
value: QueryValue::String("anthropic".to_string()),
},
&mut params,
)
.unwrap();
assert_eq!(sql, "json_extract(attributes, '$.\"gen_ai.system\"') = ?");
assert_eq!(params.len(), 1);
}
#[test]
fn test_predicate_to_sql_for_resource_equality() {
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
let sql = predicate_to_sql(
"logs",
&QueryPredicate {
field: "resource.service.name".to_string(),
operator: Operator::Equal,
value: QueryValue::String("gateway".to_string()),
},
&mut params,
)
.unwrap();
assert_eq!(
sql,
"json_extract(resource, '$.attributes.\"service.name\"') = ?"
);
assert_eq!(params.len(), 1);
}
#[test]
fn test_span_duration_predicate_requires_duration_value() {
let mut params = Vec::new();
let err = predicate_to_sql(
"spans",
&QueryPredicate {
field: "duration".to_string(),
operator: Operator::GreaterThan,
value: QueryValue::Number(100.0),
},
&mut params,
)
.unwrap_err();
assert!(err
.to_string()
.contains("requires a duration value like 500ms"));
}
#[test]
fn test_query_logs_with_structured_attribute_and_resource_predicates() {
let conn = setup_test_db();
conn.execute(
"INSERT INTO logs (
timestamp, observed_timestamp, trace_id, span_id,
severity_number, severity_text, body, attributes, resource, scope
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
rusqlite::params![
1000_i64,
1000_i64,
"trace-a",
"span-a",
SeverityLevel::Info.to_i32(),
"INFO",
"matching log body",
r#"{"gen_ai.system":"anthropic"}"#,
r#"{"attributes":{"service.name":"gateway"}}"#,
"{}",
],
)
.unwrap();
let params = QueryParams {
predicates: vec![
QueryPredicate {
field: "gen_ai.system".to_string(),
operator: Operator::Equal,
value: QueryValue::String("anthropic".to_string()),
},
QueryPredicate {
field: "resource.service.name".to_string(),
operator: Operator::Equal,
value: QueryValue::String("gateway".to_string()),
},
],
..Default::default()
};
// Sanity-check the raw JSON extraction before exercising query_logs.
let attr_match: Option<String> = conn
.query_row(
"SELECT json_extract(attributes, '$.\"gen_ai.system\"') FROM logs WHERE timestamp = 1000",
[],
|row| row.get(0),
)
.unwrap();
let resource_match: Option<String> = conn
.query_row(
"SELECT json_extract(resource, '$.attributes.\"service.name\"') FROM logs WHERE timestamp = 1000",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(attr_match.as_deref(), Some("anthropic"));
assert_eq!(resource_match.as_deref(), Some("gateway"));
// Fix: was `¶ms`, a mojibake corruption of `&params` that does not compile.
let logs = query_logs(&conn, &params).unwrap();
assert_eq!(logs.len(), 1);
assert_eq!(logs[0].body, "matching log body");
}
}