Skip to main content

logdive_api/
handlers.rs

1//! Axum handlers for the three public endpoints.
2//!
3//! All handlers are async, but every one that touches the database
4//! delegates SQLite work to [`AppState::with_connection`], which runs on
5//! Tokio's blocking-task pool. The handlers themselves do parsing,
6//! parameter validation, and response shaping — nothing else.
7//!
8//! `GET /version` is the exception: it returns compile-time constants only
9//! and never opens the database.
10
11use axum::{
12    Json,
13    extract::{Query, State},
14    http::{StatusCode, header},
15    response::{IntoResponse, Response},
16};
17use serde::{Deserialize, Serialize};
18
19use logdive_core::{LogEntry, LogFormat, Stats, execute, parse_query};
20
21use crate::error::AppError;
22use crate::state::AppState;
23
24// ---------------------------------------------------------------------------
25// GET /query
26// ---------------------------------------------------------------------------
27
28/// Query-string parameters accepted by `GET /query`.
29///
30/// `q` is modeled as `Option<String>` rather than a required field so the
31/// "missing `q`" error message comes from our own `AppError::bad_request`
32/// path rather than from Axum's generic extractor rejection.
33#[derive(Debug, Deserialize)]
34pub struct QueryParams {
35    pub q: Option<String>,
36    pub limit: Option<usize>,
37}
38
39/// Default cap on result set size when `limit` is not supplied. Mirrors
40/// the CLI's `--limit` default so the two surfaces behave identically.
41const DEFAULT_LIMIT: usize = 1000;
42
43/// `GET /query?q=<expr>&limit=<n>`
44///
45/// Returns matching log entries as newline-delimited JSON, one entry per
46/// line. A missing `limit` defaults to 1000; `limit=0` means unlimited.
47pub async fn query_handler(
48    State(state): State<AppState>,
49    Query(params): Query<QueryParams>,
50) -> Result<Response, AppError> {
51    // Validate: `q` is required and non-empty (post-trim).
52    let query_str = params
53        .q
54        .map(|s| s.trim().to_string())
55        .filter(|s| !s.is_empty())
56        .ok_or_else(|| AppError::bad_request("missing or empty `q` parameter"))?;
57
58    // Apply the same "0 = unlimited" rule as the CLI.
59    let limit = match params.limit.unwrap_or(DEFAULT_LIMIT) {
60        0 => None,
61        n => Some(n),
62    };
63
64    // Parse the query up-front so parse errors are classified by the
65    // `From<LogdiveError>` impl before we touch the DB.
66    let ast = parse_query(&query_str)?;
67
68    tracing::debug!(query = %query_str, ?limit, "executing query over HTTP");
69
70    let rows: Vec<LogEntry> = state
71        .with_connection(move |indexer| execute(&ast, indexer.connection(), limit))
72        .await?;
73
74    tracing::debug!(
75        result_count = rows.len(),
76        "query returned results over HTTP"
77    );
78
79    Ok(build_ndjson_response(&rows))
80}
81
82/// Serialize a slice of entries into a newline-delimited JSON body and
83/// wrap it in a response with the correct `Content-Type`.
84///
85/// Empty result sets produce an empty body (zero bytes, zero lines),
86/// which is valid NDJSON and pipeline-friendly. Status is 200 OK in
87/// both the empty and non-empty cases — "no matches" is a successful
88/// query, not a not-found.
89fn build_ndjson_response(rows: &[LogEntry]) -> Response {
90    let mut body = String::with_capacity(rows.len() * 256);
91    for row in rows {
92        body.push_str(&entry_to_json_string(row));
93        body.push('\n');
94    }
95
96    (
97        StatusCode::OK,
98        [(header::CONTENT_TYPE, "application/x-ndjson")],
99        body,
100    )
101        .into_response()
102}
103
104/// Render a single `LogEntry` as a JSON object string.
105///
106/// The shape matches the CLI's `--format json` output so clients can treat
107/// both surfaces interchangeably: `timestamp`, `level`, `message`, `tag`,
108/// `fields`, `raw`. `tag` is `null` when absent; `fields` is always an
109/// object.
110fn entry_to_json_string(entry: &LogEntry) -> String {
111    // Using `serde_json::json!` rather than `#[derive(Serialize)]` on
112    // `LogEntry` keeps the serde annotation burden out of core. This
113    // module owns the HTTP wire shape.
114    let value = serde_json::json!({
115        "timestamp": entry.timestamp,
116        "level":     entry.level,
117        "message":   entry.message,
118        "tag":       entry.tag,
119        "fields":    serde_json::Value::Object(entry.fields.clone()),
120        "raw":       entry.raw,
121    });
122    value.to_string()
123}
124
125// ---------------------------------------------------------------------------
126// GET /stats
127// ---------------------------------------------------------------------------
128
129/// Wire shape for `GET /stats` responses.
130///
131/// Intentionally decoupled from `logdive_core::Stats` so the library stays
132/// serde-agnostic on its public output types. Renaming core fields is
133/// then a non-breaking change for HTTP clients; conversely, the HTTP
134/// shape can evolve without touching core.
135#[derive(Debug, Serialize)]
136pub struct StatsResponse {
137    pub entries: u64,
138    pub min_timestamp: Option<String>,
139    pub max_timestamp: Option<String>,
140    /// Distinct tag values. `None` (untagged) appears first when present,
141    /// then non-null tags in ascending alphabetical order — identical to
142    /// the `Stats.tags` contract from core. Clients presenting this to
143    /// humans can reshuffle (e.g. put `null` last, label it "(untagged)")
144    /// the way the CLI does.
145    pub tags: Vec<Option<String>>,
146    pub db_size_bytes: u64,
147    pub db_path: String,
148}
149
150impl StatsResponse {
151    fn from_core(stats: Stats, db_path: String, db_size_bytes: u64) -> Self {
152        Self {
153            entries: stats.entries,
154            min_timestamp: stats.min_timestamp,
155            max_timestamp: stats.max_timestamp,
156            tags: stats.tags,
157            db_size_bytes,
158            db_path,
159        }
160    }
161}
162
163/// `GET /stats`
164///
165/// Returns aggregate metadata about the index as a single JSON object.
166pub async fn stats_handler(State(state): State<AppState>) -> Result<Json<StatsResponse>, AppError> {
167    let db_path_for_response = state.db_path.display().to_string();
168    let db_path_for_closure = state.db_path.clone();
169
170    let (stats, size_bytes) = state
171        .with_connection(move |indexer| {
172            // Run both queries under the same blocking task so we don't
173            // round-trip back to the async runtime between them.
174            let stats = indexer.stats()?;
175            let size_bytes = std::fs::metadata(&db_path_for_closure)
176                .map(|m| m.len())
177                .map_err(|e| logdive_core::LogdiveError::io_at(&db_path_for_closure, e))?;
178            Ok::<_, logdive_core::LogdiveError>((stats, size_bytes))
179        })
180        .await?;
181
182    let response = StatsResponse::from_core(stats, db_path_for_response, size_bytes);
183    Ok(Json(response))
184}
185
186// ---------------------------------------------------------------------------
187// GET /version
188// ---------------------------------------------------------------------------
189
190/// Wire shape for `GET /version` responses.
191///
192/// All fields are compile-time constants — no database access is required.
193/// The primary use case is client-side feature detection: a UI or script
194/// calls `/version` on startup to discover which formats and endpoints the
195/// running server supports before making assumptions about its capabilities.
196#[derive(Debug, Serialize)]
197pub struct VersionResponse {
198    /// Semver version of the running `logdive-api` binary, injected from
199    /// `CARGO_PKG_VERSION` at compile time.
200    pub version: &'static str,
201    /// Ingest formats the binary was compiled with, sourced from
202    /// [`LogFormat::ALL`]. Stays in sync with core without manual
203    /// maintenance — adding a new format to core propagates here for free.
204    pub formats: Vec<&'static str>,
205    /// API endpoint names available on this server, sorted alphabetically.
206    pub capabilities: Vec<&'static str>,
207}
208
209/// `GET /version`
210///
211/// Returns a JSON object describing the running server's version and
212/// capabilities. Built entirely from compile-time constants; never touches
213/// the database. Always returns `200 OK`.
214pub async fn version_handler() -> Json<VersionResponse> {
215    Json(VersionResponse {
216        version: env!("CARGO_PKG_VERSION"),
217        formats: LogFormat::ALL.iter().map(|f| f.name()).collect(),
218        capabilities: vec!["query", "stats", "version"],
219    })
220}
221
222// ---------------------------------------------------------------------------
223// Tests
224// ---------------------------------------------------------------------------
225
226#[cfg(test)]
227mod tests {
228    use super::*;
229    use serde_json::Value;
230
231    fn sample_entry() -> LogEntry {
232        let raw = r#"{"timestamp":"2026-04-22T14:03:21Z","level":"error","message":"boom"}"#;
233        let mut e = LogEntry::new(raw.to_string());
234        e.timestamp = Some("2026-04-22T14:03:21Z".to_string());
235        e.level = Some("error".to_string());
236        e.message = Some("boom".to_string());
237        e
238    }
239
240    #[test]
241    fn entry_to_json_string_is_valid_json_with_expected_shape() {
242        let e = sample_entry();
243        let s = entry_to_json_string(&e);
244        let v: Value = serde_json::from_str(&s).expect("valid json");
245        assert_eq!(v["timestamp"], "2026-04-22T14:03:21Z");
246        assert_eq!(v["level"], "error");
247        assert_eq!(v["message"], "boom");
248        assert!(v["tag"].is_null());
249        assert!(v["fields"].is_object());
250        assert!(v["raw"].is_string());
251    }
252
253    #[test]
254    fn entry_to_json_string_preserves_tag_and_fields() {
255        let mut e = sample_entry();
256        e.tag = Some("api".to_string());
257        e.fields
258            .insert("service".to_string(), Value::String("payments".to_string()));
259
260        let s = entry_to_json_string(&e);
261        let v: Value = serde_json::from_str(&s).unwrap();
262        assert_eq!(v["tag"], "api");
263        assert_eq!(v["fields"]["service"], "payments");
264    }
265
266    #[test]
267    fn stats_response_from_core_round_trips_all_fields() {
268        let mut idx = logdive_core::Indexer::open_in_memory().unwrap();
269        let mut e = sample_entry();
270        e.tag = Some("api".to_string());
271        idx.insert_batch(&[e]).unwrap();
272
273        let stats = idx.stats().unwrap();
274        let resp = StatsResponse::from_core(stats, "/tmp/idx.db".to_string(), 4096);
275
276        assert_eq!(resp.entries, 1);
277        assert_eq!(resp.db_path, "/tmp/idx.db");
278        assert_eq!(resp.db_size_bytes, 4096);
279        assert_eq!(resp.tags, vec![Some("api".to_string())]);
280
281        let v = serde_json::to_value(&resp).unwrap();
282        assert_eq!(v["entries"], 1);
283        assert_eq!(v["db_path"], "/tmp/idx.db");
284        assert_eq!(v["db_size_bytes"], 4096);
285        assert_eq!(v["tags"][0], "api");
286    }
287
288    #[test]
289    fn stats_response_renders_null_for_empty_time_bounds() {
290        let idx = logdive_core::Indexer::open_in_memory().unwrap();
291        let stats = idx.stats().unwrap();
292        let resp = StatsResponse::from_core(stats, "/tmp/empty.db".to_string(), 0);
293
294        let v = serde_json::to_value(&resp).unwrap();
295        assert!(v["min_timestamp"].is_null());
296        assert!(v["max_timestamp"].is_null());
297        assert_eq!(v["tags"].as_array().unwrap().len(), 0);
298    }
299
300    #[test]
301    fn build_ndjson_response_sets_correct_content_type() {
302        let mut e = sample_entry();
303        e.tag = Some("api".to_string());
304        let resp = build_ndjson_response(&[e]);
305
306        assert_eq!(resp.status(), StatusCode::OK);
307        let content_type = resp
308            .headers()
309            .get(header::CONTENT_TYPE)
310            .and_then(|v| v.to_str().ok())
311            .unwrap_or_default();
312        assert_eq!(content_type, "application/x-ndjson");
313    }
314
315    #[test]
316    fn build_ndjson_response_is_ok_for_empty_results() {
317        let resp = build_ndjson_response(&[]);
318        assert_eq!(resp.status(), StatusCode::OK);
319    }
320
321    // ----- GET /version ------------------------------------------------
322
323    #[tokio::test]
324    async fn version_handler_returns_current_package_version() {
325        let Json(resp) = version_handler().await;
326        assert_eq!(resp.version, env!("CARGO_PKG_VERSION"));
327    }
328
329    #[tokio::test]
330    async fn version_handler_formats_match_logformat_all() {
331        let Json(resp) = version_handler().await;
332        let expected: Vec<&'static str> = LogFormat::ALL.iter().map(|f| f.name()).collect();
333        assert_eq!(resp.formats, expected);
334    }
335
336    #[tokio::test]
337    async fn version_handler_capabilities_are_sorted_and_complete() {
338        let Json(resp) = version_handler().await;
339        assert_eq!(resp.capabilities, vec!["query", "stats", "version"]);
340        // Guard: the list must stay sorted — future additions must maintain this.
341        let mut sorted = resp.capabilities.clone();
342        sorted.sort_unstable();
343        assert_eq!(
344            resp.capabilities, sorted,
345            "capabilities must be sorted alphabetically"
346        );
347    }
348
349    #[test]
350    fn version_response_serializes_to_expected_json_shape() {
351        let resp = VersionResponse {
352            version: "0.2.0",
353            formats: vec!["json", "logfmt", "plain"],
354            capabilities: vec!["query", "stats", "version"],
355        };
356        let v = serde_json::to_value(&resp).unwrap();
357        assert_eq!(v["version"], "0.2.0");
358        assert_eq!(v["formats"], serde_json::json!(["json", "logfmt", "plain"]));
359        assert_eq!(
360            v["capabilities"],
361            serde_json::json!(["query", "stats", "version"])
362        );
363    }
364}