Skip to main content

logdive_core/
executor.rs

1//! Query executor: translate a [`QueryNode`] into parameterized SQL, run
2//! it against the index, and reconstruct [`LogEntry`] values from the
3//! result rows.
4//!
5//! This module is the bridge between the query AST and the SQLite schema.
6//! It never mixes user-controlled strings into SQL text — every literal
7//! value is bound as a parameter. The one exception is JSON extraction
8//! paths like `$.service`, which embed the field name directly because
9//! SQLite parameters aren't allowed inside `json_extract` path expressions;
10//! safety there comes from the field name having passed
11//! `validate_field_name`'s strict regex in the parser, which we
12//! defensively re-check at the executor boundary.
13//!
14//! # Disjunction (OR) shape
15//!
16//! v0.2.0 introduced OR. The AST is `QueryNode::Or(Vec<AndGroup>)` where
17//! each [`AndGroup`] is a conjunction of clauses. The SQL we emit
18//! parenthesizes each AND-group and joins them with ` OR `:
19//!
20//! ```sql
//! WHERE (lower(level) = ? AND json_extract(fields, '$.service') = ?)
//!    OR (lower(level) = ?)
23//! ```
24//!
25//! For queries with no OR (a single AND-group), the parens are still
26//! emitted — the alternative is a special-case branch in the SQL
27//! builder that adds maintenance cost without performance benefit.
28//! SQLite's planner ignores redundant parens.
29//!
30//! # Parenthesized groups (v0.3.0)
31//!
32//! `Clause::Group` wraps an inner `QueryNode::Or` subtree produced by a
33//! `(` … `)` expression in the query language. The translator recurses
34//! into the subtree via `translate_and_group` and parenthesizes the
35//! result so it composes correctly with surrounding AND/OR operators:
36//!
37//! ```sql
38//! -- (level=error OR level=warn) AND service=payments
//! WHERE (((lower(level) = ?) OR (lower(level) = ?)) AND json_extract(fields, '$.service') = ?)
40//! ```
41//!
42//! The extra level of parentheses is redundant for correctness but keeps
43//! the emitter uniform — every AND-group is parenthesized, whether it
44//! came from the top-level OR or from an inner Group clause.
45//!
46//! # Pagination (v0.3.0)
47//!
48//! [`QueryOptions`] bundles `limit` and `offset` so callers can request a
49//! specific page of results without separate function variants. Offset
50//! without limit uses `LIMIT -1` — SQLite requires a `LIMIT` clause when
51//! `OFFSET` is present; `-1` means unlimited in SQLite.
52//!
53//! # Timestamp handling
54//!
//! Timestamps are compared as TEXT. That is chronologically correct only
//! when both sides are ISO-8601 strings with fixed-width components and the
//! same UTC offset notation, because only then does lexicographic order
//! match chronological order. Ingested timestamps that aren't ISO-8601
//! shaped will compare incorrectly against `last`/`since` bounds — a known
//! limitation of accepting arbitrary timestamp strings at ingestion time.
60
61use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
62use rusqlite::{Connection, params_from_iter, types::Value as SqlValue};
63use serde_json::{Map, Value};
64
65use crate::entry::LogEntry;
66use crate::error::{LogdiveError, Result};
67use crate::query::{AndGroup, Clause, Duration, QueryNode, QueryValue};
68
69// ---------------------------------------------------------------------------
70// QueryOptions
71// ---------------------------------------------------------------------------
72
/// Paging controls for [`execute`]: how many rows to return and how many
/// to skip first.
///
/// Both fields default to `None`, which yields the full, unpaged result
/// set. A positive `offset` with no `limit` is rendered as
/// `LIMIT -1 OFFSET n`, because SQLite only accepts `OFFSET` as part of a
/// `LIMIT` clause and treats a negative limit as "no upper bound".
#[derive(Clone, Copy, Debug, Default)]
pub struct QueryOptions {
    /// Upper bound on the number of rows returned; `None` leaves the
    /// result set uncapped.
    pub limit: Option<usize>,
    /// How many leading rows of the ordered result set to skip. `None`
    /// and `Some(0)` both start at the first row.
    pub offset: Option<usize>,
}
87
88// ---------------------------------------------------------------------------
89// Public entry points
90// ---------------------------------------------------------------------------
91
92/// Execute a parsed query against the index and return matching entries.
93///
94/// Results are ordered by `timestamp DESC, id DESC` (newest first, with
95/// row id as stable tiebreaker for identical timestamps). Use
96/// [`QueryOptions`] to cap or page the result set.
97pub fn execute(query: &QueryNode, conn: &Connection, opts: QueryOptions) -> Result<Vec<LogEntry>> {
98    let (sql, binds) = build_sql(query, opts, Utc::now())?;
99    run(conn, &sql, &binds)
100}
101
102/// Variant of [`execute`] that uses a caller-supplied "now" value.
103///
104/// Exposed for testing so time-range clauses produce deterministic bounds.
105pub fn execute_at(
106    query: &QueryNode,
107    conn: &Connection,
108    opts: QueryOptions,
109    now: DateTime<Utc>,
110) -> Result<Vec<LogEntry>> {
111    let (sql, binds) = build_sql(query, opts, now)?;
112    run(conn, &sql, &binds)
113}
114
115// ---------------------------------------------------------------------------
116// SQL generation
117// ---------------------------------------------------------------------------
118
119/// Intermediate representation of a bindable value, kept as an owned
120/// `SqlValue` so `params_from_iter` can consume them without lifetime
121/// gymnastics.
122type Bind = SqlValue;
123
124fn build_sql(
125    query: &QueryNode,
126    opts: QueryOptions,
127    now: DateTime<Utc>,
128) -> Result<(String, Vec<Bind>)> {
129    let QueryNode::Or(groups) = query;
130
131    // The parser guarantees at least one AND-group, and each AND-group has
132    // at least one clause. Treat both invariants defensively here so a bug
133    // upstream produces a recognizable runtime shape rather than a SQL
134    // syntax error from an empty `WHERE` clause.
135    let mut group_sqls: Vec<String> = Vec::with_capacity(groups.len());
136    let mut binds: Vec<Bind> = Vec::new();
137
138    for group in groups {
139        let (group_sql, mut group_binds) = translate_and_group(group, now)?;
140        group_sqls.push(group_sql);
141        binds.append(&mut group_binds);
142    }
143
144    let where_sql = if group_sqls.is_empty() {
145        // Defensive: should be unreachable given the parser contract.
146        "1=1".to_string()
147    } else {
148        // ` OR ` between groups, each already parenthesized.
149        group_sqls.join(" OR ")
150    };
151
152    let mut sql = format!(
153        "SELECT timestamp, level, message, tag, fields, raw \
154         FROM log_entries \
155         WHERE {where_sql} \
156         ORDER BY timestamp DESC, id DESC"
157    );
158
159    // SQLite requires LIMIT to be present when OFFSET is used.
160    // Emit `LIMIT -1` (unlimited) when the caller wants offset-only paging.
161    match (opts.limit, opts.offset) {
162        (Some(lim), Some(off)) if off > 0 => {
163            sql.push_str(&format!(" LIMIT {lim} OFFSET {off}"));
164        }
165        (Some(lim), _) => {
166            sql.push_str(&format!(" LIMIT {lim}"));
167        }
168        (None, Some(off)) if off > 0 => {
169            sql.push_str(&format!(" LIMIT -1 OFFSET {off}"));
170        }
171        _ => {}
172    }
173
174    Ok((sql, binds))
175}
176
177/// Translate one AND-group into a parenthesized SQL fragment and the
178/// associated bind values, in clause-declaration order.
179///
180/// Always parenthesizes — including the single-clause case. Uniformity in
181/// the SQL emitter outweighs prettiness in the rare query-debugger view.
182fn translate_and_group(group: &AndGroup, now: DateTime<Utc>) -> Result<(String, Vec<Bind>)> {
183    let mut clause_sqls: Vec<String> = Vec::with_capacity(group.clauses.len());
184    let mut binds: Vec<Bind> = Vec::new();
185
186    for clause in &group.clauses {
187        let (sql, mut clause_binds) = translate_clause(clause, now)?;
188        clause_sqls.push(sql);
189        binds.append(&mut clause_binds);
190    }
191
192    let inner = if clause_sqls.is_empty() {
193        // Defensive: parser guarantees non-empty AND-groups.
194        "1=1".to_string()
195    } else {
196        clause_sqls.join(" AND ")
197    };
198    Ok((format!("({inner})"), binds))
199}
200
201fn translate_clause(clause: &Clause, now: DateTime<Utc>) -> Result<(String, Vec<Bind>)> {
202    match clause {
203        Clause::Compare { field, op, value } => {
204            // Route `level` through lower() so queries hit the idx_level_norm
205            // expression index and match case-insensitively (ERROR == error).
206            let (column_expr, bind) = if field == "level" {
207                let lowered = match value {
208                    QueryValue::String(s) => SqlValue::Text(s.to_lowercase()),
209                    other => value_to_bind(other),
210                };
211                ("lower(level)".to_string(), lowered)
212            } else {
213                (column_for_field(field)?, value_to_bind(value))
214            };
215            let sql = format!("{column_expr} {op} ?");
216            Ok((sql, vec![bind]))
217        }
218        Clause::Contains { field, value } => {
219            // Escape SQL LIKE metacharacters (%, _, \) so a user searching
220            // for a literal '%' doesn't accidentally wildcard the world.
221            // For `level`, lowercase the pattern so it matches the lower()
222            // column expression used in the index.
223            let (column_expr, normalised_value) = if field == "level" {
224                ("lower(level)".to_string(), value.to_lowercase())
225            } else {
226                (column_for_field(field)?, value.clone())
227            };
228            let escaped = escape_like(&normalised_value);
229            let pattern = format!("%{escaped}%");
230            let sql = format!("{column_expr} LIKE ? ESCAPE '\\'");
231            Ok((sql, vec![SqlValue::Text(pattern)]))
232        }
233        Clause::LastDuration(d) => {
234            let cutoff = compute_last_cutoff(*d, now);
235            Ok((
236                "timestamp >= ?".to_string(),
237                vec![SqlValue::Text(cutoff.to_rfc3339())],
238            ))
239        }
240        Clause::SinceDatetime(s) => {
241            let dt = parse_datetime(s)?;
242            Ok((
243                "timestamp >= ?".to_string(),
244                vec![SqlValue::Text(dt.to_rfc3339())],
245            ))
246        }
247        Clause::Group(inner) => {
248            // Recurse into the parenthesized subexpression. Each inner
249            // AND-group is already parenthesized by `translate_and_group`;
250            // multiple groups are joined with ` OR ` inside an extra pair
251            // of parens so the whole group composes correctly with the
252            // surrounding AND expression.
253            let QueryNode::Or(groups) = inner.as_ref();
254            let mut group_sqls: Vec<String> = Vec::with_capacity(groups.len());
255            let mut binds: Vec<Bind> = Vec::new();
256            for group in groups {
257                let (gsql, mut gbinds) = translate_and_group(group, now)?;
258                group_sqls.push(gsql);
259                binds.append(&mut gbinds);
260            }
261            let sql = if group_sqls.len() == 1 {
262                group_sqls.into_iter().next().unwrap()
263            } else {
264                format!("({})", group_sqls.join(" OR "))
265            };
266            Ok((sql, binds))
267        }
268    }
269}
270
271/// Return the SQL expression that references a given query field.
272///
273/// Known fields resolve to indexed columns. Unknown fields resolve to a
274/// `json_extract(fields, '$.<field>')` expression — which is why the
275/// field name must survive `validate_field_name`'s regex *and* the
276/// defensive check here.
277fn column_for_field(field: &str) -> Result<String> {
278    if LogEntry::KNOWN_KEYS.contains(&field) {
279        Ok(field.to_string())
280    } else {
281        if !is_safe_json_path_segment(field) {
282            return Err(LogdiveError::UnsafeFieldName(field.to_string()));
283        }
284        Ok(format!("json_extract(fields, '$.{field}')"))
285    }
286}
287
/// Last-line check that a field name is safe to splice into a
/// `json_extract` path.
///
/// Accepts a non-empty string whose first character is an ASCII letter or
/// `_` and whose remaining characters are ASCII letters, digits, `_`, or
/// `.`. Everything else, including the empty string, is rejected.
fn is_safe_json_path_segment(s: &str) -> bool {
    let mut chars = s.chars();
    match chars.next() {
        // A valid leading character also satisfies the rule for the
        // remaining ones, so only the tail still needs checking.
        Some(first) if first.is_ascii_alphabetic() || first == '_' => {
            chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '.'))
        }
        _ => false,
    }
}
300
301fn value_to_bind(v: &QueryValue) -> Bind {
302    match v {
303        QueryValue::String(s) => SqlValue::Text(s.clone()),
304        QueryValue::Integer(n) => SqlValue::Integer(*n),
305        QueryValue::Float(f) => SqlValue::Real(*f),
306        QueryValue::Bool(b) => SqlValue::Integer(if *b { 1 } else { 0 }),
307    }
308}
309
/// Neutralise SQL `LIKE` metacharacters in `input` so it is matched as
/// literal text.
///
/// Each `%`, `_`, and backslash is prefixed with a backslash, which is the
/// escape character named in the statement's `ESCAPE` clause. All other
/// characters pass through unchanged.
fn escape_like(input: &str) -> String {
    input
        .chars()
        .fold(String::with_capacity(input.len()), |mut escaped, ch| {
            if matches!(ch, '\\' | '%' | '_') {
                escaped.push('\\');
            }
            escaped.push(ch);
            escaped
        })
}
325
326fn compute_last_cutoff(d: Duration, now: DateTime<Utc>) -> DateTime<Utc> {
327    // `amount` is u64; promote to i64 for chrono. Saturate on the
328    // (astronomically unlikely) overflow case.
329    let amount_i64 = i64::try_from(d.amount).unwrap_or(i64::MAX);
330    let secs = amount_i64.saturating_mul(d.unit.seconds());
331    let delta = chrono::Duration::seconds(secs);
332    now.checked_sub_signed(delta).unwrap_or_else(|| {
333        Utc.timestamp_opt(0, 0)
334            .single()
335            .expect("unix epoch is valid")
336    })
337}
338
339/// Accept three datetime formats for `since` clauses:
340///   - RFC3339 / ISO-8601 with timezone (e.g. `2024-01-01T10:00:00Z`)
341///   - ISO naive datetime (e.g. `2024-01-01 10:00:00` or `2024-01-01T10:00:00`), interpreted as UTC
342///   - ISO date (e.g. `2024-01-01`), interpreted as UTC midnight
343fn parse_datetime(s: &str) -> Result<DateTime<Utc>> {
344    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
345        return Ok(dt.with_timezone(&Utc));
346    }
347    for fmt in &["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"] {
348        if let Ok(ndt) = NaiveDateTime::parse_from_str(s, fmt) {
349            return Ok(Utc.from_utc_datetime(&ndt));
350        }
351    }
352    if let Ok(nd) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
353        let ndt = nd.and_hms_opt(0, 0, 0).expect("00:00:00 is valid");
354        return Ok(Utc.from_utc_datetime(&ndt));
355    }
356    Err(LogdiveError::InvalidDatetime {
357        input: s.to_string(),
358        reason: "expected RFC3339, `YYYY-MM-DD HH:MM:SS`, or `YYYY-MM-DD`".to_string(),
359    })
360}
361
362// ---------------------------------------------------------------------------
363// Execution
364// ---------------------------------------------------------------------------
365
366fn run(conn: &Connection, sql: &str, binds: &[Bind]) -> Result<Vec<LogEntry>> {
367    let mut stmt = conn.prepare(sql)?;
368    let rows = stmt.query_map(params_from_iter(binds.iter()), |row| {
369        let timestamp: Option<String> = row.get(0)?;
370        let level: Option<String> = row.get(1)?;
371        let message: Option<String> = row.get(2)?;
372        let tag: Option<String> = row.get(3)?;
373        let fields_json: String = row.get(4)?;
374        let raw: String = row.get(5)?;
375        // We tunnel the raw JSON out; deserialization happens below so the
376        // closure's error type stays `rusqlite::Error`.
377        Ok((timestamp, level, message, tag, fields_json, raw))
378    })?;
379
380    let mut out = Vec::new();
381    for row in rows {
382        let (timestamp, level, message, tag, fields_json, raw) = row?;
383        let fields: Map<String, Value> =
384            serde_json::from_str(&fields_json).map_err(LogdiveError::CorruptFieldsJson)?;
385        out.push(LogEntry {
386            timestamp,
387            level,
388            message,
389            tag,
390            fields,
391            raw,
392        });
393    }
394    Ok(out)
395}
396
397// ---------------------------------------------------------------------------
398// Tests
399// ---------------------------------------------------------------------------
400
401#[cfg(test)]
402mod tests {
403    use super::*;
404    use crate::indexer::Indexer;
405    use crate::query::parse;
406    use std::collections::HashSet;
407
408    /// Convenience: parse a query string and run it against the given
409    /// connection. Panics if parsing fails — tests pass well-formed input.
410    fn run_query(conn: &Connection, q: &str) -> Vec<LogEntry> {
411        let ast = parse(q).expect("test queries are well-formed");
412        execute(&ast, conn, QueryOptions::default()).expect("execute")
413    }
414
415    fn run_query_opts(conn: &Connection, q: &str, opts: QueryOptions) -> Vec<LogEntry> {
416        let ast = parse(q).expect("test queries are well-formed");
417        execute(&ast, conn, opts).expect("execute")
418    }
419
420    fn run_query_at(conn: &Connection, q: &str, now: DateTime<Utc>) -> Vec<LogEntry> {
421        let ast = parse(q).expect("test queries are well-formed");
422        execute_at(&ast, conn, QueryOptions::default(), now).expect("execute")
423    }
424
425    fn make_entry(ts: &str, level: &str, message: &str) -> LogEntry {
426        let raw = format!(r#"{{"timestamp":"{ts}","level":"{level}","message":"{message}"}}"#);
427        let mut e = LogEntry::new(raw);
428        e.timestamp = Some(ts.to_string());
429        e.level = Some(level.to_string());
430        e.message = Some(message.to_string());
431        e
432    }
433
434    fn fixture() -> Indexer {
435        let mut idx = Indexer::open_in_memory().unwrap();
436        let mut a = make_entry("2026-04-20T10:00:00Z", "error", "payment failed");
437        a.tag = Some("api".into());
438        a.fields
439            .insert("service".into(), Value::String("payments".into()));
440        a.fields.insert("req_id".into(), Value::from(100));
441
442        let mut b = make_entry("2026-04-20T11:00:00Z", "info", "health check");
443        b.tag = Some("api".into());
444        b.fields
445            .insert("service".into(), Value::String("payments".into()));
446        b.fields.insert("req_id".into(), Value::from(200));
447
448        let mut c = make_entry("2026-04-20T12:00:00Z", "error", "timeout on db call");
449        c.fields
450            .insert("service".into(), Value::String("users".into()));
451        c.fields.insert("req_id".into(), Value::from(300));
452
453        idx.insert_batch(&[a, b, c]).unwrap();
454        idx
455    }
456
457    /// Three-row fixture covering levels error/warn/info across two services.
458    /// Used by OR-specific round-trip tests where we want at least one row
459    /// per level to exercise multi-group disjunction.
460    fn three_level_fixture() -> Indexer {
461        let mut idx = Indexer::open_in_memory().unwrap();
462        let a = make_entry("2026-04-20T09:00:00Z", "error", "boom");
463        let b = make_entry("2026-04-20T10:00:00Z", "warn", "slow query");
464        let c = make_entry("2026-04-20T11:00:00Z", "info", "ok");
465        idx.insert_batch(&[a, b, c]).unwrap();
466        idx
467    }
468
469    // -----------------------------------------------------------------
470    // SQL generation (inspection) — single AND-group cases (no OR)
471    // -----------------------------------------------------------------
472
473    #[test]
474    fn compare_on_known_field_binds_value_not_interpolates() {
475        let ast = parse("level=error").unwrap();
476        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
477        // level queries route through lower() for case-insensitive matching.
478        // Always-parenthesized invariant: even single-clause queries are wrapped.
479        assert!(sql.contains("WHERE (lower(level) = ?)"));
480        assert!(!sql.contains("error"));
481        assert_eq!(binds.len(), 1);
482        match &binds[0] {
483            SqlValue::Text(s) => assert_eq!(s, "error"),
484            other => panic!("expected text bind, got {other:?}"),
485        }
486    }
487
488    #[test]
489    fn compare_on_unknown_field_uses_json_extract() {
490        let ast = parse("service=payments").unwrap();
491        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
492        assert!(sql.contains("json_extract(fields, '$.service')"));
493        assert_eq!(binds.len(), 1);
494    }
495
496    #[test]
497    fn contains_uses_like_with_escape_and_wildcards() {
498        let ast = parse(r#"message contains "timeout""#).unwrap();
499        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
500        assert!(sql.contains("LIKE ? ESCAPE '\\'"));
501        match &binds[0] {
502            SqlValue::Text(s) => assert_eq!(s, "%timeout%"),
503            other => panic!("expected text bind, got {other:?}"),
504        }
505    }
506
507    #[test]
508    fn contains_escapes_like_metacharacters() {
509        let ast = parse(r#"message contains "50%""#).unwrap();
510        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
511        match &binds[0] {
512            SqlValue::Text(s) => assert_eq!(s, r"%50\%%"),
513            other => panic!("unexpected bind: {other:?}"),
514        }
515    }
516
517    #[test]
518    fn last_duration_produces_timestamp_lower_bound() {
519        let ast = parse("last 2h").unwrap();
520        let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 0, 0).unwrap();
521        let (sql, binds) = build_sql(&ast, QueryOptions::default(), now).unwrap();
522        assert!(sql.contains("timestamp >= ?"));
523        match &binds[0] {
524            SqlValue::Text(s) => assert!(s.starts_with("2026-04-20T10:00:00")),
525            other => panic!("unexpected bind: {other:?}"),
526        }
527    }
528
529    #[test]
530    fn since_accepts_rfc3339() {
531        let ast = parse(r#"since "2024-01-01T10:00:00Z""#).unwrap();
532        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
533        assert!(sql.contains("timestamp >= ?"));
534        match &binds[0] {
535            SqlValue::Text(s) => assert!(s.starts_with("2024-01-01T10:00:00")),
536            other => panic!("unexpected: {other:?}"),
537        }
538    }
539
540    #[test]
541    fn since_accepts_bare_date() {
542        let ast = parse("since 2024-06-15").unwrap();
543        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
544        match &binds[0] {
545            SqlValue::Text(s) => assert!(s.starts_with("2024-06-15T00:00:00")),
546            other => panic!("unexpected: {other:?}"),
547        }
548    }
549
550    #[test]
551    fn since_rejects_garbage() {
552        let ast = parse("since not-a-date").unwrap();
553        let err = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap_err();
554        assert!(matches!(err, LogdiveError::InvalidDatetime { .. }));
555    }
556
557    #[test]
558    fn and_chain_joins_with_and_inside_a_single_group() {
559        let ast = parse("level=error AND service=payments").unwrap();
560        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
561        assert!(sql.contains("lower(level) = ?"));
562        assert!(sql.contains("json_extract(fields, '$.service') = ?"));
563        // AND inside a single parenthesized group, no OR.
564        assert!(sql.contains(" AND "));
565        assert!(!sql.contains(" OR "));
566        // Always-parenthesized invariant.
567        assert!(sql.contains("WHERE (lower(level) = ? AND json_extract(fields, '$.service') = ?)"));
568        assert_eq!(binds.len(), 2);
569        match (&binds[0], &binds[1]) {
570            (SqlValue::Text(a), SqlValue::Text(b)) => {
571                assert_eq!(a, "error");
572                assert_eq!(b, "payments");
573            }
574            other => panic!("unexpected binds: {other:?}"),
575        }
576    }
577
578    #[test]
579    fn integer_binds_as_integer_not_text() {
580        let ast = parse("req_id > 100").unwrap();
581        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
582        match &binds[0] {
583            SqlValue::Integer(n) => assert_eq!(*n, 100),
584            other => panic!("expected integer bind, got {other:?}"),
585        }
586    }
587
588    #[test]
589    fn bool_binds_as_integer_zero_or_one() {
590        let ast = parse("ok=true").unwrap();
591        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
592        assert!(matches!(binds[0], SqlValue::Integer(1)));
593
594        let ast = parse("ok=false").unwrap();
595        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
596        assert!(matches!(binds[0], SqlValue::Integer(0)));
597    }
598
599    #[test]
600    fn float_binds_as_real() {
601        let ast = parse("duration < 1.5").unwrap();
602        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
603        match &binds[0] {
604            SqlValue::Real(f) => assert!((f - 1.5).abs() < 1e-9),
605            other => panic!("expected real bind, got {other:?}"),
606        }
607    }
608
609    // -----------------------------------------------------------------
610    // SQL generation — pagination (new in v0.3.0)
611    // -----------------------------------------------------------------
612
613    #[test]
614    fn limit_only_appends_limit_clause() {
615        let ast = parse("level=error").unwrap();
616        let opts = QueryOptions {
617            limit: Some(50),
618            offset: None,
619        };
620        let (sql, _) = build_sql(&ast, opts, Utc::now()).unwrap();
621        assert!(sql.ends_with("LIMIT 50"), "sql: {sql}");
622    }
623
624    #[test]
625    fn offset_zero_treated_as_absent_no_suffix() {
626        let ast = parse("level=error").unwrap();
627        let opts = QueryOptions {
628            limit: None,
629            offset: Some(0),
630        };
631        let (sql, _) = build_sql(&ast, opts, Utc::now()).unwrap();
632        assert!(
633            sql.ends_with("DESC"),
634            "offset=0 must not append any suffix, sql: {sql}"
635        );
636    }
637
638    #[test]
639    fn offset_without_limit_uses_limit_neg1() {
640        let ast = parse("level=error").unwrap();
641        let opts = QueryOptions {
642            limit: None,
643            offset: Some(10),
644        };
645        let (sql, _) = build_sql(&ast, opts, Utc::now()).unwrap();
646        assert!(sql.ends_with("LIMIT -1 OFFSET 10"), "sql: {sql}");
647    }
648
649    #[test]
650    fn limit_and_offset_both_appended() {
651        let ast = parse("level=error").unwrap();
652        let opts = QueryOptions {
653            limit: Some(25),
654            offset: Some(50),
655        };
656        let (sql, _) = build_sql(&ast, opts, Utc::now()).unwrap();
657        assert!(sql.ends_with("LIMIT 25 OFFSET 50"), "sql: {sql}");
658    }
659
660    #[test]
661    fn no_limit_no_offset_produces_no_suffix() {
662        let ast = parse("level=error").unwrap();
663        let (sql, _) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
664        assert!(sql.ends_with("DESC"), "sql: {sql}");
665    }
666
667    // -----------------------------------------------------------------
668    // SQL generation — OR cases (new in v0.2.0)
669    // -----------------------------------------------------------------
670
671    #[test]
672    fn or_emits_two_parenthesized_groups_joined_by_or() {
673        let ast = parse("level=error OR level=warn").unwrap();
674        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
675        // Two parenthesized AND-groups joined by ` OR `.
676        assert!(sql.contains("WHERE (lower(level) = ?) OR (lower(level) = ?)"));
677        // Bind order matches clause order across the OR boundary.
678        match (&binds[0], &binds[1]) {
679            (SqlValue::Text(a), SqlValue::Text(b)) => {
680                assert_eq!(a, "error");
681                assert_eq!(b, "warn");
682            }
683            other => panic!("unexpected binds: {other:?}"),
684        }
685    }
686
687    #[test]
688    fn or_with_three_groups_joins_with_two_or_keywords() {
689        let ast = parse("level=error OR level=warn OR level=fatal").unwrap();
690        let (sql, _binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
691        // Exactly two ` OR ` separators between three groups.
692        assert_eq!(sql.matches(" OR ").count(), 2);
693        // All three values appear bound (we rely on the matches above
694        // for the count; spot-check shape here).
695        assert!(sql.contains("(lower(level) = ?) OR (lower(level) = ?) OR (lower(level) = ?)"));
696    }
697
698    #[test]
699    fn or_with_and_inside_each_group_emits_correct_shape() {
700        // (level=error AND service=payments) OR (level=warn)
701        let ast = parse("level=error AND service=payments OR level=warn").unwrap();
702        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
703        assert!(sql.contains(
704            "WHERE (lower(level) = ? AND json_extract(fields, '$.service') = ?) \
705             OR (lower(level) = ?)"
706        ));
707        // Bind order: error, payments, warn — preserved across OR.
708        assert_eq!(binds.len(), 3);
709        match (&binds[0], &binds[1], &binds[2]) {
710            (SqlValue::Text(a), SqlValue::Text(b), SqlValue::Text(c)) => {
711                assert_eq!(a, "error");
712                assert_eq!(b, "payments");
713                assert_eq!(c, "warn");
714            }
715            other => panic!("unexpected binds: {other:?}"),
716        }
717    }
718
719    #[test]
720    fn or_with_and_on_both_sides_preserves_bind_ordering() {
721        // (a=1 AND b=2) OR (c=3 AND d=4) — exercises bind ordering across
722        // OR boundary with mixed integer values.
723        let ast = parse("a=1 AND b=2 OR c=3 AND d=4").unwrap();
724        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
725        assert_eq!(sql.matches(" OR ").count(), 1);
726        assert_eq!(binds.len(), 4);
727        match (&binds[0], &binds[1], &binds[2], &binds[3]) {
728            (
729                SqlValue::Integer(a),
730                SqlValue::Integer(b),
731                SqlValue::Integer(c),
732                SqlValue::Integer(d),
733            ) => {
734                assert_eq!(*a, 1);
735                assert_eq!(*b, 2);
736                assert_eq!(*c, 3);
737                assert_eq!(*d, 4);
738            }
739            other => panic!("unexpected binds: {other:?}"),
740        }
741    }
742
743    #[test]
744    fn or_with_mixed_clause_kinds_in_each_group() {
745        // level=error OR message contains "timeout" OR last 30m
746        // Three groups, each with a different kind of clause.
747        let ast = parse(r#"level=error OR message contains "timeout" OR last 30m"#).unwrap();
748        let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 0, 0).unwrap();
749        let (sql, binds) = build_sql(&ast, QueryOptions::default(), now).unwrap();
750        assert_eq!(sql.matches(" OR ").count(), 2);
751        assert_eq!(binds.len(), 3);
752
753        // Group 1: text bind for the level value.
754        assert!(matches!(&binds[0], SqlValue::Text(s) if s == "error"));
755        // Group 2: text bind with %timeout% LIKE pattern.
756        assert!(matches!(&binds[1], SqlValue::Text(s) if s == "%timeout%"));
757        // Group 3: text bind with the cutoff timestamp.
758        match &binds[2] {
759            SqlValue::Text(s) => assert!(s.starts_with("2026-04-20T11:30:00")),
760            other => panic!("expected text cutoff, got {other:?}"),
761        }
762    }
763
764    #[test]
765    fn limit_applies_to_or_query_too() {
766        let ast = parse("level=error OR level=warn").unwrap();
767        let opts = QueryOptions {
768            limit: Some(25),
769            offset: None,
770        };
771        let (sql, _) = build_sql(&ast, opts, Utc::now()).unwrap();
772        assert!(sql.ends_with("LIMIT 25"));
773        // LIMIT is outside the WHERE clause.
774        assert!(sql.contains(" ORDER BY "));
775    }
776
777    // -----------------------------------------------------------------
778    // SQL generation — parenthesized group cases (new in v0.3.0)
779    // -----------------------------------------------------------------
780
781    #[test]
782    fn paren_single_clause_sql_shape_and_binds() {
783        // `(level=error)` acquires one extra level of parens vs the plain form
784        // because the Group clause is itself parenthesized by translate_and_group.
785        // Both forms are semantically equivalent; binds must be identical.
786        let paren_ast = parse("(level=error)").unwrap();
787        let plain_ast = parse("level=error").unwrap();
788        let (paren_sql, paren_binds) =
789            build_sql(&paren_ast, QueryOptions::default(), Utc::now()).unwrap();
790        let (plain_sql, plain_binds) =
791            build_sql(&plain_ast, QueryOptions::default(), Utc::now()).unwrap();
792        assert!(
793            paren_sql.contains("WHERE ((lower(level) = ?))"),
794            "paren form: {paren_sql}"
795        );
796        assert!(
797            plain_sql.contains("WHERE (lower(level) = ?)"),
798            "plain form: {plain_sql}"
799        );
800        assert_eq!(paren_binds, plain_binds);
801    }
802
803    #[test]
804    fn paren_or_inside_and_emits_nested_parens() {
805        // `(level=error OR level=warn) AND service=payments`
806        // Expected WHERE shape:
807        // `(((lower(level) = ?) OR (lower(level) = ?)) AND json_extract(fields, '$.service') = ?)`
808        let ast = parse("(level=error OR level=warn) AND service=payments").unwrap();
809        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
810        // The inner OR group must be parenthesized inside the outer AND group.
811        assert!(sql.contains("((lower(level) = ?) OR (lower(level) = ?))"));
812        assert!(sql.contains("json_extract(fields, '$.service') = ?"));
813        assert_eq!(binds.len(), 3);
814        match (&binds[0], &binds[1], &binds[2]) {
815            (SqlValue::Text(a), SqlValue::Text(b), SqlValue::Text(c)) => {
816                assert_eq!(a, "error");
817                assert_eq!(b, "warn");
818                assert_eq!(c, "payments");
819            }
820            other => panic!("unexpected binds: {other:?}"),
821        }
822    }
823
824    #[test]
825    fn paren_group_binds_are_in_clause_order() {
826        // Bind values inside the paren group must precede the outer AND clause.
827        let ast = parse("(a=1 OR b=2) AND c=3").unwrap();
828        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
829        assert_eq!(binds.len(), 3);
830        match (&binds[0], &binds[1], &binds[2]) {
831            (SqlValue::Integer(a), SqlValue::Integer(b), SqlValue::Integer(c)) => {
832                assert_eq!(*a, 1);
833                assert_eq!(*b, 2);
834                assert_eq!(*c, 3);
835            }
836            other => panic!("unexpected binds: {other:?}"),
837        }
838    }
839
840    // -----------------------------------------------------------------
841    // Round-trip: insert, query (without OR), assert results
842    // -----------------------------------------------------------------
843
844    #[test]
845    fn round_trip_known_field_equality() {
846        let idx = fixture();
847        let rows = run_query(idx.connection(), "level=error");
848        assert_eq!(rows.len(), 2);
849        assert!(rows.iter().all(|e| e.level.as_deref() == Some("error")));
850    }
851
852    #[test]
853    fn round_trip_unknown_field_via_json_extract() {
854        let idx = fixture();
855        let rows = run_query(idx.connection(), "service=payments");
856        assert_eq!(rows.len(), 2);
857        assert!(
858            rows.iter()
859                .all(|e| e.fields.get("service") == Some(&Value::String("payments".into())))
860        );
861    }
862
863    #[test]
864    fn round_trip_and_chain() {
865        let idx = fixture();
866        let rows = run_query(idx.connection(), "level=error AND service=payments");
867        assert_eq!(rows.len(), 1);
868        assert_eq!(rows[0].message.as_deref(), Some("payment failed"));
869    }
870
871    #[test]
872    fn round_trip_contains_substring_match() {
873        let idx = fixture();
874        let rows = run_query(idx.connection(), r#"message contains "timeout""#);
875        assert_eq!(rows.len(), 1);
876        assert!(rows[0].message.as_deref().unwrap().contains("timeout"));
877    }
878
879    #[test]
880    fn round_trip_numeric_comparison_on_json_field() {
881        let idx = fixture();
882        let rows = run_query(idx.connection(), "req_id > 150");
883        assert_eq!(rows.len(), 2);
884        let ids: HashSet<i64> = rows
885            .iter()
886            .map(|e| e.fields.get("req_id").and_then(|v| v.as_i64()).unwrap())
887            .collect();
888        assert_eq!(ids, HashSet::from([200, 300]));
889    }
890
891    #[test]
892    fn round_trip_last_duration_uses_now() {
893        let idx = fixture();
894        let now = Utc.with_ymd_and_hms(2026, 4, 20, 13, 0, 0).unwrap();
895        let rows = run_query_at(idx.connection(), "last 3h", now);
896        assert_eq!(rows.len(), 3);
897
898        let rows = run_query_at(idx.connection(), "last 70m", now);
899        assert_eq!(rows.len(), 1);
900        assert_eq!(rows[0].timestamp.as_deref(), Some("2026-04-20T12:00:00Z"));
901    }
902
903    #[test]
904    fn round_trip_since_datetime() {
905        let idx = fixture();
906        let rows = run_query(idx.connection(), "since 2026-04-20T11:00:00Z");
907        assert_eq!(rows.len(), 2);
908    }
909
910    #[test]
911    fn round_trip_results_ordered_newest_first() {
912        let idx = fixture();
913        let rows = run_query(idx.connection(), "level=error");
914        assert!(rows[0].timestamp > rows[1].timestamp);
915    }
916
917    #[test]
918    fn round_trip_not_equal_operator() {
919        let idx = fixture();
920        let rows = run_query(idx.connection(), "level!=error");
921        assert_eq!(rows.len(), 1);
922        assert_eq!(rows[0].level.as_deref(), Some("info"));
923    }
924
925    #[test]
926    fn round_trip_contains_with_wildcard_character_is_literal() {
927        let mut idx = Indexer::open_in_memory().unwrap();
928        let a = make_entry("2026-04-20T10:00:00Z", "info", "discount 50% today");
929        let b = make_entry("2026-04-20T11:00:00Z", "info", "no special char here");
930        idx.insert_batch(&[a, b]).unwrap();
931
932        let rows = run_query(idx.connection(), r#"message contains "50%""#);
933        assert_eq!(rows.len(), 1);
934        assert!(rows[0].message.as_deref().unwrap().contains("50%"));
935    }
936
937    #[test]
938    fn round_trip_empty_result_is_empty_vec_not_error() {
939        let idx = fixture();
940        let rows = run_query(idx.connection(), "level=nonsense");
941        assert!(rows.is_empty());
942    }
943
944    #[test]
945    fn round_trip_level_compare_is_case_insensitive() {
946        // Fixture stores "error" (lowercase). Query with "ERROR" must still match.
947        let idx = fixture();
948        let rows_upper = run_query(idx.connection(), "level=ERROR");
949        let rows_mixed = run_query(idx.connection(), "level=Error");
950        let rows_lower = run_query(idx.connection(), "level=error");
951        assert_eq!(rows_upper.len(), rows_lower.len());
952        assert_eq!(rows_mixed.len(), rows_lower.len());
953        assert!(
954            rows_lower
955                .iter()
956                .all(|e| e.level.as_deref() == Some("error"))
957        );
958    }
959
960    #[test]
961    fn round_trip_level_contains_is_case_insensitive() {
962        // "ERR" must match rows whose stored level is "error".
963        let idx = fixture();
964        let rows = run_query(idx.connection(), r#"level contains "ERR""#);
965        assert!(!rows.is_empty());
966        assert!(rows.iter().all(|e| e.level.as_deref() == Some("error")));
967    }
968
969    #[test]
970    fn round_trip_reconstructs_fields_map() {
971        let idx = fixture();
972        let rows = run_query(idx.connection(), "level=error AND service=payments");
973        assert_eq!(rows.len(), 1);
974        let e = &rows[0];
975        assert_eq!(
976            e.fields.get("service"),
977            Some(&Value::String("payments".into()))
978        );
979        assert_eq!(e.fields.get("req_id").and_then(|v| v.as_i64()), Some(100));
980    }
981
982    // -----------------------------------------------------------------
983    // Round-trip: pagination (new in v0.3.0)
984    // -----------------------------------------------------------------
985
986    #[test]
987    fn round_trip_limit_caps_result_count() {
988        // Fixture has 3 rows. limit=2 must return exactly 2.
989        let idx = fixture();
990        let rows = run_query_opts(
991            idx.connection(),
992            "level=error OR level=info",
993            QueryOptions {
994                limit: Some(2),
995                offset: None,
996            },
997        );
998        assert_eq!(rows.len(), 2);
999    }
1000
1001    #[test]
1002    fn round_trip_offset_skips_leading_rows() {
1003        // Fixture rows ordered newest-first: 12:00 (error), 11:00 (info), 10:00 (error).
1004        // offset=1 must skip the 12:00 row and return the next two.
1005        let idx = fixture();
1006        let all = run_query(idx.connection(), "level=error OR level=info");
1007        assert_eq!(all.len(), 3);
1008
1009        let paged = run_query_opts(
1010            idx.connection(),
1011            "level=error OR level=info",
1012            QueryOptions {
1013                limit: None,
1014                offset: Some(1),
1015            },
1016        );
1017        assert_eq!(paged.len(), 2);
1018        // The skipped row is the newest one.
1019        assert_eq!(paged[0].timestamp, all[1].timestamp);
1020        assert_eq!(paged[1].timestamp, all[2].timestamp);
1021    }
1022
1023    #[test]
1024    fn round_trip_limit_and_offset_returns_page() {
1025        // Fixture rows (newest-first): 12:00, 11:00, 10:00.
1026        // limit=1 offset=1 should return only the 11:00 row.
1027        let idx = fixture();
1028        let rows = run_query_opts(
1029            idx.connection(),
1030            "level=error OR level=info",
1031            QueryOptions {
1032                limit: Some(1),
1033                offset: Some(1),
1034            },
1035        );
1036        assert_eq!(rows.len(), 1);
1037        assert_eq!(rows[0].timestamp.as_deref(), Some("2026-04-20T11:00:00Z"));
1038    }
1039
1040    #[test]
1041    fn round_trip_offset_beyond_result_set_returns_empty() {
1042        let idx = fixture();
1043        let rows = run_query_opts(
1044            idx.connection(),
1045            "level=error OR level=info",
1046            QueryOptions {
1047                limit: None,
1048                offset: Some(100),
1049            },
1050        );
1051        assert!(rows.is_empty());
1052    }
1053
1054    // -----------------------------------------------------------------
1055    // Round-trip: OR queries (new in v0.2.0)
1056    // -----------------------------------------------------------------
1057
1058    #[test]
1059    fn round_trip_or_two_groups_returns_union() {
1060        let idx = three_level_fixture();
1061        let rows = run_query(idx.connection(), "level=error OR level=warn");
1062        assert_eq!(rows.len(), 2);
1063
1064        let levels: HashSet<String> = rows
1065            .iter()
1066            .map(|e| e.level.clone().unwrap_or_default())
1067            .collect();
1068        assert_eq!(
1069            levels,
1070            HashSet::from(["error".to_string(), "warn".to_string()])
1071        );
1072    }
1073
1074    #[test]
1075    fn round_trip_or_three_groups_returns_full_union() {
1076        let idx = three_level_fixture();
1077        let rows = run_query(idx.connection(), "level=error OR level=warn OR level=info");
1078        assert_eq!(rows.len(), 3);
1079    }
1080
1081    #[test]
1082    fn round_trip_or_does_not_double_count_overlapping_rows() {
1083        // A row matched by both groups must appear once (SQL OR is set
1084        // union over candidate rows in the WHERE evaluation, but each
1085        // row is yielded once because the FROM is a single table).
1086        let idx = three_level_fixture();
1087        // Both clauses match the error row.
1088        let rows = run_query(
1089            idx.connection(),
1090            r#"level=error OR message contains "boom""#,
1091        );
1092        assert_eq!(rows.len(), 1);
1093        assert_eq!(rows[0].level.as_deref(), Some("error"));
1094    }
1095
1096    #[test]
1097    fn round_trip_or_respects_and_precedence() {
1098        // (level=error AND service=payments) OR level=warn
1099        // Fixture has one error row with service=payments, one error
1100        // row with service=users, one info row, no warn rows.
1101        // Adding a warn row to the fixture so the OR side has a match.
1102        let mut idx = fixture();
1103        let mut warn = make_entry("2026-04-20T13:00:00Z", "warn", "almost full");
1104        warn.fields
1105            .insert("service".into(), Value::String("orders".into()));
1106        warn.fields.insert("req_id".into(), Value::from(400));
1107        idx.insert_batch(&[warn]).unwrap();
1108
1109        let rows = run_query(
1110            idx.connection(),
1111            "level=error AND service=payments OR level=warn",
1112        );
1113        // One error/payments row + one warn row = 2.
1114        assert_eq!(rows.len(), 2);
1115        let messages: HashSet<String> = rows
1116            .iter()
1117            .map(|e| e.message.clone().unwrap_or_default())
1118            .collect();
1119        assert!(messages.contains("payment failed"));
1120        assert!(messages.contains("almost full"));
1121    }
1122
1123    #[test]
1124    fn round_trip_or_with_time_range() {
1125        // "Errors any time, OR anything in the last hour" is a real-
1126        // world disjunction shape.
1127        let idx = fixture();
1128        let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 30, 0).unwrap();
1129        let rows = run_query_at(idx.connection(), "level=error OR last 30m", now);
1130        // The two error rows + the one row in the last 30m (12:00 row).
1131        // The 12:00 row is also an error row, so dedup yields 2.
1132        assert_eq!(rows.len(), 2);
1133    }
1134
1135    #[test]
1136    fn round_trip_or_yields_results_ordered_newest_first() {
1137        // Ordering must be applied across the union, not within each group.
1138        let idx = three_level_fixture();
1139        let rows = run_query(idx.connection(), "level=error OR level=info");
1140        assert_eq!(rows.len(), 2);
1141        assert!(rows[0].timestamp > rows[1].timestamp);
1142        // Newest is the info row at 11:00, oldest is the error at 09:00.
1143        assert_eq!(rows[0].level.as_deref(), Some("info"));
1144        assert_eq!(rows[1].level.as_deref(), Some("error"));
1145    }
1146
1147    #[test]
1148    fn round_trip_or_with_zero_matches_in_one_group_still_returns_other() {
1149        let idx = three_level_fixture();
1150        // First group matches nothing, second matches one row.
1151        let rows = run_query(idx.connection(), "level=fatal OR level=warn");
1152        assert_eq!(rows.len(), 1);
1153        assert_eq!(rows[0].level.as_deref(), Some("warn"));
1154    }
1155
1156    // -----------------------------------------------------------------
1157    // Round-trip: parenthesized group queries (new in v0.3.0)
1158    // -----------------------------------------------------------------
1159
1160    #[test]
1161    fn round_trip_paren_single_clause_same_result_as_unwrapped() {
1162        // `(level=error)` must return the same rows as `level=error`.
1163        let idx = fixture();
1164        let paren = run_query(idx.connection(), "(level=error)");
1165        let plain = run_query(idx.connection(), "level=error");
1166        assert_eq!(paren.len(), plain.len());
1167        let paren_ts: HashSet<_> = paren.iter().map(|e| e.timestamp.clone()).collect();
1168        let plain_ts: HashSet<_> = plain.iter().map(|e| e.timestamp.clone()).collect();
1169        assert_eq!(paren_ts, plain_ts);
1170    }
1171
1172    #[test]
1173    fn round_trip_paren_or_inside_and_filters_correctly() {
1174        // Fixture rows:
1175        //   a: error, service=payments
1176        //   b: info,  service=payments
1177        //   c: error, service=users
1178        //
1179        // `(level=error OR level=info) AND service=payments`
1180        // Matches a (error/payments) and b (info/payments). NOT c (users).
1181        let idx = fixture();
1182        let rows = run_query(
1183            idx.connection(),
1184            "(level=error OR level=info) AND service=payments",
1185        );
1186        assert_eq!(rows.len(), 2);
1187        let messages: HashSet<String> = rows
1188            .iter()
1189            .map(|e| e.message.clone().unwrap_or_default())
1190            .collect();
1191        assert!(messages.contains("payment failed"));
1192        assert!(messages.contains("health check"));
1193        assert!(!messages.contains("timeout on db call"));
1194    }
1195
1196    #[test]
1197    fn round_trip_paren_changes_precedence_vs_no_paren() {
1198        // Fixture rows:
1199        //   a: error, service=payments
1200        //   b: info,  service=payments
1201        //   c: error, service=users
1202        //
1203        // WITHOUT parens: `level=error OR level=info AND service=payments`
1204        //   = (level=error) OR (level=info AND service=payments)
1205        //   Matches a, c (error) + b (info/payments) = 3 rows.
1206        //
1207        // WITH parens: `(level=error OR level=info) AND service=payments`
1208        //   = ((level=error OR level=info)) AND service=payments
1209        //   Matches a (error/payments) + b (info/payments) = 2 rows.
1210        let idx = fixture();
1211        let without_paren = run_query(
1212            idx.connection(),
1213            "level=error OR level=info AND service=payments",
1214        );
1215        let with_paren = run_query(
1216            idx.connection(),
1217            "(level=error OR level=info) AND service=payments",
1218        );
1219        assert_eq!(
1220            without_paren.len(),
1221            3,
1222            "no parens: all error rows + info/payments"
1223        );
1224        assert_eq!(with_paren.len(), 2, "parens: only payments-service rows");
1225    }
1226
1227    #[test]
1228    fn round_trip_nested_parens_execute_correctly() {
1229        // `((level=error) OR level=info) AND service=payments`
1230        // Outer paren wraps an inner paren — same result as single-level paren test.
1231        let idx = fixture();
1232        let rows = run_query(
1233            idx.connection(),
1234            "((level=error) OR level=info) AND service=payments",
1235        );
1236        assert_eq!(rows.len(), 2);
1237    }
1238
1239    // -----------------------------------------------------------------
1240    // Safety guards
1241    // -----------------------------------------------------------------
1242
1243    #[test]
1244    fn unsafe_field_name_is_rejected_at_executor() {
1245        let result = column_for_field("service; DROP TABLE log_entries--");
1246        assert!(matches!(result, Err(LogdiveError::UnsafeFieldName(_))));
1247    }
1248
1249    #[test]
1250    fn is_safe_json_path_segment_rejects_single_quote() {
1251        assert!(!is_safe_json_path_segment("service'"));
1252    }
1253
1254    #[test]
1255    fn is_safe_json_path_segment_rejects_space() {
1256        assert!(!is_safe_json_path_segment("ser vice"));
1257    }
1258
1259    #[test]
1260    fn is_safe_json_path_segment_rejects_empty_string() {
1261        assert!(!is_safe_json_path_segment(""));
1262    }
1263
1264    #[test]
1265    fn is_safe_json_path_segment_allows_dotted() {
1266        assert!(is_safe_json_path_segment("user.id"));
1267    }
1268
1269    #[test]
1270    fn column_for_field_rejects_hyphen_payload() {
1271        // `svc-name` passes the tokenizer (hyphens allowed in bare words) but
1272        // is caught by the executor's defensive re-check before reaching SQL.
1273        let err = column_for_field("svc-name").unwrap_err();
1274        assert!(matches!(err, LogdiveError::UnsafeFieldName(_)));
1275    }
1276
1277    #[test]
1278    fn column_for_field_rejects_unicode_non_ascii() {
1279        // U+2019 RIGHT SINGLE QUOTATION MARK — visually similar to a quote,
1280        // multi-byte UTF-8. Not ASCII-alphanumeric, so rejected as unsafe.
1281        let err = column_for_field("svc\u{2019}").unwrap_err();
1282        assert!(matches!(err, LogdiveError::UnsafeFieldName(_)));
1283    }
1284
1285    // -----------------------------------------------------------------
1286    // Time-range edge cases
1287    // -----------------------------------------------------------------
1288
1289    #[test]
1290    fn since_accepts_naive_datetime_space_separator() {
1291        // `since "YYYY-MM-DD HH:MM:SS"` (space instead of T) must be parsed
1292        // as UTC midnight and filter correctly.
1293        let idx = fixture();
1294        // Row at 10:00 is the boundary; rows at 11:00 and 12:00 are above.
1295        let rows = run_query(idx.connection(), r#"since "2026-04-20 11:00:00""#);
1296        assert_eq!(
1297            rows.len(),
1298            2,
1299            "space-separated naive datetime must filter rows"
1300        );
1301    }
1302
1303    #[test]
1304    fn since_boundary_row_at_cutoff_is_included() {
1305        // `since` uses `>=`, so a row whose timestamp equals the cutoff must
1306        // appear in results — it is not strictly in the past.
1307        let idx = fixture();
1308        let rows = run_query(idx.connection(), "since 2026-04-20T12:00:00Z");
1309        assert_eq!(rows.len(), 1);
1310        assert_eq!(rows[0].timestamp.as_deref(), Some("2026-04-20T12:00:00Z"));
1311    }
1312
1313    #[test]
1314    fn since_future_timestamp_returns_empty() {
1315        let idx = fixture();
1316        let rows = run_query(idx.connection(), "since 2030-01-01T00:00:00Z");
1317        assert!(rows.is_empty(), "future cutoff must return no rows");
1318    }
1319
1320    #[test]
1321    fn last_very_large_amount_saturates_to_epoch_returns_all_rows() {
1322        // A pathologically large duration saturates to the Unix epoch (via
1323        // `checked_sub_signed` → `unwrap_or` epoch fallback in executor).
1324        // All rows with any timestamp must match; must not panic or error.
1325        let idx = fixture();
1326        // 9_999_999_999h ≈ 1.14 billion years — far beyond any real timestamp.
1327        let rows = run_query(idx.connection(), "last 9999999999h");
1328        assert_eq!(rows.len(), 3, "epoch-saturated cutoff must match all rows");
1329    }
1330
1331    #[test]
1332    fn since_rfc3339_with_utc_offset_equivalent_to_z() {
1333        // +00:00 and Z are both UTC; the cutoff must be identical.
1334        let idx = fixture();
1335        let rows_z = run_query(idx.connection(), "since 2026-04-20T11:00:00Z");
1336        let rows_offset = run_query(idx.connection(), r#"since "2026-04-20T11:00:00+00:00""#);
1337        assert_eq!(
1338            rows_z.len(),
1339            rows_offset.len(),
1340            "Z and +00:00 offsets must produce the same row count"
1341        );
1342    }
1343}