1use axum::{
12 Json,
13 extract::{Query, State},
14 http::{StatusCode, header},
15 response::{IntoResponse, Response},
16};
17use serde::{Deserialize, Serialize};
18
19use logdive_core::{LogEntry, LogFormat, QueryOptions, Stats, execute, parse_query};
20
21use crate::error::AppError;
22use crate::state::AppState;
23
/// Query-string parameters accepted by [`query_handler`].
///
/// Every field is optional at the deserialization layer; the handler decides
/// how absent values are treated.
#[derive(Debug, Deserialize)]
pub struct QueryParams {
    /// Query expression text. The handler trims it and rejects a missing or
    /// blank value.
    pub q: Option<String>,
    /// Requested row limit. The handler substitutes `DEFAULT_LIMIT` when this
    /// is absent and maps `0` to `None` before building `QueryOptions`
    /// (presumably "no limit" — confirm against `logdive_core`).
    pub limit: Option<usize>,
    /// Requested row offset. The handler maps both an absent value and `0`
    /// to `None`.
    pub offset: Option<usize>,
}
42
/// Row limit used by `query_handler` when the request does not supply `limit`.
const DEFAULT_LIMIT: usize = 1000;
46
47pub async fn query_handler(
53 State(state): State<AppState>,
54 Query(params): Query<QueryParams>,
55) -> Result<Response, AppError> {
56 let query_str = params
58 .q
59 .map(|s| s.trim().to_string())
60 .filter(|s| !s.is_empty())
61 .ok_or_else(|| AppError::bad_request("missing or empty `q` parameter"))?;
62
63 let limit = match params.limit.unwrap_or(DEFAULT_LIMIT) {
65 0 => None,
66 n => Some(n),
67 };
68 let offset = match params.offset.unwrap_or(0) {
69 0 => None,
70 n => Some(n),
71 };
72
73 let ast = parse_query(&query_str)?;
76
77 tracing::debug!(query = %query_str, ?limit, ?offset, "executing query over HTTP");
78
79 let rows: Vec<LogEntry> = state
80 .with_connection(move |indexer| {
81 execute(&ast, indexer.connection(), QueryOptions { limit, offset })
82 })
83 .await?;
84
85 tracing::debug!(
86 result_count = rows.len(),
87 "query returned results over HTTP"
88 );
89
90 Ok(build_ndjson_response(&rows))
91}
92
93fn build_ndjson_response(rows: &[LogEntry]) -> Response {
101 let mut body = String::with_capacity(rows.len() * 256);
102 for row in rows {
103 body.push_str(&entry_to_json_string(row));
104 body.push('\n');
105 }
106
107 (
108 StatusCode::OK,
109 [(header::CONTENT_TYPE, "application/x-ndjson")],
110 body,
111 )
112 .into_response()
113}
114
115fn entry_to_json_string(entry: &LogEntry) -> String {
122 serde_json::to_string(entry).unwrap_or_else(|_| "{}".to_string())
123}
124
/// JSON body returned by [`stats_handler`].
///
/// The first four fields are copied from `logdive_core::Stats`; the last two
/// describe the database file and are supplied by the handler.
#[derive(Debug, Serialize)]
pub struct StatsResponse {
    /// Copied from `Stats::entries`.
    pub entries: u64,
    /// Copied from `Stats::min_timestamp`; serializes as `null` when `None`.
    pub min_timestamp: Option<String>,
    /// Copied from `Stats::max_timestamp`; serializes as `null` when `None`.
    pub max_timestamp: Option<String>,
    /// Copied from `Stats::tags`. A `None` element presumably stands for
    /// untagged entries — confirm against `logdive_core`.
    pub tags: Vec<Option<String>>,
    /// Length in bytes of the database file, as reported by
    /// `std::fs::metadata` in the handler.
    pub db_size_bytes: u64,
    /// Display form of the database path held in the application state.
    pub db_path: String,
}
149
150impl StatsResponse {
151 fn from_core(stats: Stats, db_path: String, db_size_bytes: u64) -> Self {
152 Self {
153 entries: stats.entries,
154 min_timestamp: stats.min_timestamp,
155 max_timestamp: stats.max_timestamp,
156 tags: stats.tags,
157 db_size_bytes,
158 db_path,
159 }
160 }
161}
162
163pub async fn stats_handler(State(state): State<AppState>) -> Result<Json<StatsResponse>, AppError> {
167 let db_path_for_response = state.db_path.display().to_string();
168 let db_path_for_closure = state.db_path.clone();
169
170 let (stats, size_bytes) = state
171 .with_connection(move |indexer| {
172 let stats = indexer.stats()?;
175 let size_bytes = std::fs::metadata(&db_path_for_closure)
176 .map(|m| m.len())
177 .map_err(|e| logdive_core::LogdiveError::io_at(&db_path_for_closure, e))?;
178 Ok::<_, logdive_core::LogdiveError>((stats, size_bytes))
179 })
180 .await?;
181
182 let response = StatsResponse::from_core(stats, db_path_for_response, size_bytes);
183 Ok(Json(response))
184}
185
/// JSON body returned by [`version_handler`].
#[derive(Debug, Serialize)]
pub struct VersionResponse {
    /// Package version; the handler fills this from `CARGO_PKG_VERSION`.
    pub version: &'static str,
    /// Log format names; the handler fills this from `LogFormat::ALL`.
    pub formats: Vec<&'static str>,
    /// Capability names advertised by the server; the handler supplies a
    /// fixed list.
    pub capabilities: Vec<&'static str>,
}
208
209pub async fn version_handler() -> Json<VersionResponse> {
215 Json(VersionResponse {
216 version: env!("CARGO_PKG_VERSION"),
217 formats: LogFormat::ALL.iter().map(|f| f.name()).collect(),
218 capabilities: vec!["query", "stats", "version"],
219 })
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;

    /// Timestamp shared by the fixture and the assertions that check it.
    const SAMPLE_TS: &str = "2026-04-22T14:03:21Z";

    /// Builds an untagged error-level entry with a timestamp and message set.
    fn sample_entry() -> LogEntry {
        let raw = r#"{"timestamp":"2026-04-22T14:03:21Z","level":"error","message":"boom"}"#;
        let mut entry = LogEntry::new(raw.to_string());
        entry.timestamp = Some(String::from(SAMPLE_TS));
        entry.level = Some(String::from("error"));
        entry.message = Some(String::from("boom"));
        entry
    }

    /// Same as [`sample_entry`], but carrying the given tag.
    fn tagged_entry(tag: &str) -> LogEntry {
        let mut entry = sample_entry();
        entry.tag = Some(tag.to_string());
        entry
    }

    #[test]
    fn entry_to_json_string_is_valid_json_with_expected_shape() {
        let json = entry_to_json_string(&sample_entry());
        let parsed: Value = serde_json::from_str(&json).expect("valid json");

        assert_eq!(parsed["timestamp"], SAMPLE_TS);
        assert_eq!(parsed["level"], "error");
        assert_eq!(parsed["message"], "boom");
        assert!(parsed["tag"].is_null());
        assert!(parsed["fields"].is_object());
        assert!(parsed["raw"].is_string());
    }

    #[test]
    fn entry_to_json_string_preserves_tag_and_fields() {
        let mut entry = tagged_entry("api");
        entry
            .fields
            .insert(String::from("service"), Value::from("payments"));

        let json = entry_to_json_string(&entry);
        let parsed: Value = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed["tag"], "api");
        assert_eq!(parsed["fields"]["service"], "payments");
    }

    #[test]
    fn stats_response_from_core_round_trips_all_fields() {
        let mut indexer = logdive_core::Indexer::open_in_memory().unwrap();
        indexer.insert_batch(&[tagged_entry("api")]).unwrap();

        let core_stats = indexer.stats().unwrap();
        let response = StatsResponse::from_core(core_stats, String::from("/tmp/idx.db"), 4096);

        // Struct-level view.
        assert_eq!(response.entries, 1);
        assert_eq!(response.db_path, "/tmp/idx.db");
        assert_eq!(response.db_size_bytes, 4096);
        assert_eq!(response.tags, vec![Some(String::from("api"))]);

        // Serialized view.
        let json = serde_json::to_value(&response).unwrap();
        assert_eq!(json["entries"], 1);
        assert_eq!(json["db_path"], "/tmp/idx.db");
        assert_eq!(json["db_size_bytes"], 4096);
        assert_eq!(json["tags"][0], "api");
    }

    #[test]
    fn stats_response_renders_null_for_empty_time_bounds() {
        let indexer = logdive_core::Indexer::open_in_memory().unwrap();
        let response =
            StatsResponse::from_core(indexer.stats().unwrap(), String::from("/tmp/empty.db"), 0);

        let json = serde_json::to_value(&response).unwrap();
        assert!(json["min_timestamp"].is_null());
        assert!(json["max_timestamp"].is_null());
        assert_eq!(json["tags"].as_array().unwrap().len(), 0);
    }

    #[test]
    fn build_ndjson_response_sets_correct_content_type() {
        let response = build_ndjson_response(&[tagged_entry("api")]);

        assert_eq!(response.status(), StatusCode::OK);
        // A missing or non-UTF-8 header collapses to "" so the assertion
        // below reports a plain mismatch instead of panicking earlier.
        let content_type = match response.headers().get(header::CONTENT_TYPE) {
            Some(value) => value.to_str().unwrap_or_default(),
            None => "",
        };
        assert_eq!(content_type, "application/x-ndjson");
    }

    #[test]
    fn build_ndjson_response_is_ok_for_empty_results() {
        let response = build_ndjson_response(&[]);
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn version_handler_returns_current_package_version() {
        let Json(body) = version_handler().await;
        assert_eq!(body.version, env!("CARGO_PKG_VERSION"));
    }

    #[tokio::test]
    async fn version_handler_formats_match_logformat_all() {
        let Json(body) = version_handler().await;
        let expected: Vec<&'static str> =
            LogFormat::ALL.iter().map(|format| format.name()).collect();
        assert_eq!(body.formats, expected);
    }

    #[tokio::test]
    async fn version_handler_capabilities_are_sorted_and_complete() {
        let Json(body) = version_handler().await;
        assert_eq!(body.capabilities, vec!["query", "stats", "version"]);

        let mut alphabetical = body.capabilities.clone();
        alphabetical.sort_unstable();
        assert_eq!(
            body.capabilities, alphabetical,
            "capabilities must be sorted alphabetically"
        );
    }

    #[test]
    fn version_response_serializes_to_expected_json_shape() {
        let body = VersionResponse {
            version: "0.2.0",
            formats: vec!["json", "logfmt", "plain"],
            capabilities: vec!["query", "stats", "version"],
        };

        let json = serde_json::to_value(&body).unwrap();
        assert_eq!(json["version"], "0.2.0");
        assert_eq!(
            json["formats"],
            serde_json::json!(["json", "logfmt", "plain"])
        );
        assert_eq!(
            json["capabilities"],
            serde_json::json!(["query", "stats", "version"])
        );
    }
}