Skip to main content

logdive_api/
handlers.rs

1//! Axum handlers for the two public endpoints.
2//!
3//! Both handlers are async, but every one of them delegates the actual
4//! SQLite work to [`AppState::with_connection`] which runs on Tokio's
5//! blocking-task pool. The handlers themselves do parsing, parameter
6//! validation, and response shaping — nothing else.
7
8use axum::{
9    Json,
10    extract::{Query, State},
11    http::{StatusCode, header},
12    response::{IntoResponse, Response},
13};
14use serde::{Deserialize, Serialize};
15
16use logdive_core::{LogEntry, Stats, execute, parse_query};
17
18use crate::error::AppError;
19use crate::state::AppState;
20
21// ---------------------------------------------------------------------------
22// GET /query
23// ---------------------------------------------------------------------------
24
25/// Query-string parameters accepted by `GET /query`.
26///
27/// `q` is modeled as `Option<String>` rather than a required field so the
28/// "missing `q`" error message comes from our own `AppError::bad_request`
29/// path rather than from Axum's generic extractor rejection.
30#[derive(Debug, Deserialize)]
31pub struct QueryParams {
32    pub q: Option<String>,
33    pub limit: Option<usize>,
34}
35
36/// Default cap on result set size when `limit` is not supplied. Mirrors
37/// the CLI's `--limit` default so the two surfaces behave identically.
38const DEFAULT_LIMIT: usize = 1000;
39
40/// `GET /query?q=<expr>&limit=<n>`
41///
42/// Returns matching log entries as newline-delimited JSON, one entry per
43/// line. A missing `limit` defaults to 1000; `limit=0` means unlimited.
44pub async fn query_handler(
45    State(state): State<AppState>,
46    Query(params): Query<QueryParams>,
47) -> Result<Response, AppError> {
48    // Validate: `q` is required and non-empty (post-trim).
49    let query_str = params
50        .q
51        .map(|s| s.trim().to_string())
52        .filter(|s| !s.is_empty())
53        .ok_or_else(|| AppError::bad_request("missing or empty `q` parameter"))?;
54
55    // Apply the same "0 = unlimited" rule as the CLI.
56    let limit = match params.limit.unwrap_or(DEFAULT_LIMIT) {
57        0 => None,
58        n => Some(n),
59    };
60
61    // Parse the query up-front so parse errors are classified by the
62    // `From<LogdiveError>` impl before we touch the DB.
63    let ast = parse_query(&query_str)?;
64
65    tracing::debug!(query = %query_str, ?limit, "executing query over HTTP");
66
67    let rows: Vec<LogEntry> = state
68        .with_connection(move |indexer| execute(&ast, indexer.connection(), limit))
69        .await?;
70
71    tracing::debug!(
72        result_count = rows.len(),
73        "query returned results over HTTP"
74    );
75
76    Ok(build_ndjson_response(&rows))
77}
78
79/// Serialize a slice of entries into a newline-delimited JSON body and
80/// wrap it in a response with the correct `Content-Type`.
81///
82/// Empty result sets produce an empty body (zero bytes, zero lines),
83/// which is valid NDJSON and pipeline-friendly. Status is 200 OK in
84/// both the empty and non-empty cases — "no matches" is a successful
85/// query, not a not-found.
86fn build_ndjson_response(rows: &[LogEntry]) -> Response {
87    let mut body = String::with_capacity(rows.len() * 256);
88    for row in rows {
89        body.push_str(&entry_to_json_string(row));
90        body.push('\n');
91    }
92
93    (
94        StatusCode::OK,
95        [(header::CONTENT_TYPE, "application/x-ndjson")],
96        body,
97    )
98        .into_response()
99}
100
101/// Render a single `LogEntry` as a JSON object string.
102///
103/// The shape matches the CLI's `--format json` output so clients can treat
104/// both surfaces interchangeably: `timestamp`, `level`, `message`, `tag`,
105/// `fields`, `raw`. `tag` is `null` when absent; `fields` is always an
106/// object.
107fn entry_to_json_string(entry: &LogEntry) -> String {
108    // Using `serde_json::json!` rather than `#[derive(Serialize)]` on
109    // `LogEntry` keeps the serde annotation burden out of core. This
110    // module owns the HTTP wire shape.
111    let value = serde_json::json!({
112        "timestamp": entry.timestamp,
113        "level":     entry.level,
114        "message":   entry.message,
115        "tag":       entry.tag,
116        "fields":    serde_json::Value::Object(entry.fields.clone()),
117        "raw":       entry.raw,
118    });
119    value.to_string()
120}
121
122// ---------------------------------------------------------------------------
123// GET /stats
124// ---------------------------------------------------------------------------
125
126/// Wire shape for `GET /stats` responses.
127///
128/// Intentionally decoupled from `logdive_core::Stats` so the library stays
129/// serde-agnostic on its public output types. Renaming core fields is
130/// then a non-breaking change for HTTP clients; conversely, the HTTP
131/// shape can evolve without touching core.
132#[derive(Debug, Serialize)]
133pub struct StatsResponse {
134    pub entries: u64,
135    pub min_timestamp: Option<String>,
136    pub max_timestamp: Option<String>,
137    /// Distinct tag values. `None` (untagged) appears first when present,
138    /// then non-null tags in ascending alphabetical order — identical to
139    /// the `Stats.tags` contract from core. Clients presenting this to
140    /// humans can reshuffle (e.g. put `null` last, label it "(untagged)")
141    /// the way the CLI does.
142    pub tags: Vec<Option<String>>,
143    pub db_size_bytes: u64,
144    pub db_path: String,
145}
146
147impl StatsResponse {
148    fn from_core(stats: Stats, db_path: String, db_size_bytes: u64) -> Self {
149        Self {
150            entries: stats.entries,
151            min_timestamp: stats.min_timestamp,
152            max_timestamp: stats.max_timestamp,
153            tags: stats.tags,
154            db_size_bytes,
155            db_path,
156        }
157    }
158}
159
160/// `GET /stats`
161///
162/// Returns aggregate metadata about the index as a single JSON object.
163pub async fn stats_handler(State(state): State<AppState>) -> Result<Json<StatsResponse>, AppError> {
164    let db_path_for_response = state.db_path.display().to_string();
165    let db_path_for_closure = state.db_path.clone();
166
167    let (stats, size_bytes) = state
168        .with_connection(move |indexer| {
169            // Run both queries under the same blocking task so we don't
170            // round-trip back to the async runtime between them.
171            let stats = indexer.stats()?;
172            let size_bytes = std::fs::metadata(&db_path_for_closure)
173                .map(|m| m.len())
174                .map_err(|e| logdive_core::LogdiveError::io_at(&db_path_for_closure, e))?;
175            Ok::<_, logdive_core::LogdiveError>((stats, size_bytes))
176        })
177        .await?;
178
179    let response = StatsResponse::from_core(stats, db_path_for_response, size_bytes);
180    Ok(Json(response))
181}
182
183// ---------------------------------------------------------------------------
184// Tests
185// ---------------------------------------------------------------------------
186
187#[cfg(test)]
188mod tests {
189    use super::*;
190    use serde_json::Value;
191
192    fn sample_entry() -> LogEntry {
193        let raw = r#"{"timestamp":"2026-04-22T14:03:21Z","level":"error","message":"boom"}"#;
194        let mut e = LogEntry::new(raw.to_string());
195        e.timestamp = Some("2026-04-22T14:03:21Z".to_string());
196        e.level = Some("error".to_string());
197        e.message = Some("boom".to_string());
198        e
199    }
200
201    #[test]
202    fn entry_to_json_string_is_valid_json_with_expected_shape() {
203        let e = sample_entry();
204        let s = entry_to_json_string(&e);
205        let v: Value = serde_json::from_str(&s).expect("valid json");
206        assert_eq!(v["timestamp"], "2026-04-22T14:03:21Z");
207        assert_eq!(v["level"], "error");
208        assert_eq!(v["message"], "boom");
209        assert!(v["tag"].is_null());
210        assert!(v["fields"].is_object());
211        assert!(v["raw"].is_string());
212    }
213
214    #[test]
215    fn entry_to_json_string_preserves_tag_and_fields() {
216        let mut e = sample_entry();
217        e.tag = Some("api".to_string());
218        e.fields
219            .insert("service".to_string(), Value::String("payments".to_string()));
220
221        let s = entry_to_json_string(&e);
222        let v: Value = serde_json::from_str(&s).unwrap();
223        assert_eq!(v["tag"], "api");
224        assert_eq!(v["fields"]["service"], "payments");
225    }
226
227    #[test]
228    fn stats_response_from_core_round_trips_all_fields() {
229        let mut idx = logdive_core::Indexer::open_in_memory().unwrap();
230        let mut e = sample_entry();
231        e.tag = Some("api".to_string());
232        idx.insert_batch(&[e]).unwrap();
233
234        let stats = idx.stats().unwrap();
235        let resp = StatsResponse::from_core(stats, "/tmp/idx.db".to_string(), 4096);
236
237        assert_eq!(resp.entries, 1);
238        assert_eq!(resp.db_path, "/tmp/idx.db");
239        assert_eq!(resp.db_size_bytes, 4096);
240        assert_eq!(resp.tags, vec![Some("api".to_string())]);
241
242        let v = serde_json::to_value(&resp).unwrap();
243        assert_eq!(v["entries"], 1);
244        assert_eq!(v["db_path"], "/tmp/idx.db");
245        assert_eq!(v["db_size_bytes"], 4096);
246        assert_eq!(v["tags"][0], "api");
247    }
248
249    #[test]
250    fn stats_response_renders_null_for_empty_time_bounds() {
251        let idx = logdive_core::Indexer::open_in_memory().unwrap();
252        let stats = idx.stats().unwrap();
253        let resp = StatsResponse::from_core(stats, "/tmp/empty.db".to_string(), 0);
254
255        let v = serde_json::to_value(&resp).unwrap();
256        assert!(v["min_timestamp"].is_null());
257        assert!(v["max_timestamp"].is_null());
258        assert_eq!(v["tags"].as_array().unwrap().len(), 0);
259    }
260
261    #[test]
262    fn build_ndjson_response_sets_correct_content_type() {
263        let mut e = sample_entry();
264        e.tag = Some("api".to_string());
265        let resp = build_ndjson_response(&[e]);
266
267        assert_eq!(resp.status(), StatusCode::OK);
268        let content_type = resp
269            .headers()
270            .get(header::CONTENT_TYPE)
271            .and_then(|v| v.to_str().ok())
272            .unwrap_or_default();
273        assert_eq!(content_type, "application/x-ndjson");
274    }
275
276    #[test]
277    fn build_ndjson_response_is_ok_for_empty_results() {
278        let resp = build_ndjson_response(&[]);
279        assert_eq!(resp.status(), StatusCode::OK);
280    }
281}