1use axum::{
12 Json,
13 extract::{Query, State},
14 http::{StatusCode, header},
15 response::{IntoResponse, Response},
16};
17use serde::{Deserialize, Serialize};
18
19use logdive_core::{LogEntry, LogFormat, Stats, execute, parse_query};
20
21use crate::error::AppError;
22use crate::state::AppState;
23
24#[derive(Debug, Deserialize)]
34pub struct QueryParams {
35 pub q: Option<String>,
36 pub limit: Option<usize>,
37}
38
39const DEFAULT_LIMIT: usize = 1000;
42
43pub async fn query_handler(
48 State(state): State<AppState>,
49 Query(params): Query<QueryParams>,
50) -> Result<Response, AppError> {
51 let query_str = params
53 .q
54 .map(|s| s.trim().to_string())
55 .filter(|s| !s.is_empty())
56 .ok_or_else(|| AppError::bad_request("missing or empty `q` parameter"))?;
57
58 let limit = match params.limit.unwrap_or(DEFAULT_LIMIT) {
60 0 => None,
61 n => Some(n),
62 };
63
64 let ast = parse_query(&query_str)?;
67
68 tracing::debug!(query = %query_str, ?limit, "executing query over HTTP");
69
70 let rows: Vec<LogEntry> = state
71 .with_connection(move |indexer| execute(&ast, indexer.connection(), limit))
72 .await?;
73
74 tracing::debug!(
75 result_count = rows.len(),
76 "query returned results over HTTP"
77 );
78
79 Ok(build_ndjson_response(&rows))
80}
81
82fn build_ndjson_response(rows: &[LogEntry]) -> Response {
90 let mut body = String::with_capacity(rows.len() * 256);
91 for row in rows {
92 body.push_str(&entry_to_json_string(row));
93 body.push('\n');
94 }
95
96 (
97 StatusCode::OK,
98 [(header::CONTENT_TYPE, "application/x-ndjson")],
99 body,
100 )
101 .into_response()
102}
103
104fn entry_to_json_string(entry: &LogEntry) -> String {
111 serde_json::to_string(entry).unwrap_or_else(|_| "{}".to_string())
112}
113
114#[derive(Debug, Serialize)]
125pub struct StatsResponse {
126 pub entries: u64,
127 pub min_timestamp: Option<String>,
128 pub max_timestamp: Option<String>,
129 pub tags: Vec<Option<String>>,
135 pub db_size_bytes: u64,
136 pub db_path: String,
137}
138
139impl StatsResponse {
140 fn from_core(stats: Stats, db_path: String, db_size_bytes: u64) -> Self {
141 Self {
142 entries: stats.entries,
143 min_timestamp: stats.min_timestamp,
144 max_timestamp: stats.max_timestamp,
145 tags: stats.tags,
146 db_size_bytes,
147 db_path,
148 }
149 }
150}
151
152pub async fn stats_handler(State(state): State<AppState>) -> Result<Json<StatsResponse>, AppError> {
156 let db_path_for_response = state.db_path.display().to_string();
157 let db_path_for_closure = state.db_path.clone();
158
159 let (stats, size_bytes) = state
160 .with_connection(move |indexer| {
161 let stats = indexer.stats()?;
164 let size_bytes = std::fs::metadata(&db_path_for_closure)
165 .map(|m| m.len())
166 .map_err(|e| logdive_core::LogdiveError::io_at(&db_path_for_closure, e))?;
167 Ok::<_, logdive_core::LogdiveError>((stats, size_bytes))
168 })
169 .await?;
170
171 let response = StatsResponse::from_core(stats, db_path_for_response, size_bytes);
172 Ok(Json(response))
173}
174
175#[derive(Debug, Serialize)]
186pub struct VersionResponse {
187 pub version: &'static str,
190 pub formats: Vec<&'static str>,
194 pub capabilities: Vec<&'static str>,
196}
197
198pub async fn version_handler() -> Json<VersionResponse> {
204 Json(VersionResponse {
205 version: env!("CARGO_PKG_VERSION"),
206 formats: LogFormat::ALL.iter().map(|f| f.name()).collect(),
207 capabilities: vec!["query", "stats", "version"],
208 })
209}
210
211#[cfg(test)]
216mod tests {
217 use super::*;
218 use serde_json::Value;
219
220 fn sample_entry() -> LogEntry {
221 let raw = r#"{"timestamp":"2026-04-22T14:03:21Z","level":"error","message":"boom"}"#;
222 let mut e = LogEntry::new(raw.to_string());
223 e.timestamp = Some("2026-04-22T14:03:21Z".to_string());
224 e.level = Some("error".to_string());
225 e.message = Some("boom".to_string());
226 e
227 }
228
229 #[test]
230 fn entry_to_json_string_is_valid_json_with_expected_shape() {
231 let e = sample_entry();
232 let s = entry_to_json_string(&e);
233 let v: Value = serde_json::from_str(&s).expect("valid json");
234 assert_eq!(v["timestamp"], "2026-04-22T14:03:21Z");
235 assert_eq!(v["level"], "error");
236 assert_eq!(v["message"], "boom");
237 assert!(v["tag"].is_null());
238 assert!(v["fields"].is_object());
239 assert!(v["raw"].is_string());
240 }
241
242 #[test]
243 fn entry_to_json_string_preserves_tag_and_fields() {
244 let mut e = sample_entry();
245 e.tag = Some("api".to_string());
246 e.fields
247 .insert("service".to_string(), Value::String("payments".to_string()));
248
249 let s = entry_to_json_string(&e);
250 let v: Value = serde_json::from_str(&s).unwrap();
251 assert_eq!(v["tag"], "api");
252 assert_eq!(v["fields"]["service"], "payments");
253 }
254
255 #[test]
256 fn stats_response_from_core_round_trips_all_fields() {
257 let mut idx = logdive_core::Indexer::open_in_memory().unwrap();
258 let mut e = sample_entry();
259 e.tag = Some("api".to_string());
260 idx.insert_batch(&[e]).unwrap();
261
262 let stats = idx.stats().unwrap();
263 let resp = StatsResponse::from_core(stats, "/tmp/idx.db".to_string(), 4096);
264
265 assert_eq!(resp.entries, 1);
266 assert_eq!(resp.db_path, "/tmp/idx.db");
267 assert_eq!(resp.db_size_bytes, 4096);
268 assert_eq!(resp.tags, vec![Some("api".to_string())]);
269
270 let v = serde_json::to_value(&resp).unwrap();
271 assert_eq!(v["entries"], 1);
272 assert_eq!(v["db_path"], "/tmp/idx.db");
273 assert_eq!(v["db_size_bytes"], 4096);
274 assert_eq!(v["tags"][0], "api");
275 }
276
277 #[test]
278 fn stats_response_renders_null_for_empty_time_bounds() {
279 let idx = logdive_core::Indexer::open_in_memory().unwrap();
280 let stats = idx.stats().unwrap();
281 let resp = StatsResponse::from_core(stats, "/tmp/empty.db".to_string(), 0);
282
283 let v = serde_json::to_value(&resp).unwrap();
284 assert!(v["min_timestamp"].is_null());
285 assert!(v["max_timestamp"].is_null());
286 assert_eq!(v["tags"].as_array().unwrap().len(), 0);
287 }
288
289 #[test]
290 fn build_ndjson_response_sets_correct_content_type() {
291 let mut e = sample_entry();
292 e.tag = Some("api".to_string());
293 let resp = build_ndjson_response(&[e]);
294
295 assert_eq!(resp.status(), StatusCode::OK);
296 let content_type = resp
297 .headers()
298 .get(header::CONTENT_TYPE)
299 .and_then(|v| v.to_str().ok())
300 .unwrap_or_default();
301 assert_eq!(content_type, "application/x-ndjson");
302 }
303
304 #[test]
305 fn build_ndjson_response_is_ok_for_empty_results() {
306 let resp = build_ndjson_response(&[]);
307 assert_eq!(resp.status(), StatusCode::OK);
308 }
309
310 #[tokio::test]
313 async fn version_handler_returns_current_package_version() {
314 let Json(resp) = version_handler().await;
315 assert_eq!(resp.version, env!("CARGO_PKG_VERSION"));
316 }
317
318 #[tokio::test]
319 async fn version_handler_formats_match_logformat_all() {
320 let Json(resp) = version_handler().await;
321 let expected: Vec<&'static str> = LogFormat::ALL.iter().map(|f| f.name()).collect();
322 assert_eq!(resp.formats, expected);
323 }
324
325 #[tokio::test]
326 async fn version_handler_capabilities_are_sorted_and_complete() {
327 let Json(resp) = version_handler().await;
328 assert_eq!(resp.capabilities, vec!["query", "stats", "version"]);
329 let mut sorted = resp.capabilities.clone();
331 sorted.sort_unstable();
332 assert_eq!(
333 resp.capabilities, sorted,
334 "capabilities must be sorted alphabetically"
335 );
336 }
337
338 #[test]
339 fn version_response_serializes_to_expected_json_shape() {
340 let resp = VersionResponse {
341 version: "0.2.0",
342 formats: vec!["json", "logfmt", "plain"],
343 capabilities: vec!["query", "stats", "version"],
344 };
345 let v = serde_json::to_value(&resp).unwrap();
346 assert_eq!(v["version"], "0.2.0");
347 assert_eq!(v["formats"], serde_json::json!(["json", "logfmt", "plain"]));
348 assert_eq!(
349 v["capabilities"],
350 serde_json::json!(["query", "stats", "version"])
351 );
352 }
353}