Skip to main content

logdive_core/
executor.rs

1//! Query executor: translate a [`QueryNode`] into parameterized SQL, run
2//! it against the index, and reconstruct [`LogEntry`] values from the
3//! result rows.
4//!
5//! This module is the bridge between the query AST and the SQLite schema.
6//! It never mixes user-controlled strings into SQL text — every literal
7//! value is bound as a parameter. The one exception is JSON extraction
8//! paths like `$.service`, which embed the field name directly because
9//! SQLite parameters aren't allowed inside `json_extract` path expressions;
10//! safety there comes from the field name having passed
11//! `validate_field_name`'s strict regex in the parser, which we
12//! defensively re-check at the executor boundary.
13//!
14//! # Timestamp handling
15//!
16//! Timestamps are compared as TEXT, which matches chronological order only
17//! for ISO-8601 strings that share one UTC offset and use fixed-width
18//! components. Ingested timestamps that aren't shaped like that will
19//! compare incorrectly against `last`/`since` bounds — a known
20//! limitation of accepting arbitrary timestamp strings at ingestion time.
21
22use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
23use rusqlite::{Connection, params_from_iter, types::Value as SqlValue};
24use serde_json::{Map, Value};
25
26use crate::entry::LogEntry;
27use crate::error::{LogdiveError, Result};
28use crate::query::{Clause, Duration, QueryNode, QueryValue};
29
30// ---------------------------------------------------------------------------
31// Public entry point
32// ---------------------------------------------------------------------------
33
34/// Execute a parsed query against the index and return matching entries.
35///
36/// `limit` caps the result set size; pass `None` for no limit. Results are
37/// ordered by `timestamp DESC, id DESC` (newest first, with row id as
38/// stable tiebreaker for identical timestamps).
39pub fn execute(
40    query: &QueryNode,
41    conn: &Connection,
42    limit: Option<usize>,
43) -> Result<Vec<LogEntry>> {
44    let (sql, binds) = build_sql(query, limit, Utc::now())?;
45    run(conn, &sql, &binds)
46}
47
48/// Variant of [`execute`] that uses a caller-supplied "now" value.
49///
50/// Exposed for testing so time-range clauses produce deterministic bounds.
51pub fn execute_at(
52    query: &QueryNode,
53    conn: &Connection,
54    limit: Option<usize>,
55    now: DateTime<Utc>,
56) -> Result<Vec<LogEntry>> {
57    let (sql, binds) = build_sql(query, limit, now)?;
58    run(conn, &sql, &binds)
59}
60
61// ---------------------------------------------------------------------------
62// SQL generation
63// ---------------------------------------------------------------------------
64
65/// Intermediate representation of a bindable value, kept as an owned
66/// `SqlValue` so `params_from_iter` can consume them without lifetime
67/// gymnastics.
68type Bind = SqlValue;
69
70fn build_sql(
71    query: &QueryNode,
72    limit: Option<usize>,
73    now: DateTime<Utc>,
74) -> Result<(String, Vec<Bind>)> {
75    let QueryNode::And(clauses) = query;
76
77    let mut where_parts: Vec<String> = Vec::with_capacity(clauses.len());
78    let mut binds: Vec<Bind> = Vec::with_capacity(clauses.len());
79
80    for clause in clauses {
81        let (sql, mut clause_binds) = translate_clause(clause, now)?;
82        where_parts.push(sql);
83        binds.append(&mut clause_binds);
84    }
85
86    let where_sql = if where_parts.is_empty() {
87        // Can't happen with a valid QueryNode (parser guarantees at least
88        // one clause), but handle it to keep this function total.
89        "1=1".to_string()
90    } else {
91        where_parts.join(" AND ")
92    };
93
94    let mut sql = format!(
95        "SELECT timestamp, level, message, tag, fields, raw \
96         FROM log_entries \
97         WHERE {where_sql} \
98         ORDER BY timestamp DESC, id DESC"
99    );
100    if let Some(n) = limit {
101        sql.push_str(&format!(" LIMIT {n}"));
102    }
103    Ok((sql, binds))
104}
105
106fn translate_clause(clause: &Clause, now: DateTime<Utc>) -> Result<(String, Vec<Bind>)> {
107    match clause {
108        Clause::Compare { field, op, value } => {
109            let column_expr = column_for_field(field)?;
110            let sql = format!("{column_expr} {op} ?");
111            Ok((sql, vec![value_to_bind(value)]))
112        }
113        Clause::Contains { field, value } => {
114            let column_expr = column_for_field(field)?;
115            // Escape SQL LIKE metacharacters (%, _, \) so a user searching
116            // for a literal '%' doesn't accidentally wildcard the world.
117            let escaped = escape_like(value);
118            let pattern = format!("%{escaped}%");
119            let sql = format!("{column_expr} LIKE ? ESCAPE '\\'");
120            Ok((sql, vec![SqlValue::Text(pattern)]))
121        }
122        Clause::LastDuration(d) => {
123            let cutoff = compute_last_cutoff(*d, now);
124            Ok((
125                "timestamp >= ?".to_string(),
126                vec![SqlValue::Text(cutoff.to_rfc3339())],
127            ))
128        }
129        Clause::SinceDatetime(s) => {
130            let dt = parse_datetime(s)?;
131            Ok((
132                "timestamp >= ?".to_string(),
133                vec![SqlValue::Text(dt.to_rfc3339())],
134            ))
135        }
136    }
137}
138
139/// Return the SQL expression that references a given query field.
140///
141/// Known fields resolve to indexed columns. Unknown fields resolve to a
142/// `json_extract(fields, '$.<field>')` expression — which is why the
143/// field name must survive `validate_field_name`'s regex *and* the
144/// defensive check here.
145fn column_for_field(field: &str) -> Result<String> {
146    if LogEntry::KNOWN_KEYS.contains(&field) {
147        Ok(field.to_string())
148    } else {
149        if !is_safe_json_path_segment(field) {
150            return Err(LogdiveError::UnsafeFieldName(field.to_string()));
151        }
152        Ok(format!("json_extract(fields, '$.{field}')"))
153    }
154}
155
/// Check that `s` is safe to splice into a `json_extract` path.
///
/// Accepts a non-empty string whose first character is an ASCII letter or
/// `_` and whose remaining characters are ASCII letters, digits, `_`, or
/// `.`. The parser already enforces this; repeating the check here keeps
/// the trust boundary visible next to the SQL that depends on it.
fn is_safe_json_path_segment(s: &str) -> bool {
    let mut chars = s.chars();
    match chars.next() {
        Some(first) if first.is_ascii_alphabetic() || first == '_' => {
            chars.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '.')
        }
        // Empty input or a disallowed leading character.
        _ => false,
    }
}
168
169fn value_to_bind(v: &QueryValue) -> Bind {
170    match v {
171        QueryValue::String(s) => SqlValue::Text(s.clone()),
172        QueryValue::Integer(n) => SqlValue::Integer(*n),
173        QueryValue::Float(f) => SqlValue::Real(*f),
174        QueryValue::Bool(b) => SqlValue::Integer(if *b { 1 } else { 0 }),
175    }
176}
177
/// Backslash-escape the characters that are special inside a SQL `LIKE`
/// pattern: `%`, `_`, and the escape character `\` itself.
///
/// The result matches `input` literally when used with `ESCAPE '\'`.
fn escape_like(input: &str) -> String {
    input
        .chars()
        .fold(String::with_capacity(input.len()), |mut escaped, ch| {
            if matches!(ch, '\\' | '%' | '_') {
                escaped.push('\\');
            }
            escaped.push(ch);
            escaped
        })
}
193
194fn compute_last_cutoff(d: Duration, now: DateTime<Utc>) -> DateTime<Utc> {
195    // `amount` is u64; promote to i64 for chrono. Saturate on the
196    // (astronomically unlikely) overflow case.
197    let amount_i64 = i64::try_from(d.amount).unwrap_or(i64::MAX);
198    let secs = amount_i64.saturating_mul(d.unit.seconds());
199    let delta = chrono::Duration::seconds(secs);
200    now.checked_sub_signed(delta).unwrap_or_else(|| {
201        Utc.timestamp_opt(0, 0)
202            .single()
203            .expect("unix epoch is valid")
204    })
205}
206
207/// Accept three datetime formats for `since` clauses:
208///   - RFC3339 / ISO-8601 with timezone (e.g. `2024-01-01T10:00:00Z`)
209///   - ISO naive datetime (e.g. `2024-01-01 10:00:00` or `2024-01-01T10:00:00`), interpreted as UTC
210///   - ISO date (e.g. `2024-01-01`), interpreted as UTC midnight
211fn parse_datetime(s: &str) -> Result<DateTime<Utc>> {
212    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
213        return Ok(dt.with_timezone(&Utc));
214    }
215    for fmt in &["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"] {
216        if let Ok(ndt) = NaiveDateTime::parse_from_str(s, fmt) {
217            return Ok(Utc.from_utc_datetime(&ndt));
218        }
219    }
220    if let Ok(nd) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
221        let ndt = nd.and_hms_opt(0, 0, 0).expect("00:00:00 is valid");
222        return Ok(Utc.from_utc_datetime(&ndt));
223    }
224    Err(LogdiveError::InvalidDatetime {
225        input: s.to_string(),
226        reason: "expected RFC3339, `YYYY-MM-DD HH:MM:SS`, or `YYYY-MM-DD`".to_string(),
227    })
228}
229
230// ---------------------------------------------------------------------------
231// Execution
232// ---------------------------------------------------------------------------
233
234fn run(conn: &Connection, sql: &str, binds: &[Bind]) -> Result<Vec<LogEntry>> {
235    let mut stmt = conn.prepare(sql)?;
236    let rows = stmt.query_map(params_from_iter(binds.iter()), |row| {
237        let timestamp: Option<String> = row.get(0)?;
238        let level: Option<String> = row.get(1)?;
239        let message: Option<String> = row.get(2)?;
240        let tag: Option<String> = row.get(3)?;
241        let fields_json: String = row.get(4)?;
242        let raw: String = row.get(5)?;
243        // We tunnel the raw JSON out; deserialization happens below so the
244        // closure's error type stays `rusqlite::Error`.
245        Ok((timestamp, level, message, tag, fields_json, raw))
246    })?;
247
248    let mut out = Vec::new();
249    for row in rows {
250        let (timestamp, level, message, tag, fields_json, raw) = row?;
251        let fields: Map<String, Value> =
252            serde_json::from_str(&fields_json).map_err(LogdiveError::CorruptFieldsJson)?;
253        out.push(LogEntry {
254            timestamp,
255            level,
256            message,
257            tag,
258            fields,
259            raw,
260        });
261    }
262    Ok(out)
263}
264
265// ---------------------------------------------------------------------------
266// Tests
267// ---------------------------------------------------------------------------
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::indexer::Indexer;
    use crate::query::parse;
    use std::collections::HashSet;

    /// Parse `q` and execute it without a limit against `conn`.
    ///
    /// Panics on parse or execution failure; tests pass well-formed input.
    fn run_query(conn: &Connection, q: &str) -> Vec<LogEntry> {
        let parsed = parse(q).expect("test queries are well-formed");
        execute(&parsed, conn, None).expect("execute")
    }

    /// Like `run_query`, but with a pinned reference time.
    fn run_query_at(conn: &Connection, q: &str, now: DateTime<Utc>) -> Vec<LogEntry> {
        let parsed = parse(q).expect("test queries are well-formed");
        execute_at(&parsed, conn, None, now).expect("execute")
    }

    /// Parse `q` and generate SQL with no limit, using the current time.
    fn build(q: &str) -> (String, Vec<Bind>) {
        let parsed = parse(q).unwrap();
        build_sql(&parsed, None, Utc::now()).unwrap()
    }

    /// Build an entry whose raw line is a JSON object holding the three
    /// given values, with the matching struct fields populated.
    fn make_entry(ts: &str, level: &str, message: &str) -> LogEntry {
        let raw = format!(r#"{{"timestamp":"{ts}","level":"{level}","message":"{message}"}}"#);
        let mut entry = LogEntry::new(raw);
        entry.timestamp = Some(ts.to_string());
        entry.level = Some(level.to_string());
        entry.message = Some(message.to_string());
        entry
    }

    /// In-memory index holding three entries one hour apart: two errors
    /// and one info, two tagged `api`, spread over two services.
    fn fixture() -> Indexer {
        let entry = |ts: &str,
                     level: &str,
                     message: &str,
                     tag: Option<&str>,
                     service: &str,
                     req_id: i64| {
            let mut e = make_entry(ts, level, message);
            if let Some(t) = tag {
                e.tag = Some(t.into());
            }
            e.fields
                .insert("service".into(), Value::String(service.into()));
            e.fields.insert("req_id".into(), Value::from(req_id));
            e
        };

        let batch = [
            entry(
                "2026-04-20T10:00:00Z",
                "error",
                "payment failed",
                Some("api"),
                "payments",
                100,
            ),
            entry(
                "2026-04-20T11:00:00Z",
                "info",
                "health check",
                Some("api"),
                "payments",
                200,
            ),
            entry(
                "2026-04-20T12:00:00Z",
                "error",
                "timeout on db call",
                None,
                "users",
                300,
            ),
        ];

        let mut idx = Indexer::open_in_memory().unwrap();
        idx.insert_batch(&batch).unwrap();
        idx
    }

    // --- SQL generation (inspection) ---

    #[test]
    fn compare_on_known_field_binds_value_not_interpolates() {
        let (sql, binds) = build("level=error");
        assert!(sql.contains("WHERE level = ?"));
        assert!(!sql.contains("error"));
        assert_eq!(binds.len(), 1);
        let SqlValue::Text(s) = &binds[0] else {
            panic!("expected text bind, got {:?}", binds[0]);
        };
        assert_eq!(s, "error");
    }

    #[test]
    fn compare_on_unknown_field_uses_json_extract() {
        let (sql, binds) = build("service=payments");
        assert!(sql.contains("json_extract(fields, '$.service')"));
        assert_eq!(binds.len(), 1);
    }

    #[test]
    fn contains_uses_like_with_escape_and_wildcards() {
        let (sql, binds) = build(r#"message contains "timeout""#);
        assert!(sql.contains("LIKE ? ESCAPE '\\'"));
        let SqlValue::Text(s) = &binds[0] else {
            panic!("expected text bind, got {:?}", binds[0]);
        };
        assert_eq!(s, "%timeout%");
    }

    #[test]
    fn contains_escapes_like_metacharacters() {
        let (_, binds) = build(r#"message contains "50%""#);
        let SqlValue::Text(s) = &binds[0] else {
            panic!("unexpected bind: {:?}", binds[0]);
        };
        assert_eq!(s, r"%50\%%");
    }

    #[test]
    fn last_duration_produces_timestamp_lower_bound() {
        let ast = parse("last 2h").unwrap();
        let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 0, 0).unwrap();
        let (sql, binds) = build_sql(&ast, None, now).unwrap();
        assert!(sql.contains("timestamp >= ?"));
        let SqlValue::Text(s) = &binds[0] else {
            panic!("unexpected bind: {:?}", binds[0]);
        };
        assert!(s.starts_with("2026-04-20T10:00:00"));
    }

    #[test]
    fn since_accepts_rfc3339() {
        let (sql, binds) = build(r#"since "2024-01-01T10:00:00Z""#);
        assert!(sql.contains("timestamp >= ?"));
        let SqlValue::Text(s) = &binds[0] else {
            panic!("unexpected: {:?}", binds[0]);
        };
        assert!(s.starts_with("2024-01-01T10:00:00"));
    }

    #[test]
    fn since_accepts_bare_date() {
        let (_, binds) = build("since 2024-06-15");
        let SqlValue::Text(s) = &binds[0] else {
            panic!("unexpected: {:?}", binds[0]);
        };
        assert!(s.starts_with("2024-06-15T00:00:00"));
    }

    #[test]
    fn since_rejects_garbage() {
        let ast = parse("since not-a-date").unwrap();
        let err = build_sql(&ast, None, Utc::now()).unwrap_err();
        assert!(matches!(err, LogdiveError::InvalidDatetime { .. }));
    }

    #[test]
    fn and_chain_joins_with_and_and_preserves_bind_order() {
        let (sql, binds) = build("level=error AND service=payments");
        assert!(sql.contains("level = ?"));
        assert!(sql.contains("json_extract(fields, '$.service') = ?"));
        assert!(sql.contains(" AND "));
        assert_eq!(binds.len(), 2);
        let (SqlValue::Text(first), SqlValue::Text(second)) = (&binds[0], &binds[1]) else {
            panic!("unexpected binds: {:?}", (&binds[0], &binds[1]));
        };
        assert_eq!(first, "error");
        assert_eq!(second, "payments");
    }

    #[test]
    fn integer_binds_as_integer_not_text() {
        let (_, binds) = build("req_id > 100");
        let SqlValue::Integer(n) = &binds[0] else {
            panic!("expected integer bind, got {:?}", binds[0]);
        };
        assert_eq!(*n, 100);
    }

    #[test]
    fn bool_binds_as_integer_zero_or_one() {
        let (_, binds) = build("ok=true");
        assert!(matches!(binds[0], SqlValue::Integer(1)));

        let (_, binds) = build("ok=false");
        assert!(matches!(binds[0], SqlValue::Integer(0)));
    }

    #[test]
    fn float_binds_as_real() {
        let (_, binds) = build("duration < 1.5");
        let SqlValue::Real(f) = &binds[0] else {
            panic!("expected real bind, got {:?}", binds[0]);
        };
        assert!((f - 1.5).abs() < 1e-9);
    }

    #[test]
    fn limit_appends_limit_clause() {
        let ast = parse("level=error").unwrap();
        let (sql, _) = build_sql(&ast, Some(50), Utc::now()).unwrap();
        assert!(sql.ends_with("LIMIT 50"));
    }

    // --- Round-trip: insert, query, assert results ---

    #[test]
    fn round_trip_known_field_equality() {
        let idx = fixture();
        let hits = run_query(idx.connection(), "level=error");
        assert_eq!(hits.len(), 2);
        assert!(hits.iter().all(|e| e.level.as_deref() == Some("error")));
    }

    #[test]
    fn round_trip_unknown_field_via_json_extract() {
        let idx = fixture();
        let hits = run_query(idx.connection(), "service=payments");
        assert_eq!(hits.len(), 2);
        let expected = Value::String("payments".into());
        assert!(
            hits.iter()
                .all(|e| e.fields.get("service") == Some(&expected))
        );
    }

    #[test]
    fn round_trip_and_chain() {
        let idx = fixture();
        let hits = run_query(idx.connection(), "level=error AND service=payments");
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].message.as_deref(), Some("payment failed"));
    }

    #[test]
    fn round_trip_contains_substring_match() {
        let idx = fixture();
        let hits = run_query(idx.connection(), r#"message contains "timeout""#);
        assert_eq!(hits.len(), 1);
        assert!(hits[0].message.as_deref().unwrap().contains("timeout"));
    }

    #[test]
    fn round_trip_numeric_comparison_on_json_field() {
        let idx = fixture();
        let hits = run_query(idx.connection(), "req_id > 150");
        assert_eq!(hits.len(), 2);
        let ids: HashSet<i64> = hits
            .iter()
            .map(|e| e.fields.get("req_id").and_then(|v| v.as_i64()).unwrap())
            .collect();
        assert_eq!(ids, HashSet::from([200, 300]));
    }

    #[test]
    fn round_trip_last_duration_uses_now() {
        let idx = fixture();
        let now = Utc.with_ymd_and_hms(2026, 4, 20, 13, 0, 0).unwrap();

        let hits = run_query_at(idx.connection(), "last 3h", now);
        assert_eq!(hits.len(), 3);

        let hits = run_query_at(idx.connection(), "last 70m", now);
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].timestamp.as_deref(), Some("2026-04-20T12:00:00Z"));
    }

    #[test]
    fn round_trip_since_datetime() {
        let idx = fixture();
        let hits = run_query(idx.connection(), "since 2026-04-20T11:00:00Z");
        assert_eq!(hits.len(), 2);
    }

    #[test]
    fn round_trip_results_ordered_newest_first() {
        let idx = fixture();
        let hits = run_query(idx.connection(), "level=error");
        assert!(hits[0].timestamp > hits[1].timestamp);
    }

    #[test]
    fn round_trip_not_equal_operator() {
        let idx = fixture();
        let hits = run_query(idx.connection(), "level!=error");
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].level.as_deref(), Some("info"));
    }

    #[test]
    fn round_trip_contains_with_wildcard_character_is_literal() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let with_percent = make_entry("2026-04-20T10:00:00Z", "info", "discount 50% today");
        let without = make_entry("2026-04-20T11:00:00Z", "info", "no special char here");
        idx.insert_batch(&[with_percent, without]).unwrap();

        let hits = run_query(idx.connection(), r#"message contains "50%""#);
        assert_eq!(hits.len(), 1);
        assert!(hits[0].message.as_deref().unwrap().contains("50%"));
    }

    #[test]
    fn round_trip_empty_result_is_empty_vec_not_error() {
        let idx = fixture();
        let hits = run_query(idx.connection(), "level=nonsense");
        assert!(hits.is_empty());
    }

    #[test]
    fn round_trip_reconstructs_fields_map() {
        let idx = fixture();
        let hits = run_query(idx.connection(), "level=error AND service=payments");
        assert_eq!(hits.len(), 1);
        let entry = &hits[0];
        assert_eq!(
            entry.fields.get("service"),
            Some(&Value::String("payments".into()))
        );
        assert_eq!(
            entry.fields.get("req_id").and_then(|v| v.as_i64()),
            Some(100)
        );
    }

    // --- Safety guards ---

    #[test]
    fn unsafe_field_name_is_rejected_at_executor() {
        let outcome = column_for_field("service; DROP TABLE log_entries--");
        assert!(matches!(outcome, Err(LogdiveError::UnsafeFieldName(_))));
    }
}
581}