use crate::error::{Result, StorageError};
use crate::{QueryParams, StorageStats};
use otelite_core::query::{Operator, QueryPredicate, QueryValue};
use otelite_core::telemetry::log::SeverityLevel;
use otelite_core::telemetry::trace::{SpanKind, SpanStatus, StatusCode};
use otelite_core::telemetry::{LogRecord, Metric, Span};
use rusqlite::{Connection, Row};
/// Runs a filtered query against the `logs` table and returns the matching
/// records ordered by `timestamp`, newest first.
///
/// Every filter that is set in `params` contributes one `AND` clause; filter
/// values are passed to SQLite as bound `?` parameters:
///
/// * `start_time` / `end_time` – inclusive bounds on `timestamp`
/// * `trace_id` / `span_id` – exact matches
/// * `min_severity` – lower bound on `severity_number`
/// * `search_text` – `MATCH` against `body` in the `logs_fts` table
/// * `predicates` – structured predicates, translated by `append_predicates`
/// * `limit` – maximum number of rows returned
///
/// # Errors
///
/// Returns [`StorageError::QueryError`] when a predicate cannot be translated
/// or when the statement fails to prepare, execute, or map a row.
pub fn query_logs(conn: &Connection, params: &QueryParams) -> Result<Vec<LogRecord>> {
    // `WHERE 1=1` lets every optional filter be appended uniformly as " AND ...".
    let mut query = String::from("SELECT * FROM logs WHERE 1=1");
    let mut sql_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();

    if let Some(start) = params.start_time {
        query.push_str(" AND timestamp >= ?");
        sql_params.push(Box::new(start));
    }
    if let Some(end) = params.end_time {
        query.push_str(" AND timestamp <= ?");
        sql_params.push(Box::new(end));
    }
    if let Some(ref trace_id) = params.trace_id {
        query.push_str(" AND trace_id = ?");
        sql_params.push(Box::new(trace_id.clone()));
    }
    if let Some(ref span_id) = params.span_id {
        query.push_str(" AND span_id = ?");
        sql_params.push(Box::new(span_id.clone()));
    }
    if let Some(min_severity) = params.min_severity {
        query.push_str(" AND severity_number >= ?");
        sql_params.push(Box::new(min_severity.to_i32()));
    }
    if let Some(ref search) = params.search_text {
        query.push_str(" AND id IN (SELECT rowid FROM logs_fts WHERE body MATCH ?)");
        sql_params.push(Box::new(search.clone()));
    }

    append_predicates("logs", &params.predicates, &mut query, &mut sql_params)?;

    query.push_str(" ORDER BY timestamp DESC");
    // The LIMIT placeholder comes last, so its value must be pushed last too.
    if let Some(limit) = params.limit {
        query.push_str(" LIMIT ?");
        sql_params.push(Box::new(limit as i64));
    }

    let mut stmt = conn
        .prepare(&query)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let param_refs: Vec<&dyn rusqlite::ToSql> = sql_params.iter().map(|p| p.as_ref()).collect();
    let logs = stmt
        .query_map(param_refs.as_slice(), parse_log_row)
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse results: {}", e)))?;
    Ok(logs)
}
/// Runs a filtered query against the `spans` table and returns the matching
/// spans ordered by `start_time`, newest first.
///
/// Filters taken from `params` (values are bound as `?` parameters):
///
/// * `start_time` – lower bound on the span's `start_time`
/// * `end_time` – upper bound on the span's `end_time`, so a span must lie
///   entirely inside the window to match when both bounds are set
/// * `trace_id` – exact match
/// * `predicates` – structured predicates, translated by `append_predicates`
/// * `limit` – maximum number of rows returned
///
/// # Errors
///
/// Returns [`StorageError::QueryError`] when a predicate cannot be translated
/// or when the statement fails to prepare, execute, or map a row.
pub fn query_spans(conn: &Connection, params: &QueryParams) -> Result<Vec<Span>> {
    // `WHERE 1=1` lets every optional filter be appended uniformly as " AND ...".
    let mut query = String::from("SELECT * FROM spans WHERE 1=1");
    let mut sql_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();

    if let Some(start) = params.start_time {
        query.push_str(" AND start_time >= ?");
        sql_params.push(Box::new(start));
    }
    if let Some(end) = params.end_time {
        query.push_str(" AND end_time <= ?");
        sql_params.push(Box::new(end));
    }
    if let Some(ref trace_id) = params.trace_id {
        query.push_str(" AND trace_id = ?");
        sql_params.push(Box::new(trace_id.clone()));
    }

    append_predicates("spans", &params.predicates, &mut query, &mut sql_params)?;

    query.push_str(" ORDER BY start_time DESC");
    // The LIMIT placeholder comes last, so its value must be pushed last too.
    if let Some(limit) = params.limit {
        query.push_str(" LIMIT ?");
        sql_params.push(Box::new(limit as i64));
    }

    let mut stmt = conn
        .prepare(&query)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let param_refs: Vec<&dyn rusqlite::ToSql> = sql_params.iter().map(|p| p.as_ref()).collect();
    let spans = stmt
        .query_map(param_refs.as_slice(), parse_span_row)
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse results: {}", e)))?;
    Ok(spans)
}
/// Returns every span belonging to the most recently active traces.
///
/// An inner query picks up to `trace_limit` trace ids, ranked by the latest
/// `start_time` seen in each trace, after applying the `start_time`,
/// `end_time` and `trace_id` filters from `params`. The outer query then
/// returns all spans of those traces, ordered by `start_time` descending.
///
/// Only the three filters above are read from `params`; `predicates` and
/// `limit` are not consulted here.
/// NOTE(review): confirm that ignoring `predicates` is intentional.
///
/// # Errors
///
/// Returns [`StorageError::QueryError`] when the statement fails to prepare,
/// execute, or map a row.
pub fn query_spans_for_trace_list(
    conn: &Connection,
    params: &QueryParams,
    trace_limit: usize,
) -> Result<Vec<Span>> {
    let mut bindings: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    let mut trace_filter = String::from("SELECT trace_id FROM spans WHERE 1=1");

    if let Some(lower) = params.start_time {
        trace_filter.push_str(" AND start_time >= ?");
        bindings.push(Box::new(lower));
    }
    if let Some(upper) = params.end_time {
        trace_filter.push_str(" AND end_time <= ?");
        bindings.push(Box::new(upper));
    }
    if let Some(wanted) = params.trace_id.as_ref() {
        trace_filter.push_str(" AND trace_id = ?");
        bindings.push(Box::new(wanted.clone()));
    }

    // One row per trace, most recently started traces first, capped at `trace_limit`.
    trace_filter.push_str(" GROUP BY trace_id ORDER BY MAX(start_time) DESC LIMIT ?");
    bindings.push(Box::new(trace_limit as i64));

    let sql = format!(
        "SELECT * FROM spans WHERE trace_id IN ({}) ORDER BY start_time DESC",
        trace_filter
    );

    let mut statement = conn
        .prepare(&sql)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let bound: Vec<&dyn rusqlite::ToSql> = bindings.iter().map(|b| b.as_ref()).collect();
    let rows = statement
        .query_map(bound.as_slice(), parse_span_row)
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?;

    rows.collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse results: {}", e)))
}
/// Runs a filtered query against the `metrics` table and returns the matching
/// data points ordered by `timestamp`, newest first.
///
/// Filters taken from `params` (values are bound as `?` parameters):
///
/// * `start_time` / `end_time` – inclusive bounds on `timestamp`
/// * `predicates` – structured predicates, translated by `append_predicates`
/// * `limit` – maximum number of rows returned
///
/// # Errors
///
/// Returns [`StorageError::QueryError`] when a predicate cannot be translated
/// or when the statement fails to prepare, execute, or map a row.
pub fn query_metrics(conn: &Connection, params: &QueryParams) -> Result<Vec<Metric>> {
    // `WHERE 1=1` lets every optional filter be appended uniformly as " AND ...".
    let mut query = String::from("SELECT * FROM metrics WHERE 1=1");
    let mut sql_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();

    if let Some(start) = params.start_time {
        query.push_str(" AND timestamp >= ?");
        sql_params.push(Box::new(start));
    }
    if let Some(end) = params.end_time {
        query.push_str(" AND timestamp <= ?");
        sql_params.push(Box::new(end));
    }

    append_predicates("metrics", &params.predicates, &mut query, &mut sql_params)?;

    query.push_str(" ORDER BY timestamp DESC");
    // The LIMIT placeholder comes last, so its value must be pushed last too.
    if let Some(limit) = params.limit {
        query.push_str(" LIMIT ?");
        sql_params.push(Box::new(limit as i64));
    }

    let mut stmt = conn
        .prepare(&query)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let param_refs: Vec<&dyn rusqlite::ToSql> = sql_params.iter().map(|p| p.as_ref()).collect();
    let metrics = stmt
        .query_map(param_refs.as_slice(), parse_metric_row)
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse results: {}", e)))?;
    Ok(metrics)
}
/// Returns one row per metric name — the row selected by the
/// `GROUP BY name HAVING timestamp = MAX(timestamp)` sub-query — ordered by
/// `name` ascending.
///
/// The `start_time` / `end_time` bounds, the structured `predicates` and the
/// `limit` from `params` are applied by the outer query, i.e. *after* the
/// per-name row has been chosen. A metric whose selected row falls outside the
/// requested window is therefore omitted rather than replaced by an older row.
///
/// NOTE(review): the sub-query compares the bare column `timestamp` with
/// `MAX(timestamp)` inside `HAVING`; this depends on how SQLite resolves bare
/// columns in aggregate queries — confirm it picks the intended row.
///
/// # Errors
///
/// Returns [`StorageError::QueryError`] when a predicate cannot be translated
/// or when the statement fails to prepare, execute, or map a row.
pub fn query_latest_metrics(conn: &Connection, params: &QueryParams) -> Result<Vec<Metric>> {
    // The trailing `AND 1=1` keeps the " AND ..." append pattern uniform.
    let mut query = String::from(
        "SELECT * FROM metrics WHERE rowid IN (\
         SELECT rowid FROM metrics GROUP BY name HAVING timestamp = MAX(timestamp)\
         ) AND 1=1",
    );
    let mut sql_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();

    if let Some(start) = params.start_time {
        query.push_str(" AND timestamp >= ?");
        sql_params.push(Box::new(start));
    }
    if let Some(end) = params.end_time {
        query.push_str(" AND timestamp <= ?");
        sql_params.push(Box::new(end));
    }

    append_predicates("metrics", &params.predicates, &mut query, &mut sql_params)?;

    query.push_str(" ORDER BY name ASC");
    // The LIMIT placeholder comes last, so its value must be pushed last too.
    if let Some(limit) = params.limit {
        query.push_str(" LIMIT ?");
        sql_params.push(Box::new(limit as i64));
    }

    let mut stmt = conn
        .prepare(&query)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let param_refs: Vec<&dyn rusqlite::ToSql> = sql_params.iter().map(|p| p.as_ref()).collect();
    let metrics = stmt
        .query_map(param_refs.as_slice(), parse_metric_row)
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse results: {}", e)))?;
    Ok(metrics)
}
/// Collects row counts, the overall time range and the on-disk size of the
/// database.
///
/// * Counts come from `COUNT(*)` on `logs`, `spans` and `metrics`; a failure
///   in any of them is returned as [`StorageError::QueryError`].
/// * `oldest_timestamp` is the minimum over log timestamps, span start times
///   and metric timestamps; `newest_timestamp` is the maximum over log
///   timestamps, span end times and metric timestamps. Both are best-effort:
///   any failure to read the value as an integer (for example when the
///   aggregate has nothing to aggregate) yields `None`.
/// * The size is `page_count * page_size`, falling back to `0` pages and a
///   `4096`-byte page when the corresponding pragma cannot be read.
pub fn get_stats(conn: &Connection) -> Result<StorageStats> {
    let logs = conn
        .query_row("SELECT COUNT(*) FROM logs", [], |r| r.get::<_, i64>(0))
        .map_err(|e| StorageError::QueryError(format!("Failed to count logs: {}", e)))?;
    let spans = conn
        .query_row("SELECT COUNT(*) FROM spans", [], |r| r.get::<_, i64>(0))
        .map_err(|e| StorageError::QueryError(format!("Failed to count spans: {}", e)))?;
    let metrics = conn
        .query_row("SELECT COUNT(*) FROM metrics", [], |r| r.get::<_, i64>(0))
        .map_err(|e| StorageError::QueryError(format!("Failed to count metrics: {}", e)))?;

    // Errors are deliberately folded into `None` for the two range queries.
    let oldest_timestamp = conn
        .query_row(
            "SELECT MIN(timestamp) FROM (
SELECT timestamp FROM logs
UNION ALL SELECT start_time as timestamp FROM spans
UNION ALL SELECT timestamp FROM metrics
)",
            [],
            |r| r.get::<_, i64>(0),
        )
        .ok();
    let newest_timestamp = conn
        .query_row(
            "SELECT MAX(timestamp) FROM (
SELECT timestamp FROM logs
UNION ALL SELECT end_time as timestamp FROM spans
UNION ALL SELECT timestamp FROM metrics
)",
            [],
            |r| r.get::<_, i64>(0),
        )
        .ok();

    let pages = conn
        .query_row("PRAGMA page_count", [], |r| r.get::<_, i64>(0))
        .unwrap_or(0);
    let bytes_per_page = conn
        .query_row("PRAGMA page_size", [], |r| r.get::<_, i64>(0))
        .unwrap_or(4096);

    Ok(StorageStats {
        log_count: logs as u64,
        span_count: spans as u64,
        metric_count: metrics as u64,
        oldest_timestamp,
        newest_timestamp,
        storage_size_bytes: (pages * bytes_per_page) as u64,
    })
}
/// Appends one ` AND <clause>` fragment to `query` for every structured
/// predicate, pushing the corresponding bound values onto `sql_params`.
///
/// Translation stops at the first predicate that `predicate_to_sql` rejects;
/// fragments and parameters added for earlier predicates are left in place and
/// the error is returned to the caller.
fn append_predicates(
    signal_type: &str,
    predicates: &[QueryPredicate],
    query: &mut String,
    sql_params: &mut Vec<Box<dyn rusqlite::ToSql>>,
) -> Result<()> {
    predicates.iter().try_for_each(|predicate| -> Result<()> {
        let fragment = predicate_to_sql(signal_type, predicate, sql_params)?;
        query.push_str(" AND ");
        query.push_str(&fragment);
        Ok(())
    })
}
/// Translates one structured predicate into a SQL boolean expression and
/// pushes the value(s) it binds onto `sql_params`.
///
/// * `duration` on spans compares `end_time - start_time` with a
///   [`QueryValue::Duration`]; any other value type is rejected.
/// * `contains` is a substring match implemented with `LIKE`. The searched
///   text is escaped so that `%`, `_` and `\` typed by the user match
///   literally instead of acting as wildcards.
/// * Every other combination becomes `<field> <operator> ?`.
///
/// # Errors
///
/// Returns [`StorageError::QueryError`] when the field cannot be mapped by
/// `field_to_sql`, when `duration` on spans is given a non-duration value, or
/// when `contains` is used with a non-string value.
fn predicate_to_sql(
    signal_type: &str,
    predicate: &QueryPredicate,
    sql_params: &mut Vec<Box<dyn rusqlite::ToSql>>,
) -> Result<String> {
    /// Prefixes every character that is special inside a `LIKE` pattern (and
    /// the escape character itself) with a backslash.
    fn escape_like(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            if matches!(ch, '\\' | '%' | '_') {
                escaped.push('\\');
            }
            escaped.push(ch);
        }
        escaped
    }

    let lhs = field_to_sql(signal_type, &predicate.field)?;
    let operator = sql_operator(&predicate.operator);
    match (&predicate.field[..], &predicate.operator, &predicate.value) {
        // Span duration is not a stored column; derive it from the two timestamps.
        ("duration", op, QueryValue::Duration(value)) if signal_type == "spans" => {
            sql_params.push(Box::new(*value as i64));
            Ok(format!("((end_time - start_time) {} ?)", sql_operator(op)))
        },
        ("duration", _, _) if signal_type == "spans" => Err(StorageError::QueryError(
            "Structured query field 'duration' for spans requires a duration value like 500ms"
                .to_string(),
        )),
        (_, Operator::Contains, QueryValue::String(value)) => {
            // Only the surrounding `%` are wildcards; the user text is matched verbatim.
            sql_params.push(Box::new(format!("%{}%", escape_like(value))));
            Ok(format!("{} LIKE ? ESCAPE '\\'", lhs))
        },
        (_, Operator::Contains, _) => Err(StorageError::QueryError(format!(
            "Structured query operator 'contains' for field '{}' requires a quoted string value",
            predicate.field
        ))),
        (_, _, QueryValue::String(value)) => {
            sql_params.push(Box::new(value.clone()));
            Ok(format!("{} {} ?", lhs, operator))
        },
        (_, _, QueryValue::Number(value)) => {
            sql_params.push(Box::new(*value));
            Ok(format!("{} {} ?", lhs, operator))
        },
        (_, _, QueryValue::Duration(value)) => {
            sql_params.push(Box::new(*value as i64));
            Ok(format!("{} {} ?", lhs, operator))
        },
    }
}
fn field_to_sql(signal_type: &str, field: &str) -> Result<String> {
let direct_column = match (signal_type, field) {
("logs", "timestamp") => Some("timestamp"),
("logs", "trace_id") => Some("trace_id"),
("logs", "span_id") => Some("span_id"),
("logs", "severity") | ("logs", "severity_number") => Some("severity_number"),
("logs", "body") => Some("body"),
("spans", "trace_id") => Some("trace_id"),
("spans", "span_id") => Some("span_id"),
("spans", "parent_span_id") => Some("parent_span_id"),
("spans", "name") => Some("name"),
("spans", "kind") => Some("kind"),
("spans", "start_time") => Some("start_time"),
("spans", "end_time") => Some("end_time"),
("metrics", "name") => Some("name"),
("metrics", "description") => Some("description"),
("metrics", "unit") => Some("unit"),
("metrics", "timestamp") => Some("timestamp"),
_ => None,
};
if let Some(column) = direct_column {
return Ok(column.to_string());
}
if let Some(attribute_field) = field.strip_prefix("attributes.") {
return Ok(format!(
"json_extract(attributes, '{}')",
json_path_for_key(attribute_field)
));
}
if let Some(resource_field) = field.strip_prefix("resource.") {
return Ok(format!(
"json_extract(resource, '$.attributes{}')",
json_key_accessor(resource_field)
));
}
Ok(format!(
"json_extract(attributes, '{}')",
json_path_for_key(field)
))
}
/// Builds the JSON path `$."<field>"` addressing a top-level key.
///
/// The key is wrapped in double quotes so that a dotted name is treated as one
/// label rather than as nested path segments.
fn json_path_for_key(field: &str) -> String {
    ["$.\"", field, "\""].concat()
}
/// Builds the path suffix `."<field>"`, meant to be appended to an existing
/// JSON path.
///
/// The key is wrapped in double quotes so that a dotted name is treated as one
/// label rather than as nested path segments.
fn json_key_accessor(field: &str) -> String {
    [".\"", field, "\""].concat()
}
/// Returns the SQL spelling of a structured-query operator.
///
/// `Contains` maps to `LIKE`; every other operator maps to the comparison
/// symbol of the same meaning.
fn sql_operator(operator: &Operator) -> &'static str {
    match *operator {
        // Equality
        Operator::Equal => "=",
        Operator::NotEqual => "!=",
        // Strict ordering
        Operator::LessThan => "<",
        Operator::GreaterThan => ">",
        // Inclusive ordering
        Operator::LessThanOrEqual => "<=",
        Operator::GreaterThanOrEqual => ">=",
        // Substring matching
        Operator::Contains => "LIKE",
    }
}
/// Converts one row of the `logs` table into a [`LogRecord`].
///
/// The `attributes` and `resource` columns hold JSON text. Attributes that
/// fail to deserialize fall back to the type's default value, and a resource
/// that fails to deserialize becomes `None`. A `severity_number` that
/// `SeverityLevel::from_i32` does not accept is reported as
/// `SeverityLevel::Info`.
///
/// # Errors
///
/// Returns the `rusqlite` error raised when a column is missing or holds a
/// value of an incompatible type.
fn parse_log_row(row: &Row) -> rusqlite::Result<LogRecord> {
    let raw_attributes = row.get::<_, String>("attributes")?;
    let attributes = serde_json::from_str(&raw_attributes).unwrap_or_default();

    let raw_resource = row.get::<_, String>("resource")?;
    let resource = serde_json::from_str(&raw_resource).ok();

    let severity = SeverityLevel::from_i32(row.get::<_, i32>("severity_number")?)
        .unwrap_or(SeverityLevel::Info);

    // Remaining columns map one-to-one onto record fields.
    let timestamp = row.get("timestamp")?;
    let observed_timestamp = row.get("observed_timestamp")?;
    let trace_id = row.get("trace_id")?;
    let span_id = row.get("span_id")?;
    let severity_text = row.get("severity_text")?;
    let body = row.get("body")?;

    Ok(LogRecord {
        timestamp,
        observed_timestamp,
        trace_id,
        span_id,
        severity,
        severity_text,
        body,
        attributes,
        resource,
    })
}
/// Converts one row of the `spans` table into a [`Span`].
///
/// `attributes`, `events` and `resource` hold JSON text. Attributes and events
/// that fail to deserialize fall back to their default values, and a resource
/// that fails to deserialize becomes `None`. Unrecognised `kind` and
/// `status_code` integers are reported as `SpanKind::Internal` and
/// `StatusCode::Unset` respectively.
///
/// # Errors
///
/// Returns the `rusqlite` error raised when a column is missing or holds a
/// value of an incompatible type.
fn parse_span_row(row: &Row) -> rusqlite::Result<Span> {
    let raw_attributes = row.get::<_, String>("attributes")?;
    let attributes = serde_json::from_str(&raw_attributes).unwrap_or_default();

    let raw_events = row.get::<_, String>("events")?;
    let events = serde_json::from_str(&raw_events).unwrap_or_default();

    let raw_resource = row.get::<_, String>("resource")?;
    let resource = serde_json::from_str(&raw_resource).ok();

    let kind = SpanKind::from_i32(row.get::<_, i32>("kind")?).unwrap_or(SpanKind::Internal);

    let code =
        StatusCode::from_i32(row.get::<_, i32>("status_code")?).unwrap_or(StatusCode::Unset);
    let message = row.get("status_message")?;
    let status = SpanStatus { code, message };

    // Remaining columns map one-to-one onto span fields.
    let trace_id = row.get("trace_id")?;
    let span_id = row.get("span_id")?;
    let parent_span_id = row.get("parent_span_id")?;
    let name = row.get("name")?;
    let start_time = row.get("start_time")?;
    let end_time = row.get("end_time")?;

    Ok(Span {
        trace_id,
        span_id,
        parent_span_id,
        name,
        kind,
        start_time,
        end_time,
        attributes,
        events,
        status,
        resource,
    })
}
/// Converts one row of the `metrics` table into a [`Metric`].
///
/// The integer `metric_type` column selects which value column is read:
///
/// | `metric_type` | source column     | result                                 |
/// |---------------|-------------------|----------------------------------------|
/// | `0`           | `value_double`    | `MetricType::Gauge`                    |
/// | `1`           | `value_int`       | `MetricType::Counter` (cast to `u64`)  |
/// | `2`           | `value_histogram` | `MetricType::Histogram` from JSON      |
/// | `3`           | `value_summary`   | `MetricType::Summary` from JSON        |
/// | anything else | –                 | `MetricType::Gauge(0.0)`               |
///
/// Histogram and summary JSON is read as a `(count, sum, items)` tuple and
/// falls back to `(0, 0.0, [])` when it cannot be deserialized. Attributes
/// that fail to deserialize fall back to the default value, and a resource
/// that fails to deserialize becomes `None`.
///
/// # Errors
///
/// Returns the `rusqlite` error raised when a column is missing or holds a
/// value of an incompatible type.
fn parse_metric_row(row: &Row) -> rusqlite::Result<Metric> {
    use otelite_core::telemetry::metric::MetricType;

    let raw_attributes = row.get::<_, String>("attributes")?;
    let attributes = serde_json::from_str(&raw_attributes).unwrap_or_default();

    let raw_resource = row.get::<_, String>("resource")?;
    let resource = serde_json::from_str(&raw_resource).ok();

    let metric_type = match row.get::<_, i32>("metric_type")? {
        0 => MetricType::Gauge(row.get::<_, f64>("value_double")?),
        1 => MetricType::Counter(row.get::<_, i64>("value_int")? as u64),
        2 => {
            let raw_histogram = row.get::<_, String>("value_histogram")?;
            let (count, sum, buckets) =
                serde_json::from_str(&raw_histogram).unwrap_or((0, 0.0, Vec::new()));
            MetricType::Histogram {
                count,
                sum,
                buckets,
            }
        },
        3 => {
            let raw_summary = row.get::<_, String>("value_summary")?;
            let (count, sum, quantiles) =
                serde_json::from_str(&raw_summary).unwrap_or((0, 0.0, Vec::new()));
            MetricType::Summary {
                count,
                sum,
                quantiles,
            }
        },
        // Unknown discriminants degrade to an empty gauge instead of failing the row.
        _ => MetricType::Gauge(0.0),
    };

    let name = row.get("name")?;
    let description = row.get("description")?;
    let unit = row.get("unit")?;
    let timestamp = row.get("timestamp")?;

    Ok(Metric {
        name,
        description,
        unit,
        metric_type,
        timestamp,
        attributes,
        resource,
    })
}
/// Aggregates token usage recorded on spans.
///
/// Only spans whose attributes contain `gen_ai.system` or
/// `gen_ai.provider.name` are considered, optionally restricted to spans with
/// `start_time >= start_time` and `end_time <= end_time`. Three aggregations
/// are run over that same set:
///
/// 1. overall totals (input, output, cache-creation and cache-read tokens,
///    plus the number of spans);
/// 2. totals grouped by `gen_ai.request.model`;
/// 3. totals grouped by `gen_ai.provider.name`, falling back to
///    `gen_ai.system`.
///
/// Groups whose key is `NULL` are dropped, and both breakdowns are ordered by
/// combined input + output tokens, largest first.
///
/// # Errors
///
/// Returns [`StorageError::QueryError`] when any of the three statements
/// fails to prepare, execute, or map a row.
pub fn query_token_usage(
    conn: &Connection,
    start_time: Option<i64>,
    end_time: Option<i64>,
) -> Result<(
    otelite_core::api::TokenUsageSummary,
    Vec<otelite_core::api::ModelUsage>,
    Vec<otelite_core::api::SystemUsage>,
)> {
    // Shared filter: one WHERE clause and one set of bindings serve all three queries.
    let mut filter = String::from(
        "WHERE (json_extract(attributes, '$.\"gen_ai.system\"') IS NOT NULL \
         OR json_extract(attributes, '$.\"gen_ai.provider.name\"') IS NOT NULL)",
    );
    let mut bindings: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    if let Some(lower) = start_time {
        filter.push_str(" AND start_time >= ?");
        bindings.push(Box::new(lower));
    }
    if let Some(upper) = end_time {
        filter.push_str(" AND end_time <= ?");
        bindings.push(Box::new(upper));
    }
    let bound: Vec<&dyn rusqlite::ToSql> = bindings.iter().map(|b| b.as_ref()).collect();

    // 1. Overall totals.
    let totals_sql = format!(
        "SELECT
COALESCE(SUM(CAST(json_extract(attributes, '$.\"gen_ai.usage.input_tokens\"') AS INTEGER)), 0) as total_input,
COALESCE(SUM(CAST(json_extract(attributes, '$.\"gen_ai.usage.output_tokens\"') AS INTEGER)), 0) as total_output,
COUNT(*) as total_requests,
COALESCE(SUM(CAST(json_extract(attributes, '$.\"gen_ai.usage.cache_creation.input_tokens\"') AS INTEGER)), 0) as cache_creation,
COALESCE(SUM(CAST(json_extract(attributes, '$.\"gen_ai.usage.cache_read.input_tokens\"') AS INTEGER)), 0) as cache_read
FROM spans
{}",
        filter
    );
    let summary = conn
        .query_row(&totals_sql, bound.as_slice(), |row| {
            Ok(otelite_core::api::TokenUsageSummary {
                total_input_tokens: row.get::<_, i64>(0)? as u64,
                total_output_tokens: row.get::<_, i64>(1)? as u64,
                total_requests: row.get::<_, i64>(2)? as usize,
                total_cache_creation_tokens: row.get::<_, i64>(3)? as u64,
                total_cache_read_tokens: row.get::<_, i64>(4)? as u64,
            })
        })
        .map_err(|e| StorageError::QueryError(format!("Failed to query token summary: {}", e)))?;

    // 2. Breakdown per requested model.
    let per_model_sql = format!(
        "SELECT
json_extract(attributes, '$.\"gen_ai.request.model\"') as model,
COALESCE(SUM(CAST(json_extract(attributes, '$.\"gen_ai.usage.input_tokens\"') AS INTEGER)), 0) as input_tokens,
COALESCE(SUM(CAST(json_extract(attributes, '$.\"gen_ai.usage.output_tokens\"') AS INTEGER)), 0) as output_tokens,
COUNT(*) as requests
FROM spans
{}
GROUP BY model
HAVING model IS NOT NULL
ORDER BY input_tokens + output_tokens DESC",
        filter
    );
    let mut model_stmt = conn
        .prepare(&per_model_sql)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare model query: {}", e)))?;
    let model_rows = model_stmt
        .query_map(bound.as_slice(), |row| {
            Ok(otelite_core::api::ModelUsage {
                model: row.get(0)?,
                input_tokens: row.get::<_, i64>(1)? as u64,
                output_tokens: row.get::<_, i64>(2)? as u64,
                requests: row.get::<_, i64>(3)? as usize,
            })
        })
        .map_err(|e| StorageError::QueryError(format!("Failed to execute model query: {}", e)))?;
    let by_model = model_rows
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse model results: {}", e)))?;

    // 3. Breakdown per provider / system.
    let per_system_sql = format!(
        "SELECT
COALESCE(json_extract(attributes, '$.\"gen_ai.provider.name\"'), json_extract(attributes, '$.\"gen_ai.system\"')) as system,
COALESCE(SUM(CAST(json_extract(attributes, '$.\"gen_ai.usage.input_tokens\"') AS INTEGER)), 0) as input_tokens,
COALESCE(SUM(CAST(json_extract(attributes, '$.\"gen_ai.usage.output_tokens\"') AS INTEGER)), 0) as output_tokens,
COUNT(*) as requests
FROM spans
{}
GROUP BY system
HAVING system IS NOT NULL
ORDER BY input_tokens + output_tokens DESC",
        filter
    );
    let mut system_stmt = conn
        .prepare(&per_system_sql)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare system query: {}", e)))?;
    let system_rows = system_stmt
        .query_map(bound.as_slice(), |row| {
            Ok(otelite_core::api::SystemUsage {
                system: row.get(0)?,
                input_tokens: row.get::<_, i64>(1)? as u64,
                output_tokens: row.get::<_, i64>(2)? as u64,
                requests: row.get::<_, i64>(3)? as usize,
            })
        })
        .map_err(|e| StorageError::QueryError(format!("Failed to execute system query: {}", e)))?;
    let by_system = system_rows
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| StorageError::QueryError(format!("Failed to parse system results: {}", e)))?;

    Ok((summary, by_model, by_system))
}
/// Lists distinct keys found under `$.attributes` of the `resource` JSON
/// column for the given signal (`"logs"`, `"spans"` or `"metrics"`).
///
/// At most 50 keys are returned; the query has no `ORDER BY`, so which keys
/// are kept when more exist is left to SQLite. Rows that fail to read as a
/// string are skipped rather than reported.
///
/// # Errors
///
/// Returns [`StorageError::QueryError`] for an unknown `signal` or when the
/// statement fails to prepare or execute.
pub fn distinct_resource_keys(conn: &Connection, signal: &str) -> Result<Vec<String>> {
    // The table name is interpolated into the SQL text, so only the three
    // known names are allowed through.
    if !matches!(signal, "logs" | "spans" | "metrics") {
        return Err(StorageError::QueryError(format!(
            "Unknown signal type: {}",
            signal
        )));
    }
    let table = signal;

    let sql = format!(
        "SELECT DISTINCT je.key \
         FROM {table}, json_each(json_extract({table}.resource, '$.attributes')) je \
         WHERE {table}.resource IS NOT NULL \
         AND json_extract({table}.resource, '$.attributes') IS NOT NULL \
         LIMIT 50"
    );

    let mut statement = conn
        .prepare(&sql)
        .map_err(|e| StorageError::QueryError(format!("Failed to prepare query: {}", e)))?;
    let rows = statement
        .query_map([], |row| row.get::<_, String>(0))
        .map_err(|e| StorageError::QueryError(format!("Failed to execute query: {}", e)))?;

    Ok(rows.filter_map(|key| key.ok()).collect::<Vec<String>>())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sqlite::schema;

    /// Opens an in-memory database and initializes the schema on it.
    fn setup_test_db() -> Connection {
        let conn = Connection::open_in_memory().unwrap();
        schema::initialize_schema(&conn).unwrap();
        conn
    }

    /// Querying an empty database with default parameters returns no logs.
    #[test]
    fn test_query_logs_empty() {
        let conn = setup_test_db();
        let params = QueryParams::default();
        let logs = query_logs(&conn, &params).unwrap();
        assert_eq!(logs.len(), 0);
    }

    /// All counters are zero on an empty database.
    #[test]
    fn test_get_stats_empty() {
        let conn = setup_test_db();
        let stats = get_stats(&conn).unwrap();
        assert_eq!(stats.log_count, 0);
        assert_eq!(stats.span_count, 0);
        assert_eq!(stats.metric_count, 0);
    }

    /// An unknown, unprefixed field is read from the `attributes` JSON column.
    #[test]
    fn test_field_to_sql_for_attribute_field() {
        let sql = field_to_sql("logs", "gen_ai.system").unwrap();
        assert_eq!(sql, "json_extract(attributes, '$.\"gen_ai.system\"')");
    }

    /// The `attributes.` prefix is stripped before the JSON path is built.
    #[test]
    fn test_field_to_sql_for_explicit_attribute_prefix() {
        let sql = field_to_sql("logs", "attributes.http.method").unwrap();
        assert_eq!(sql, "json_extract(attributes, '$.\"http.method\"')");
    }

    /// The `resource.` prefix addresses `$.attributes` inside `resource`.
    #[test]
    fn test_field_to_sql_for_resource_prefix() {
        let sql = field_to_sql("logs", "resource.service.name").unwrap();
        assert_eq!(
            sql,
            "json_extract(resource, '$.attributes.\"service.name\"')"
        );
    }

    /// Dotted keys are quoted so they are treated as a single label.
    #[test]
    fn test_json_key_accessor_quotes_dotted_keys() {
        assert_eq!(json_key_accessor("service.name"), ".\"service.name\"");
    }

    /// A string equality predicate on an attribute binds exactly one value.
    #[test]
    fn test_predicate_to_sql_for_attribute_equality() {
        let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
        let sql = predicate_to_sql(
            "logs",
            &QueryPredicate {
                field: "gen_ai.system".to_string(),
                operator: Operator::Equal,
                value: QueryValue::String("anthropic".to_string()),
            },
            &mut params,
        )
        .unwrap();
        assert_eq!(sql, "json_extract(attributes, '$.\"gen_ai.system\"') = ?");
        assert_eq!(params.len(), 1);
    }

    /// A string equality predicate on a resource key binds exactly one value.
    #[test]
    fn test_predicate_to_sql_for_resource_equality() {
        let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
        let sql = predicate_to_sql(
            "logs",
            &QueryPredicate {
                field: "resource.service.name".to_string(),
                operator: Operator::Equal,
                value: QueryValue::String("gateway".to_string()),
            },
            &mut params,
        )
        .unwrap();
        assert_eq!(
            sql,
            "json_extract(resource, '$.attributes.\"service.name\"') = ?"
        );
        assert_eq!(params.len(), 1);
    }

    /// `duration` on spans rejects plain numbers and asks for a duration value.
    #[test]
    fn test_span_duration_predicate_requires_duration_value() {
        let mut params = Vec::new();
        let err = predicate_to_sql(
            "spans",
            &QueryPredicate {
                field: "duration".to_string(),
                operator: Operator::GreaterThan,
                value: QueryValue::Number(100.0),
            },
            &mut params,
        )
        .unwrap_err();
        assert!(err
            .to_string()
            .contains("requires a duration value like 500ms"));
    }

    /// End-to-end check: attribute and resource predicates both match the
    /// single inserted log row.
    #[test]
    fn test_query_logs_with_structured_attribute_and_resource_predicates() {
        let conn = setup_test_db();
        conn.execute(
            "INSERT INTO logs (
timestamp, observed_timestamp, trace_id, span_id,
severity_number, severity_text, body, attributes, resource, scope
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
            rusqlite::params![
                1000_i64,
                1000_i64,
                "trace-a",
                "span-a",
                SeverityLevel::Info.to_i32(),
                "INFO",
                "matching log body",
                r#"{"gen_ai.system":"anthropic"}"#,
                r#"{"attributes":{"service.name":"gateway"}}"#,
                "{}",
            ],
        )
        .unwrap();
        let params = QueryParams {
            predicates: vec![
                QueryPredicate {
                    field: "gen_ai.system".to_string(),
                    operator: Operator::Equal,
                    value: QueryValue::String("anthropic".to_string()),
                },
                QueryPredicate {
                    field: "resource.service.name".to_string(),
                    operator: Operator::Equal,
                    value: QueryValue::String("gateway".to_string()),
                },
            ],
            ..Default::default()
        };

        // Sanity-check the raw JSON paths before going through `query_logs`.
        let attr_match: Option<String> = conn
            .query_row(
                "SELECT json_extract(attributes, '$.\"gen_ai.system\"') FROM logs WHERE timestamp = 1000",
                [],
                |row| row.get(0),
            )
            .unwrap();
        let resource_match: Option<String> = conn
            .query_row(
                "SELECT json_extract(resource, '$.attributes.\"service.name\"') FROM logs WHERE timestamp = 1000",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(attr_match.as_deref(), Some("anthropic"));
        assert_eq!(resource_match.as_deref(), Some("gateway"));

        let logs = query_logs(&conn, &params).unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "matching log body");
    }
}