Skip to main content

logdive_api/
handlers.rs

1//! Axum handlers for the three public endpoints.
2//!
3//! All handlers are async, but every one that touches the database
4//! delegates SQLite work to [`AppState::with_connection`], which runs on
5//! Tokio's blocking-task pool. The handlers themselves do parsing,
6//! parameter validation, and response shaping — nothing else.
7//!
8//! `GET /version` is the exception: it returns compile-time constants only
9//! and never opens the database.
10
11use axum::{
12    Json,
13    extract::{Query, State},
14    http::{StatusCode, header},
15    response::{IntoResponse, Response},
16};
17use serde::{Deserialize, Serialize};
18
19use logdive_core::{LogEntry, LogFormat, QueryOptions, Stats, execute, parse_query};
20
21use crate::error::AppError;
22use crate::state::AppState;
23
24// ---------------------------------------------------------------------------
25// GET /query
26// ---------------------------------------------------------------------------
27
28/// Query-string parameters accepted by `GET /query`.
29///
30/// `q` is modeled as `Option<String>` rather than a required field so the
31/// "missing `q`" error message comes from our own `AppError::bad_request`
32/// path rather than from Axum's generic extractor rejection.
33///
34/// `offset` is optional; absent or `0` both mean "start from the first result".
35/// Mirrors the CLI's `--offset` behaviour so both surfaces page identically.
36#[derive(Debug, Deserialize)]
37pub struct QueryParams {
38    pub q: Option<String>,
39    pub limit: Option<usize>,
40    pub offset: Option<usize>,
41}
42
/// Result-set cap applied when the request carries no `limit` parameter.
/// Kept equal to the CLI's `--limit` default so both surfaces agree.
const DEFAULT_LIMIT: usize = 1_000;
46
47/// `GET /query?q=<expr>&limit=<n>&offset=<m>`
48///
49/// Returns matching log entries as newline-delimited JSON, one entry per
50/// line. A missing `limit` defaults to 1000; `limit=0` means unlimited.
51/// A missing or zero `offset` starts from the first result.
52pub async fn query_handler(
53    State(state): State<AppState>,
54    Query(params): Query<QueryParams>,
55) -> Result<Response, AppError> {
56    // Validate: `q` is required and non-empty (post-trim).
57    let query_str = params
58        .q
59        .map(|s| s.trim().to_string())
60        .filter(|s| !s.is_empty())
61        .ok_or_else(|| AppError::bad_request("missing or empty `q` parameter"))?;
62
63    // Apply the same "0 = unlimited" and "0 = absent" rules as the CLI.
64    let limit = match params.limit.unwrap_or(DEFAULT_LIMIT) {
65        0 => None,
66        n => Some(n),
67    };
68    let offset = match params.offset.unwrap_or(0) {
69        0 => None,
70        n => Some(n),
71    };
72
73    // Parse the query up-front so parse errors are classified by the
74    // `From<LogdiveError>` impl before we touch the DB.
75    let ast = parse_query(&query_str)?;
76
77    tracing::debug!(query = %query_str, ?limit, ?offset, "executing query over HTTP");
78
79    let rows: Vec<LogEntry> = state
80        .with_connection(move |indexer| {
81            execute(&ast, indexer.connection(), QueryOptions { limit, offset })
82        })
83        .await?;
84
85    tracing::debug!(
86        result_count = rows.len(),
87        "query returned results over HTTP"
88    );
89
90    Ok(build_ndjson_response(&rows))
91}
92
93/// Serialize a slice of entries into a newline-delimited JSON body and
94/// wrap it in a response with the correct `Content-Type`.
95///
96/// Empty result sets produce an empty body (zero bytes, zero lines),
97/// which is valid NDJSON and pipeline-friendly. Status is 200 OK in
98/// both the empty and non-empty cases — "no matches" is a successful
99/// query, not a not-found.
100fn build_ndjson_response(rows: &[LogEntry]) -> Response {
101    let mut body = String::with_capacity(rows.len() * 256);
102    for row in rows {
103        body.push_str(&entry_to_json_string(row));
104        body.push('\n');
105    }
106
107    (
108        StatusCode::OK,
109        [(header::CONTENT_TYPE, "application/x-ndjson")],
110        body,
111    )
112        .into_response()
113}
114
115/// Render a single `LogEntry` as a JSON object string.
116///
117/// The shape matches the CLI's `--format json` output so clients can treat
118/// both surfaces interchangeably: `timestamp`, `level`, `message`, `tag`,
119/// `fields`, `raw`. `tag` is `null` when absent; `fields` is always an
120/// object.
121fn entry_to_json_string(entry: &LogEntry) -> String {
122    serde_json::to_string(entry).unwrap_or_else(|_| "{}".to_string())
123}
124
125// ---------------------------------------------------------------------------
126// GET /stats
127// ---------------------------------------------------------------------------
128
129/// Wire shape for `GET /stats` responses.
130///
131/// Intentionally decoupled from `logdive_core::Stats` so the library stays
132/// serde-agnostic on its public output types. Renaming core fields is
133/// then a non-breaking change for HTTP clients; conversely, the HTTP
134/// shape can evolve without touching core.
135#[derive(Debug, Serialize)]
136pub struct StatsResponse {
137    pub entries: u64,
138    pub min_timestamp: Option<String>,
139    pub max_timestamp: Option<String>,
140    /// Distinct tag values. `None` (untagged) appears first when present,
141    /// then non-null tags in ascending alphabetical order — identical to
142    /// the `Stats.tags` contract from core. Clients presenting this to
143    /// humans can reshuffle (e.g. put `null` last, label it "(untagged)")
144    /// the way the CLI does.
145    pub tags: Vec<Option<String>>,
146    pub db_size_bytes: u64,
147    pub db_path: String,
148}
149
150impl StatsResponse {
151    fn from_core(stats: Stats, db_path: String, db_size_bytes: u64) -> Self {
152        Self {
153            entries: stats.entries,
154            min_timestamp: stats.min_timestamp,
155            max_timestamp: stats.max_timestamp,
156            tags: stats.tags,
157            db_size_bytes,
158            db_path,
159        }
160    }
161}
162
163/// `GET /stats`
164///
165/// Returns aggregate metadata about the index as a single JSON object.
166pub async fn stats_handler(State(state): State<AppState>) -> Result<Json<StatsResponse>, AppError> {
167    let db_path_for_response = state.db_path.display().to_string();
168    let db_path_for_closure = state.db_path.clone();
169
170    let (stats, size_bytes) = state
171        .with_connection(move |indexer| {
172            // Run both queries under the same blocking task so we don't
173            // round-trip back to the async runtime between them.
174            let stats = indexer.stats()?;
175            let size_bytes = std::fs::metadata(&db_path_for_closure)
176                .map(|m| m.len())
177                .map_err(|e| logdive_core::LogdiveError::io_at(&db_path_for_closure, e))?;
178            Ok::<_, logdive_core::LogdiveError>((stats, size_bytes))
179        })
180        .await?;
181
182    let response = StatsResponse::from_core(stats, db_path_for_response, size_bytes);
183    Ok(Json(response))
184}
185
186// ---------------------------------------------------------------------------
187// GET /version
188// ---------------------------------------------------------------------------
189
190/// Wire shape for `GET /version` responses.
191///
192/// All fields are compile-time constants — no database access is required.
193/// The primary use case is client-side feature detection: a UI or script
194/// calls `/version` on startup to discover which formats and endpoints the
195/// running server supports before making assumptions about its capabilities.
196#[derive(Debug, Serialize)]
197pub struct VersionResponse {
198    /// Semver version of the running `logdive-api` binary, injected from
199    /// `CARGO_PKG_VERSION` at compile time.
200    pub version: &'static str,
201    /// Ingest formats the binary was compiled with, sourced from
202    /// [`LogFormat::ALL`]. Stays in sync with core without manual
203    /// maintenance — adding a new format to core propagates here for free.
204    pub formats: Vec<&'static str>,
205    /// API endpoint names available on this server, sorted alphabetically.
206    pub capabilities: Vec<&'static str>,
207}
208
209/// `GET /version`
210///
211/// Returns a JSON object describing the running server's version and
212/// capabilities. Built entirely from compile-time constants; never touches
213/// the database. Always returns `200 OK`.
214pub async fn version_handler() -> Json<VersionResponse> {
215    Json(VersionResponse {
216        version: env!("CARGO_PKG_VERSION"),
217        formats: LogFormat::ALL.iter().map(|f| f.name()).collect(),
218        capabilities: vec!["query", "stats", "version"],
219    })
220}
221
222// ---------------------------------------------------------------------------
223// Tests
224// ---------------------------------------------------------------------------
225
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;

    /// Untagged, error-level fixture entry with no extra fields.
    fn sample_entry() -> LogEntry {
        let mut entry = LogEntry::new(
            r#"{"timestamp":"2026-04-22T14:03:21Z","level":"error","message":"boom"}"#.to_string(),
        );
        entry.timestamp = Some("2026-04-22T14:03:21Z".to_string());
        entry.level = Some("error".to_string());
        entry.message = Some("boom".to_string());
        entry
    }

    /// The fixture entry, tagged `api`.
    fn tagged_entry() -> LogEntry {
        let mut entry = sample_entry();
        entry.tag = Some("api".to_string());
        entry
    }

    /// Serialize through the handler helper, then parse the output back.
    fn render(entry: &LogEntry) -> Value {
        serde_json::from_str(&entry_to_json_string(entry)).expect("valid json")
    }

    // ----- entry serialization -----------------------------------------

    #[test]
    fn entry_to_json_string_is_valid_json_with_expected_shape() {
        let json = render(&sample_entry());

        assert_eq!(json["timestamp"], "2026-04-22T14:03:21Z");
        assert_eq!(json["level"], "error");
        assert_eq!(json["message"], "boom");
        assert!(json["tag"].is_null());
        assert!(json["fields"].is_object());
        assert!(json["raw"].is_string());
    }

    #[test]
    fn entry_to_json_string_preserves_tag_and_fields() {
        let mut entry = tagged_entry();
        entry
            .fields
            .insert("service".to_string(), Value::from("payments"));

        let json = render(&entry);
        assert_eq!(json["tag"], "api");
        assert_eq!(json["fields"]["service"], "payments");
    }

    // ----- GET /stats --------------------------------------------------

    #[test]
    fn stats_response_from_core_round_trips_all_fields() {
        let mut indexer = logdive_core::Indexer::open_in_memory().unwrap();
        indexer.insert_batch(&[tagged_entry()]).unwrap();

        let response =
            StatsResponse::from_core(indexer.stats().unwrap(), "/tmp/idx.db".to_string(), 4096);

        // Struct-level view.
        assert_eq!(response.entries, 1);
        assert_eq!(response.db_path, "/tmp/idx.db");
        assert_eq!(response.db_size_bytes, 4096);
        assert_eq!(response.tags, vec![Some("api".to_string())]);

        // Wire-level view.
        let json = serde_json::to_value(&response).unwrap();
        assert_eq!(json["entries"], 1);
        assert_eq!(json["db_path"], "/tmp/idx.db");
        assert_eq!(json["db_size_bytes"], 4096);
        assert_eq!(json["tags"][0], "api");
    }

    #[test]
    fn stats_response_renders_null_for_empty_time_bounds() {
        let indexer = logdive_core::Indexer::open_in_memory().unwrap();
        let response =
            StatsResponse::from_core(indexer.stats().unwrap(), "/tmp/empty.db".to_string(), 0);

        let json = serde_json::to_value(&response).unwrap();
        assert!(json["min_timestamp"].is_null());
        assert!(json["max_timestamp"].is_null());
        assert!(json["tags"].as_array().unwrap().is_empty());
    }

    // ----- NDJSON response ---------------------------------------------

    #[test]
    fn build_ndjson_response_sets_correct_content_type() {
        let response = build_ndjson_response(&[tagged_entry()]);

        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(
            response
                .headers()
                .get(header::CONTENT_TYPE)
                .and_then(|value| value.to_str().ok()),
            Some("application/x-ndjson")
        );
    }

    #[test]
    fn build_ndjson_response_is_ok_for_empty_results() {
        assert_eq!(build_ndjson_response(&[]).status(), StatusCode::OK);
    }

    // ----- GET /version ------------------------------------------------

    #[tokio::test]
    async fn version_handler_returns_current_package_version() {
        let Json(body) = version_handler().await;
        assert_eq!(body.version, env!("CARGO_PKG_VERSION"));
    }

    #[tokio::test]
    async fn version_handler_formats_match_logformat_all() {
        let Json(body) = version_handler().await;
        let expected: Vec<&'static str> = LogFormat::ALL.iter().map(|f| f.name()).collect();
        assert_eq!(body.formats, expected);
    }

    #[tokio::test]
    async fn version_handler_capabilities_are_sorted_and_complete() {
        let Json(body) = version_handler().await;
        assert_eq!(body.capabilities, vec!["query", "stats", "version"]);
        // Guard for future additions: each name must not sort before its
        // predecessor, i.e. the list stays alphabetical.
        assert!(
            body.capabilities.windows(2).all(|pair| pair[0] <= pair[1]),
            "capabilities must be sorted alphabetically"
        );
    }

    #[test]
    fn version_response_serializes_to_expected_json_shape() {
        let body = VersionResponse {
            version: "0.2.0",
            formats: vec!["json", "logfmt", "plain"],
            capabilities: vec!["query", "stats", "version"],
        };

        let json = serde_json::to_value(&body).unwrap();
        assert_eq!(json["version"], "0.2.0");
        assert_eq!(
            json["formats"],
            serde_json::json!(["json", "logfmt", "plain"])
        );
        assert_eq!(
            json["capabilities"],
            serde_json::json!(["query", "stats", "version"])
        );
    }
}