Skip to main content

logdive_api/
handlers.rs

1//! Axum handlers for the three public endpoints.
2//!
3//! All handlers are async, but every one that touches the database
4//! delegates SQLite work to [`AppState::with_connection`], which runs on
5//! Tokio's blocking-task pool. The handlers themselves do parsing,
6//! parameter validation, and response shaping — nothing else.
7//!
8//! `GET /version` is the exception: it returns compile-time constants only
9//! and never opens the database.
10
11use axum::{
12    Json,
13    extract::{Query, State},
14    http::{StatusCode, header},
15    response::{IntoResponse, Response},
16};
17use serde::{Deserialize, Serialize};
18
19use logdive_core::{LogEntry, LogFormat, Stats, execute, parse_query};
20
21use crate::error::AppError;
22use crate::state::AppState;
23
24// ---------------------------------------------------------------------------
25// GET /query
26// ---------------------------------------------------------------------------
27
28/// Query-string parameters accepted by `GET /query`.
29///
30/// `q` is modeled as `Option<String>` rather than a required field so the
31/// "missing `q`" error message comes from our own `AppError::bad_request`
32/// path rather than from Axum's generic extractor rejection.
33#[derive(Debug, Deserialize)]
34pub struct QueryParams {
35    pub q: Option<String>,
36    pub limit: Option<usize>,
37}
38
39/// Default cap on result set size when `limit` is not supplied. Mirrors
40/// the CLI's `--limit` default so the two surfaces behave identically.
41const DEFAULT_LIMIT: usize = 1000;
42
43/// `GET /query?q=<expr>&limit=<n>`
44///
45/// Returns matching log entries as newline-delimited JSON, one entry per
46/// line. A missing `limit` defaults to 1000; `limit=0` means unlimited.
47pub async fn query_handler(
48    State(state): State<AppState>,
49    Query(params): Query<QueryParams>,
50) -> Result<Response, AppError> {
51    // Validate: `q` is required and non-empty (post-trim).
52    let query_str = params
53        .q
54        .map(|s| s.trim().to_string())
55        .filter(|s| !s.is_empty())
56        .ok_or_else(|| AppError::bad_request("missing or empty `q` parameter"))?;
57
58    // Apply the same "0 = unlimited" rule as the CLI.
59    let limit = match params.limit.unwrap_or(DEFAULT_LIMIT) {
60        0 => None,
61        n => Some(n),
62    };
63
64    // Parse the query up-front so parse errors are classified by the
65    // `From<LogdiveError>` impl before we touch the DB.
66    let ast = parse_query(&query_str)?;
67
68    tracing::debug!(query = %query_str, ?limit, "executing query over HTTP");
69
70    let rows: Vec<LogEntry> = state
71        .with_connection(move |indexer| execute(&ast, indexer.connection(), limit))
72        .await?;
73
74    tracing::debug!(
75        result_count = rows.len(),
76        "query returned results over HTTP"
77    );
78
79    Ok(build_ndjson_response(&rows))
80}
81
82/// Serialize a slice of entries into a newline-delimited JSON body and
83/// wrap it in a response with the correct `Content-Type`.
84///
85/// Empty result sets produce an empty body (zero bytes, zero lines),
86/// which is valid NDJSON and pipeline-friendly. Status is 200 OK in
87/// both the empty and non-empty cases — "no matches" is a successful
88/// query, not a not-found.
89fn build_ndjson_response(rows: &[LogEntry]) -> Response {
90    let mut body = String::with_capacity(rows.len() * 256);
91    for row in rows {
92        body.push_str(&entry_to_json_string(row));
93        body.push('\n');
94    }
95
96    (
97        StatusCode::OK,
98        [(header::CONTENT_TYPE, "application/x-ndjson")],
99        body,
100    )
101        .into_response()
102}
103
104/// Render a single `LogEntry` as a JSON object string.
105///
106/// The shape matches the CLI's `--format json` output so clients can treat
107/// both surfaces interchangeably: `timestamp`, `level`, `message`, `tag`,
108/// `fields`, `raw`. `tag` is `null` when absent; `fields` is always an
109/// object.
110fn entry_to_json_string(entry: &LogEntry) -> String {
111    serde_json::to_string(entry).unwrap_or_else(|_| "{}".to_string())
112}
113
114// ---------------------------------------------------------------------------
115// GET /stats
116// ---------------------------------------------------------------------------
117
118/// Wire shape for `GET /stats` responses.
119///
120/// Intentionally decoupled from `logdive_core::Stats` so the library stays
121/// serde-agnostic on its public output types. Renaming core fields is
122/// then a non-breaking change for HTTP clients; conversely, the HTTP
123/// shape can evolve without touching core.
124#[derive(Debug, Serialize)]
125pub struct StatsResponse {
126    pub entries: u64,
127    pub min_timestamp: Option<String>,
128    pub max_timestamp: Option<String>,
129    /// Distinct tag values. `None` (untagged) appears first when present,
130    /// then non-null tags in ascending alphabetical order — identical to
131    /// the `Stats.tags` contract from core. Clients presenting this to
132    /// humans can reshuffle (e.g. put `null` last, label it "(untagged)")
133    /// the way the CLI does.
134    pub tags: Vec<Option<String>>,
135    pub db_size_bytes: u64,
136    pub db_path: String,
137}
138
139impl StatsResponse {
140    fn from_core(stats: Stats, db_path: String, db_size_bytes: u64) -> Self {
141        Self {
142            entries: stats.entries,
143            min_timestamp: stats.min_timestamp,
144            max_timestamp: stats.max_timestamp,
145            tags: stats.tags,
146            db_size_bytes,
147            db_path,
148        }
149    }
150}
151
152/// `GET /stats`
153///
154/// Returns aggregate metadata about the index as a single JSON object.
155pub async fn stats_handler(State(state): State<AppState>) -> Result<Json<StatsResponse>, AppError> {
156    let db_path_for_response = state.db_path.display().to_string();
157    let db_path_for_closure = state.db_path.clone();
158
159    let (stats, size_bytes) = state
160        .with_connection(move |indexer| {
161            // Run both queries under the same blocking task so we don't
162            // round-trip back to the async runtime between them.
163            let stats = indexer.stats()?;
164            let size_bytes = std::fs::metadata(&db_path_for_closure)
165                .map(|m| m.len())
166                .map_err(|e| logdive_core::LogdiveError::io_at(&db_path_for_closure, e))?;
167            Ok::<_, logdive_core::LogdiveError>((stats, size_bytes))
168        })
169        .await?;
170
171    let response = StatsResponse::from_core(stats, db_path_for_response, size_bytes);
172    Ok(Json(response))
173}
174
175// ---------------------------------------------------------------------------
176// GET /version
177// ---------------------------------------------------------------------------
178
179/// Wire shape for `GET /version` responses.
180///
181/// All fields are compile-time constants — no database access is required.
182/// The primary use case is client-side feature detection: a UI or script
183/// calls `/version` on startup to discover which formats and endpoints the
184/// running server supports before making assumptions about its capabilities.
185#[derive(Debug, Serialize)]
186pub struct VersionResponse {
187    /// Semver version of the running `logdive-api` binary, injected from
188    /// `CARGO_PKG_VERSION` at compile time.
189    pub version: &'static str,
190    /// Ingest formats the binary was compiled with, sourced from
191    /// [`LogFormat::ALL`]. Stays in sync with core without manual
192    /// maintenance — adding a new format to core propagates here for free.
193    pub formats: Vec<&'static str>,
194    /// API endpoint names available on this server, sorted alphabetically.
195    pub capabilities: Vec<&'static str>,
196}
197
198/// `GET /version`
199///
200/// Returns a JSON object describing the running server's version and
201/// capabilities. Built entirely from compile-time constants; never touches
202/// the database. Always returns `200 OK`.
203pub async fn version_handler() -> Json<VersionResponse> {
204    Json(VersionResponse {
205        version: env!("CARGO_PKG_VERSION"),
206        formats: LogFormat::ALL.iter().map(|f| f.name()).collect(),
207        capabilities: vec!["query", "stats", "version"],
208    })
209}
210
211// ---------------------------------------------------------------------------
212// Tests
213// ---------------------------------------------------------------------------
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218    use serde_json::Value;
219
220    fn sample_entry() -> LogEntry {
221        let raw = r#"{"timestamp":"2026-04-22T14:03:21Z","level":"error","message":"boom"}"#;
222        let mut e = LogEntry::new(raw.to_string());
223        e.timestamp = Some("2026-04-22T14:03:21Z".to_string());
224        e.level = Some("error".to_string());
225        e.message = Some("boom".to_string());
226        e
227    }
228
229    #[test]
230    fn entry_to_json_string_is_valid_json_with_expected_shape() {
231        let e = sample_entry();
232        let s = entry_to_json_string(&e);
233        let v: Value = serde_json::from_str(&s).expect("valid json");
234        assert_eq!(v["timestamp"], "2026-04-22T14:03:21Z");
235        assert_eq!(v["level"], "error");
236        assert_eq!(v["message"], "boom");
237        assert!(v["tag"].is_null());
238        assert!(v["fields"].is_object());
239        assert!(v["raw"].is_string());
240    }
241
242    #[test]
243    fn entry_to_json_string_preserves_tag_and_fields() {
244        let mut e = sample_entry();
245        e.tag = Some("api".to_string());
246        e.fields
247            .insert("service".to_string(), Value::String("payments".to_string()));
248
249        let s = entry_to_json_string(&e);
250        let v: Value = serde_json::from_str(&s).unwrap();
251        assert_eq!(v["tag"], "api");
252        assert_eq!(v["fields"]["service"], "payments");
253    }
254
255    #[test]
256    fn stats_response_from_core_round_trips_all_fields() {
257        let mut idx = logdive_core::Indexer::open_in_memory().unwrap();
258        let mut e = sample_entry();
259        e.tag = Some("api".to_string());
260        idx.insert_batch(&[e]).unwrap();
261
262        let stats = idx.stats().unwrap();
263        let resp = StatsResponse::from_core(stats, "/tmp/idx.db".to_string(), 4096);
264
265        assert_eq!(resp.entries, 1);
266        assert_eq!(resp.db_path, "/tmp/idx.db");
267        assert_eq!(resp.db_size_bytes, 4096);
268        assert_eq!(resp.tags, vec![Some("api".to_string())]);
269
270        let v = serde_json::to_value(&resp).unwrap();
271        assert_eq!(v["entries"], 1);
272        assert_eq!(v["db_path"], "/tmp/idx.db");
273        assert_eq!(v["db_size_bytes"], 4096);
274        assert_eq!(v["tags"][0], "api");
275    }
276
277    #[test]
278    fn stats_response_renders_null_for_empty_time_bounds() {
279        let idx = logdive_core::Indexer::open_in_memory().unwrap();
280        let stats = idx.stats().unwrap();
281        let resp = StatsResponse::from_core(stats, "/tmp/empty.db".to_string(), 0);
282
283        let v = serde_json::to_value(&resp).unwrap();
284        assert!(v["min_timestamp"].is_null());
285        assert!(v["max_timestamp"].is_null());
286        assert_eq!(v["tags"].as_array().unwrap().len(), 0);
287    }
288
289    #[test]
290    fn build_ndjson_response_sets_correct_content_type() {
291        let mut e = sample_entry();
292        e.tag = Some("api".to_string());
293        let resp = build_ndjson_response(&[e]);
294
295        assert_eq!(resp.status(), StatusCode::OK);
296        let content_type = resp
297            .headers()
298            .get(header::CONTENT_TYPE)
299            .and_then(|v| v.to_str().ok())
300            .unwrap_or_default();
301        assert_eq!(content_type, "application/x-ndjson");
302    }
303
304    #[test]
305    fn build_ndjson_response_is_ok_for_empty_results() {
306        let resp = build_ndjson_response(&[]);
307        assert_eq!(resp.status(), StatusCode::OK);
308    }
309
310    // ----- GET /version ------------------------------------------------
311
312    #[tokio::test]
313    async fn version_handler_returns_current_package_version() {
314        let Json(resp) = version_handler().await;
315        assert_eq!(resp.version, env!("CARGO_PKG_VERSION"));
316    }
317
318    #[tokio::test]
319    async fn version_handler_formats_match_logformat_all() {
320        let Json(resp) = version_handler().await;
321        let expected: Vec<&'static str> = LogFormat::ALL.iter().map(|f| f.name()).collect();
322        assert_eq!(resp.formats, expected);
323    }
324
325    #[tokio::test]
326    async fn version_handler_capabilities_are_sorted_and_complete() {
327        let Json(resp) = version_handler().await;
328        assert_eq!(resp.capabilities, vec!["query", "stats", "version"]);
329        // Guard: the list must stay sorted — future additions must maintain this.
330        let mut sorted = resp.capabilities.clone();
331        sorted.sort_unstable();
332        assert_eq!(
333            resp.capabilities, sorted,
334            "capabilities must be sorted alphabetically"
335        );
336    }
337
338    #[test]
339    fn version_response_serializes_to_expected_json_shape() {
340        let resp = VersionResponse {
341            version: "0.2.0",
342            formats: vec!["json", "logfmt", "plain"],
343            capabilities: vec!["query", "stats", "version"],
344        };
345        let v = serde_json::to_value(&resp).unwrap();
346        assert_eq!(v["version"], "0.2.0");
347        assert_eq!(v["formats"], serde_json::json!(["json", "logfmt", "plain"]));
348        assert_eq!(
349            v["capabilities"],
350            serde_json::json!(["query", "stats", "version"])
351        );
352    }
353}