Skip to main content

logdive_core/
executor.rs

1//! Query executor: translate a [`QueryNode`] into parameterized SQL, run
2//! it against the index, and reconstruct [`LogEntry`] values from the
3//! result rows.
4//!
5//! This module is the bridge between the query AST and the SQLite schema.
6//! It never mixes user-controlled strings into SQL text — every literal
7//! value is bound as a parameter. The one exception is JSON extraction
8//! paths like `$.service`, which embed the field name directly because
9//! SQLite parameters aren't allowed inside `json_extract` path expressions;
10//! safety there comes from the field name having passed
11//! `validate_field_name`'s strict regex in the parser, which we
12//! defensively re-check at the executor boundary.
13//!
14//! # Disjunction (OR) shape
15//!
16//! v0.2.0 introduced OR. The AST is `QueryNode::Or(Vec<AndGroup>)` where
17//! each [`AndGroup`] is a conjunction of clauses. The SQL we emit
18//! parenthesizes each AND-group and joins them with ` OR `:
19//!
20//! ```sql
21//! WHERE (level = ? AND json_extract(fields, '$.service') = ?)
22//!    OR (level = ?)
23//! ```
24//!
25//! For queries with no OR (a single AND-group), the parens are still
26//! emitted — the alternative is a special-case branch in the SQL
27//! builder that adds maintenance cost without performance benefit.
28//! SQLite's planner ignores redundant parens.
29//!
30//! # Timestamp handling
31//!
//! Timestamps are compared as TEXT, which works for ISO-8601 timestamps
//! written in one consistent layout, because those sort lexicographically in
//! chronological order when all components are fixed-width and the UTC
//! offset is the same. Ingested timestamps that aren't ISO-8601 shaped (or
//! that use a different layout or offset) will compare incorrectly against
//! `last`/`since` bounds — a known limitation of accepting arbitrary
//! timestamp strings at ingestion time.
37
38use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
39use rusqlite::{Connection, params_from_iter, types::Value as SqlValue};
40use serde_json::{Map, Value};
41
42use crate::entry::LogEntry;
43use crate::error::{LogdiveError, Result};
44use crate::query::{AndGroup, Clause, Duration, QueryNode, QueryValue};
45
46// ---------------------------------------------------------------------------
47// Public entry point
48// ---------------------------------------------------------------------------
49
50/// Execute a parsed query against the index and return matching entries.
51///
52/// `limit` caps the result set size; pass `None` for no limit. Results are
53/// ordered by `timestamp DESC, id DESC` (newest first, with row id as
54/// stable tiebreaker for identical timestamps).
55pub fn execute(
56    query: &QueryNode,
57    conn: &Connection,
58    limit: Option<usize>,
59) -> Result<Vec<LogEntry>> {
60    let (sql, binds) = build_sql(query, limit, Utc::now())?;
61    run(conn, &sql, &binds)
62}
63
64/// Variant of [`execute`] that uses a caller-supplied "now" value.
65///
66/// Exposed for testing so time-range clauses produce deterministic bounds.
67pub fn execute_at(
68    query: &QueryNode,
69    conn: &Connection,
70    limit: Option<usize>,
71    now: DateTime<Utc>,
72) -> Result<Vec<LogEntry>> {
73    let (sql, binds) = build_sql(query, limit, now)?;
74    run(conn, &sql, &binds)
75}
76
77// ---------------------------------------------------------------------------
78// SQL generation
79// ---------------------------------------------------------------------------
80
81/// Intermediate representation of a bindable value, kept as an owned
82/// `SqlValue` so `params_from_iter` can consume them without lifetime
83/// gymnastics.
84type Bind = SqlValue;
85
86fn build_sql(
87    query: &QueryNode,
88    limit: Option<usize>,
89    now: DateTime<Utc>,
90) -> Result<(String, Vec<Bind>)> {
91    let QueryNode::Or(groups) = query;
92
93    // The parser guarantees at least one AND-group, and each AND-group has
94    // at least one clause. Treat both invariants defensively here so a bug
95    // upstream produces a recognizable runtime shape rather than a SQL
96    // syntax error from an empty `WHERE` clause.
97    let mut group_sqls: Vec<String> = Vec::with_capacity(groups.len());
98    let mut binds: Vec<Bind> = Vec::new();
99
100    for group in groups {
101        let (group_sql, mut group_binds) = translate_and_group(group, now)?;
102        group_sqls.push(group_sql);
103        binds.append(&mut group_binds);
104    }
105
106    let where_sql = if group_sqls.is_empty() {
107        // Defensive: should be unreachable given the parser contract.
108        "1=1".to_string()
109    } else {
110        // ` OR ` between groups, each already parenthesized.
111        group_sqls.join(" OR ")
112    };
113
114    let mut sql = format!(
115        "SELECT timestamp, level, message, tag, fields, raw \
116         FROM log_entries \
117         WHERE {where_sql} \
118         ORDER BY timestamp DESC, id DESC"
119    );
120    if let Some(n) = limit {
121        sql.push_str(&format!(" LIMIT {n}"));
122    }
123    Ok((sql, binds))
124}
125
126/// Translate one AND-group into a parenthesized SQL fragment and the
127/// associated bind values, in clause-declaration order.
128///
129/// Always parenthesizes — including the single-clause case. Uniformity in
130/// the SQL emitter outweighs prettiness in the rare query-debugger view.
131fn translate_and_group(group: &AndGroup, now: DateTime<Utc>) -> Result<(String, Vec<Bind>)> {
132    let mut clause_sqls: Vec<String> = Vec::with_capacity(group.clauses.len());
133    let mut binds: Vec<Bind> = Vec::new();
134
135    for clause in &group.clauses {
136        let (sql, mut clause_binds) = translate_clause(clause, now)?;
137        clause_sqls.push(sql);
138        binds.append(&mut clause_binds);
139    }
140
141    let inner = if clause_sqls.is_empty() {
142        // Defensive: parser guarantees non-empty AND-groups.
143        "1=1".to_string()
144    } else {
145        clause_sqls.join(" AND ")
146    };
147    Ok((format!("({inner})"), binds))
148}
149
150fn translate_clause(clause: &Clause, now: DateTime<Utc>) -> Result<(String, Vec<Bind>)> {
151    match clause {
152        Clause::Compare { field, op, value } => {
153            let column_expr = column_for_field(field)?;
154            let sql = format!("{column_expr} {op} ?");
155            Ok((sql, vec![value_to_bind(value)]))
156        }
157        Clause::Contains { field, value } => {
158            let column_expr = column_for_field(field)?;
159            // Escape SQL LIKE metacharacters (%, _, \) so a user searching
160            // for a literal '%' doesn't accidentally wildcard the world.
161            let escaped = escape_like(value);
162            let pattern = format!("%{escaped}%");
163            let sql = format!("{column_expr} LIKE ? ESCAPE '\\'");
164            Ok((sql, vec![SqlValue::Text(pattern)]))
165        }
166        Clause::LastDuration(d) => {
167            let cutoff = compute_last_cutoff(*d, now);
168            Ok((
169                "timestamp >= ?".to_string(),
170                vec![SqlValue::Text(cutoff.to_rfc3339())],
171            ))
172        }
173        Clause::SinceDatetime(s) => {
174            let dt = parse_datetime(s)?;
175            Ok((
176                "timestamp >= ?".to_string(),
177                vec![SqlValue::Text(dt.to_rfc3339())],
178            ))
179        }
180    }
181}
182
183/// Return the SQL expression that references a given query field.
184///
185/// Known fields resolve to indexed columns. Unknown fields resolve to a
186/// `json_extract(fields, '$.<field>')` expression — which is why the
187/// field name must survive `validate_field_name`'s regex *and* the
188/// defensive check here.
189fn column_for_field(field: &str) -> Result<String> {
190    if LogEntry::KNOWN_KEYS.contains(&field) {
191        Ok(field.to_string())
192    } else {
193        if !is_safe_json_path_segment(field) {
194            return Err(LogdiveError::UnsafeFieldName(field.to_string()));
195        }
196        Ok(format!("json_extract(fields, '$.{field}')"))
197    }
198}
199
/// Decide whether `s` may be spliced into a `json_extract` path as
/// `'$.<s>'`.
///
/// Defensive: the parser's `validate_field_name` is documented to enforce
/// the same character set, but the check is repeated at the SQL boundary so
/// the trust model is obvious from inside this module.
///
/// Accepted names:
/// * start with an ASCII letter or `_`;
/// * contain only ASCII letters, digits, `_` and `.`;
/// * have no empty dot-separated segment, so `a..b` and `a.` are rejected.
///   Such names would otherwise pass the character check and be spliced in
///   as a malformed JSON path like `'$.a..b'`.
fn is_safe_json_path_segment(s: &str) -> bool {
    let starts_ok = matches!(
        s.chars().next(),
        Some(c) if c.is_ascii_alphabetic() || c == '_'
    );

    // Splitting on '.' checks the character set and the "no empty segment"
    // rule in one pass; an empty input is already excluded by `starts_ok`.
    starts_ok
        && s.split('.').all(|segment| {
            !segment.is_empty()
                && segment
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || c == '_')
        })
}
212
213fn value_to_bind(v: &QueryValue) -> Bind {
214    match v {
215        QueryValue::String(s) => SqlValue::Text(s.clone()),
216        QueryValue::Integer(n) => SqlValue::Integer(*n),
217        QueryValue::Float(f) => SqlValue::Real(*f),
218        QueryValue::Bool(b) => SqlValue::Integer(if *b { 1 } else { 0 }),
219    }
220}
221
/// Make `input` safe to embed in a LIKE pattern that uses `\` as its
/// escape character.
///
/// Each `%`, `_` and `\` is prefixed with a backslash so it matches itself
/// instead of acting as a wildcard or escape. All other characters are
/// copied through unchanged.
fn escape_like(input: &str) -> String {
    const ESCAPE: char = '\\';

    input
        .chars()
        .fold(String::with_capacity(input.len()), |mut escaped, ch| {
            if matches!(ch, ESCAPE | '%' | '_') {
                escaped.push(ESCAPE);
            }
            escaped.push(ch);
            escaped
        })
}
237
238fn compute_last_cutoff(d: Duration, now: DateTime<Utc>) -> DateTime<Utc> {
239    // `amount` is u64; promote to i64 for chrono. Saturate on the
240    // (astronomically unlikely) overflow case.
241    let amount_i64 = i64::try_from(d.amount).unwrap_or(i64::MAX);
242    let secs = amount_i64.saturating_mul(d.unit.seconds());
243    let delta = chrono::Duration::seconds(secs);
244    now.checked_sub_signed(delta).unwrap_or_else(|| {
245        Utc.timestamp_opt(0, 0)
246            .single()
247            .expect("unix epoch is valid")
248    })
249}
250
251/// Accept three datetime formats for `since` clauses:
252///   - RFC3339 / ISO-8601 with timezone (e.g. `2024-01-01T10:00:00Z`)
253///   - ISO naive datetime (e.g. `2024-01-01 10:00:00` or `2024-01-01T10:00:00`), interpreted as UTC
254///   - ISO date (e.g. `2024-01-01`), interpreted as UTC midnight
255fn parse_datetime(s: &str) -> Result<DateTime<Utc>> {
256    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
257        return Ok(dt.with_timezone(&Utc));
258    }
259    for fmt in &["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"] {
260        if let Ok(ndt) = NaiveDateTime::parse_from_str(s, fmt) {
261            return Ok(Utc.from_utc_datetime(&ndt));
262        }
263    }
264    if let Ok(nd) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
265        let ndt = nd.and_hms_opt(0, 0, 0).expect("00:00:00 is valid");
266        return Ok(Utc.from_utc_datetime(&ndt));
267    }
268    Err(LogdiveError::InvalidDatetime {
269        input: s.to_string(),
270        reason: "expected RFC3339, `YYYY-MM-DD HH:MM:SS`, or `YYYY-MM-DD`".to_string(),
271    })
272}
273
274// ---------------------------------------------------------------------------
275// Execution
276// ---------------------------------------------------------------------------
277
278fn run(conn: &Connection, sql: &str, binds: &[Bind]) -> Result<Vec<LogEntry>> {
279    let mut stmt = conn.prepare(sql)?;
280    let rows = stmt.query_map(params_from_iter(binds.iter()), |row| {
281        let timestamp: Option<String> = row.get(0)?;
282        let level: Option<String> = row.get(1)?;
283        let message: Option<String> = row.get(2)?;
284        let tag: Option<String> = row.get(3)?;
285        let fields_json: String = row.get(4)?;
286        let raw: String = row.get(5)?;
287        // We tunnel the raw JSON out; deserialization happens below so the
288        // closure's error type stays `rusqlite::Error`.
289        Ok((timestamp, level, message, tag, fields_json, raw))
290    })?;
291
292    let mut out = Vec::new();
293    for row in rows {
294        let (timestamp, level, message, tag, fields_json, raw) = row?;
295        let fields: Map<String, Value> =
296            serde_json::from_str(&fields_json).map_err(LogdiveError::CorruptFieldsJson)?;
297        out.push(LogEntry {
298            timestamp,
299            level,
300            message,
301            tag,
302            fields,
303            raw,
304        });
305    }
306    Ok(out)
307}
308
309// ---------------------------------------------------------------------------
310// Tests
311// ---------------------------------------------------------------------------
312
#[cfg(test)]
mod tests {
    use super::*;
    use crate::indexer::Indexer;
    use crate::query::parse;
    use std::collections::HashSet;

    // -----------------------------------------------------------------
    // Helpers
    // -----------------------------------------------------------------

    /// Parse a query string and run it against `conn` using the real clock.
    /// Panics if parsing or execution fails — tests pass well-formed input.
    fn run_query(conn: &Connection, q: &str) -> Vec<LogEntry> {
        let ast = parse(q).expect("test queries are well-formed");
        execute(&ast, conn, None).expect("execute")
    }

    /// Like [`run_query`], with a pinned "now" for time-range clauses.
    fn run_query_at(conn: &Connection, q: &str, now: DateTime<Utc>) -> Vec<LogEntry> {
        let ast = parse(q).expect("test queries are well-formed");
        execute_at(&ast, conn, None, now).expect("execute")
    }

    /// Parse `q` and build its SQL with no limit and the real clock.
    fn build(q: &str) -> (String, Vec<Bind>) {
        build_at(q, None, Utc::now())
    }

    /// Parse `q` and build its SQL with an explicit limit and "now".
    fn build_at(q: &str, limit: Option<usize>, now: DateTime<Utc>) -> (String, Vec<Bind>) {
        let ast = parse(q).expect("test queries are well-formed");
        build_sql(&ast, limit, now).expect("build_sql")
    }

    /// Unwrap a TEXT bind, panicking with the actual value otherwise.
    fn text(bind: &Bind) -> &str {
        match bind {
            SqlValue::Text(s) => s.as_str(),
            other => panic!("expected text bind, got {other:?}"),
        }
    }

    /// Unwrap an INTEGER bind, panicking with the actual value otherwise.
    fn int(bind: &Bind) -> i64 {
        match bind {
            SqlValue::Integer(n) => *n,
            other => panic!("expected integer bind, got {other:?}"),
        }
    }

    /// All binds as TEXT, in order. Panics on the first non-text bind.
    fn texts(binds: &[Bind]) -> Vec<&str> {
        binds.iter().map(text).collect()
    }

    /// All binds as INTEGER, in order. Panics on the first non-integer bind.
    fn ints(binds: &[Bind]) -> Vec<i64> {
        binds.iter().map(int).collect()
    }

    /// The set of `level` values across `rows` (missing levels become "").
    fn levels(rows: &[LogEntry]) -> HashSet<String> {
        rows.iter()
            .map(|e| e.level.clone().unwrap_or_default())
            .collect()
    }

    /// The set of `message` values across `rows` (missing become "").
    fn messages(rows: &[LogEntry]) -> HashSet<String> {
        rows.iter()
            .map(|e| e.message.clone().unwrap_or_default())
            .collect()
    }

    fn make_entry(ts: &str, level: &str, message: &str) -> LogEntry {
        let raw = format!(r#"{{"timestamp":"{ts}","level":"{level}","message":"{message}"}}"#);
        let mut e = LogEntry::new(raw);
        e.timestamp = Some(ts.to_string());
        e.level = Some(level.to_string());
        e.message = Some(message.to_string());
        e
    }

    /// Attach the two JSON fields the fixtures use: `service` and `req_id`.
    fn with_service(mut entry: LogEntry, service: &str, req_id: i64) -> LogEntry {
        entry
            .fields
            .insert("service".into(), Value::String(service.into()));
        entry.fields.insert("req_id".into(), Value::from(req_id));
        entry
    }

    /// Three rows: error/payments (10:00, tagged), info/payments (11:00,
    /// tagged), error/users (12:00, untagged).
    fn fixture() -> Indexer {
        let mut idx = Indexer::open_in_memory().unwrap();

        let mut a = with_service(
            make_entry("2026-04-20T10:00:00Z", "error", "payment failed"),
            "payments",
            100,
        );
        a.tag = Some("api".into());

        let mut b = with_service(
            make_entry("2026-04-20T11:00:00Z", "info", "health check"),
            "payments",
            200,
        );
        b.tag = Some("api".into());

        let c = with_service(
            make_entry("2026-04-20T12:00:00Z", "error", "timeout on db call"),
            "users",
            300,
        );

        idx.insert_batch(&[a, b, c]).unwrap();
        idx
    }

    /// Three-row fixture with one row each at levels error/warn/info and no
    /// JSON fields. Used by OR-specific round-trip tests where we want at
    /// least one row per level to exercise multi-group disjunction.
    fn three_level_fixture() -> Indexer {
        let mut idx = Indexer::open_in_memory().unwrap();
        let a = make_entry("2026-04-20T09:00:00Z", "error", "boom");
        let b = make_entry("2026-04-20T10:00:00Z", "warn", "slow query");
        let c = make_entry("2026-04-20T11:00:00Z", "info", "ok");
        idx.insert_batch(&[a, b, c]).unwrap();
        idx
    }

    // -----------------------------------------------------------------
    // SQL generation (inspection) — single AND-group cases (no OR)
    // -----------------------------------------------------------------

    #[test]
    fn compare_on_known_field_binds_value_not_interpolates() {
        let (sql, binds) = build("level=error");
        // Always-parenthesized invariant: even single-clause queries are
        // wrapped in parens.
        assert!(sql.contains("WHERE (level = ?)"));
        assert!(!sql.contains("error"));
        assert_eq!(texts(&binds), ["error"]);
    }

    #[test]
    fn compare_on_unknown_field_uses_json_extract() {
        let (sql, binds) = build("service=payments");
        assert!(sql.contains("json_extract(fields, '$.service')"));
        assert_eq!(binds.len(), 1);
    }

    #[test]
    fn contains_uses_like_with_escape_and_wildcards() {
        let (sql, binds) = build(r#"message contains "timeout""#);
        assert!(sql.contains("LIKE ? ESCAPE '\\'"));
        assert_eq!(text(&binds[0]), "%timeout%");
    }

    #[test]
    fn contains_escapes_like_metacharacters() {
        let (_, binds) = build(r#"message contains "50%""#);
        assert_eq!(text(&binds[0]), r"%50\%%");
    }

    #[test]
    fn escape_like_escapes_underscore_and_backslash() {
        assert_eq!(escape_like("a_b"), r"a\_b");
        assert_eq!(escape_like(r"c:\tmp"), r"c:\\tmp");
    }

    #[test]
    fn last_duration_produces_timestamp_lower_bound() {
        let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 0, 0).unwrap();
        let (sql, binds) = build_at("last 2h", None, now);
        assert!(sql.contains("timestamp >= ?"));
        assert!(text(&binds[0]).starts_with("2026-04-20T10:00:00"));
    }

    #[test]
    fn since_accepts_rfc3339() {
        let (sql, binds) = build(r#"since "2024-01-01T10:00:00Z""#);
        assert!(sql.contains("timestamp >= ?"));
        assert!(text(&binds[0]).starts_with("2024-01-01T10:00:00"));
    }

    #[test]
    fn since_accepts_bare_date() {
        let (_, binds) = build("since 2024-06-15");
        assert!(text(&binds[0]).starts_with("2024-06-15T00:00:00"));
    }

    #[test]
    fn parse_datetime_accepts_naive_datetime_as_utc() {
        let expected = Utc.with_ymd_and_hms(2024, 1, 1, 10, 0, 0).unwrap();
        assert_eq!(parse_datetime("2024-01-01 10:00:00").unwrap(), expected);
        assert_eq!(parse_datetime("2024-01-01T10:00:00").unwrap(), expected);
    }

    #[test]
    fn since_rejects_garbage() {
        let ast = parse("since not-a-date").unwrap();
        let err = build_sql(&ast, None, Utc::now()).unwrap_err();
        assert!(matches!(err, LogdiveError::InvalidDatetime { .. }));
    }

    #[test]
    fn and_chain_joins_with_and_inside_a_single_group() {
        let (sql, binds) = build("level=error AND service=payments");
        assert!(sql.contains("level = ?"));
        assert!(sql.contains("json_extract(fields, '$.service') = ?"));
        // AND inside a single parenthesized group, no OR.
        assert!(sql.contains(" AND "));
        assert!(!sql.contains(" OR "));
        // Always-parenthesized invariant.
        assert!(sql.contains("WHERE (level = ? AND json_extract(fields, '$.service') = ?)"));
        assert_eq!(texts(&binds), ["error", "payments"]);
    }

    #[test]
    fn integer_binds_as_integer_not_text() {
        let (_, binds) = build("req_id > 100");
        assert_eq!(int(&binds[0]), 100);
    }

    #[test]
    fn bool_binds_as_integer_zero_or_one() {
        let (_, binds) = build("ok=true");
        assert_eq!(int(&binds[0]), 1);

        let (_, binds) = build("ok=false");
        assert_eq!(int(&binds[0]), 0);
    }

    #[test]
    fn float_binds_as_real() {
        let (_, binds) = build("duration < 1.5");
        match &binds[0] {
            SqlValue::Real(f) => assert!((f - 1.5).abs() < 1e-9),
            other => panic!("expected real bind, got {other:?}"),
        }
    }

    #[test]
    fn limit_appends_limit_clause() {
        let (sql, _) = build_at("level=error", Some(50), Utc::now());
        assert!(sql.ends_with("LIMIT 50"));
    }

    #[test]
    fn limit_none_emits_no_limit_clause() {
        let (sql, _) = build("level=error");
        assert!(!sql.contains("LIMIT"));
        assert!(sql.ends_with("ORDER BY timestamp DESC, id DESC"));
    }

    // -----------------------------------------------------------------
    // SQL generation — OR cases (new in v0.2.0)
    // -----------------------------------------------------------------

    #[test]
    fn or_emits_two_parenthesized_groups_joined_by_or() {
        let (sql, binds) = build("level=error OR level=warn");
        // Two parenthesized AND-groups joined by ` OR `.
        assert!(sql.contains("WHERE (level = ?) OR (level = ?)"));
        // Bind order matches clause order across the OR boundary.
        assert_eq!(texts(&binds), ["error", "warn"]);
    }

    #[test]
    fn or_with_three_groups_joins_with_two_or_keywords() {
        let (sql, _) = build("level=error OR level=warn OR level=fatal");
        // Exactly two ` OR ` separators between three groups.
        assert_eq!(sql.matches(" OR ").count(), 2);
        // Spot-check the overall shape.
        assert!(sql.contains("(level = ?) OR (level = ?) OR (level = ?)"));
    }

    #[test]
    fn or_with_and_inside_each_group_emits_correct_shape() {
        // (level=error AND service=payments) OR (level=warn)
        let (sql, binds) = build("level=error AND service=payments OR level=warn");
        assert!(sql.contains(
            "WHERE (level = ? AND json_extract(fields, '$.service') = ?) OR (level = ?)"
        ));
        // Bind order: error, payments, warn — preserved across OR.
        assert_eq!(texts(&binds), ["error", "payments", "warn"]);
    }

    #[test]
    fn or_with_and_on_both_sides_preserves_bind_ordering() {
        // (a=1 AND b=2) OR (c=3 AND d=4) — exercises bind ordering across
        // the OR boundary with integer values.
        let (sql, binds) = build("a=1 AND b=2 OR c=3 AND d=4");
        assert_eq!(sql.matches(" OR ").count(), 1);
        assert_eq!(ints(&binds), [1, 2, 3, 4]);
    }

    #[test]
    fn or_with_mixed_clause_kinds_in_each_group() {
        // level=error OR message contains "timeout" OR last 30m
        // Three groups, each with a different kind of clause.
        let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 0, 0).unwrap();
        let (sql, binds) = build_at(
            r#"level=error OR message contains "timeout" OR last 30m"#,
            None,
            now,
        );
        assert_eq!(sql.matches(" OR ").count(), 2);
        assert_eq!(binds.len(), 3);

        // Group 1: text bind for the level value.
        assert_eq!(text(&binds[0]), "error");
        // Group 2: text bind with %timeout% LIKE pattern.
        assert_eq!(text(&binds[1]), "%timeout%");
        // Group 3: text bind with the cutoff timestamp.
        assert!(text(&binds[2]).starts_with("2026-04-20T11:30:00"));
    }

    #[test]
    fn limit_applies_to_or_query_too() {
        let (sql, _) = build_at("level=error OR level=warn", Some(25), Utc::now());
        assert!(sql.ends_with("LIMIT 25"));
        // LIMIT is outside the WHERE clause.
        assert!(sql.contains(" ORDER BY "));
    }

    // -----------------------------------------------------------------
    // Round-trip: insert, query (without OR), assert results
    // -----------------------------------------------------------------

    #[test]
    fn round_trip_known_field_equality() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level=error");
        assert_eq!(rows.len(), 2);
        assert!(rows.iter().all(|e| e.level.as_deref() == Some("error")));
    }

    #[test]
    fn round_trip_unknown_field_via_json_extract() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "service=payments");
        assert_eq!(rows.len(), 2);
        let payments = Value::String("payments".into());
        assert!(rows.iter().all(|e| e.fields.get("service") == Some(&payments)));
    }

    #[test]
    fn round_trip_and_chain() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level=error AND service=payments");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].message.as_deref(), Some("payment failed"));
    }

    #[test]
    fn round_trip_contains_substring_match() {
        let idx = fixture();
        let rows = run_query(idx.connection(), r#"message contains "timeout""#);
        assert_eq!(rows.len(), 1);
        assert!(rows[0].message.as_deref().unwrap().contains("timeout"));
    }

    #[test]
    fn round_trip_numeric_comparison_on_json_field() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "req_id > 150");
        assert_eq!(rows.len(), 2);
        let ids: HashSet<i64> = rows
            .iter()
            .map(|e| e.fields.get("req_id").and_then(|v| v.as_i64()).unwrap())
            .collect();
        assert_eq!(ids, HashSet::from([200, 300]));
    }

    #[test]
    fn round_trip_last_duration_uses_now() {
        let idx = fixture();
        let now = Utc.with_ymd_and_hms(2026, 4, 20, 13, 0, 0).unwrap();

        let rows = run_query_at(idx.connection(), "last 3h", now);
        assert_eq!(rows.len(), 3);

        let rows = run_query_at(idx.connection(), "last 70m", now);
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].timestamp.as_deref(), Some("2026-04-20T12:00:00Z"));
    }

    #[test]
    fn round_trip_since_datetime() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "since 2026-04-20T11:00:00Z");
        assert_eq!(rows.len(), 2);
    }

    #[test]
    fn round_trip_results_ordered_newest_first() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level=error");
        assert!(rows[0].timestamp > rows[1].timestamp);
    }

    #[test]
    fn round_trip_not_equal_operator() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level!=error");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].level.as_deref(), Some("info"));
    }

    #[test]
    fn round_trip_contains_with_wildcard_character_is_literal() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let a = make_entry("2026-04-20T10:00:00Z", "info", "discount 50% today");
        let b = make_entry("2026-04-20T11:00:00Z", "info", "no special char here");
        idx.insert_batch(&[a, b]).unwrap();

        let rows = run_query(idx.connection(), r#"message contains "50%""#);
        assert_eq!(rows.len(), 1);
        assert!(rows[0].message.as_deref().unwrap().contains("50%"));
    }

    #[test]
    fn round_trip_empty_result_is_empty_vec_not_error() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level=nonsense");
        assert!(rows.is_empty());
    }

    #[test]
    fn round_trip_reconstructs_fields_map() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level=error AND service=payments");
        assert_eq!(rows.len(), 1);
        let e = &rows[0];
        assert_eq!(
            e.fields.get("service"),
            Some(&Value::String("payments".into()))
        );
        assert_eq!(e.fields.get("req_id").and_then(|v| v.as_i64()), Some(100));
    }

    // -----------------------------------------------------------------
    // Round-trip: OR queries (new in v0.2.0)
    // -----------------------------------------------------------------

    #[test]
    fn round_trip_or_two_groups_returns_union() {
        let idx = three_level_fixture();
        let rows = run_query(idx.connection(), "level=error OR level=warn");
        assert_eq!(rows.len(), 2);
        assert_eq!(
            levels(&rows),
            HashSet::from(["error".to_string(), "warn".to_string()])
        );
    }

    #[test]
    fn round_trip_or_three_groups_returns_full_union() {
        let idx = three_level_fixture();
        let rows = run_query(idx.connection(), "level=error OR level=warn OR level=info");
        assert_eq!(rows.len(), 3);
    }

    #[test]
    fn round_trip_or_does_not_double_count_overlapping_rows() {
        // A row matched by both groups must appear once: the WHERE clause
        // is evaluated per row of a single table, so each row is yielded
        // at most once regardless of how many groups it satisfies.
        let idx = three_level_fixture();
        // Both clauses match the error row.
        let rows = run_query(
            idx.connection(),
            r#"level=error OR message contains "boom""#,
        );
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].level.as_deref(), Some("error"));
    }

    #[test]
    fn round_trip_or_respects_and_precedence() {
        // (level=error AND service=payments) OR level=warn
        // The base fixture has one error/payments row, one error/users row,
        // one info row, and no warn rows — so add a warn row to give the
        // right-hand side of the OR something to match.
        let mut idx = fixture();
        let warn = with_service(
            make_entry("2026-04-20T13:00:00Z", "warn", "almost full"),
            "orders",
            400,
        );
        idx.insert_batch(&[warn]).unwrap();

        let rows = run_query(
            idx.connection(),
            "level=error AND service=payments OR level=warn",
        );
        // One error/payments row + one warn row = 2.
        assert_eq!(rows.len(), 2);
        let seen = messages(&rows);
        assert!(seen.contains("payment failed"));
        assert!(seen.contains("almost full"));
    }

    #[test]
    fn round_trip_or_with_time_range() {
        // "Errors any time, OR anything in the last hour" is a real-
        // world disjunction shape.
        let idx = fixture();
        let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 30, 0).unwrap();
        let rows = run_query_at(idx.connection(), "level=error OR last 30m", now);
        // The two error rows + the one row in the last 30m (12:00 row).
        // The 12:00 row is also an error row, so the result has 2 rows.
        assert_eq!(rows.len(), 2);
    }

    #[test]
    fn round_trip_or_yields_results_ordered_newest_first() {
        // Ordering must be applied across the union, not within each group.
        let idx = three_level_fixture();
        let rows = run_query(idx.connection(), "level=error OR level=info");
        assert_eq!(rows.len(), 2);
        assert!(rows[0].timestamp > rows[1].timestamp);
        // Newest is the info row at 11:00, oldest is the error at 09:00.
        assert_eq!(rows[0].level.as_deref(), Some("info"));
        assert_eq!(rows[1].level.as_deref(), Some("error"));
    }

    #[test]
    fn round_trip_or_with_zero_matches_in_one_group_still_returns_other() {
        let idx = three_level_fixture();
        // First group matches nothing, second matches one row.
        let rows = run_query(idx.connection(), "level=fatal OR level=warn");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].level.as_deref(), Some("warn"));
    }

    // -----------------------------------------------------------------
    // Safety guards
    // -----------------------------------------------------------------

    #[test]
    fn unsafe_field_name_is_rejected_at_executor() {
        let result = column_for_field("service; DROP TABLE log_entries--");
        assert!(matches!(result, Err(LogdiveError::UnsafeFieldName(_))));
    }
}