1use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
62use rusqlite::{Connection, params_from_iter, types::Value as SqlValue};
63use serde_json::{Map, Value};
64
65use crate::entry::LogEntry;
66use crate::error::{LogdiveError, Result};
67use crate::query::{AndGroup, Clause, Duration, QueryNode, QueryValue};
68
/// Pagination settings applied to the generated `SELECT` statement.
///
/// The default value (`limit: None`, `offset: None`) adds no `LIMIT`/`OFFSET`
/// suffix, so every matching row is returned.
#[derive(Debug, Clone, Copy, Default)]
pub struct QueryOptions {
    /// Maximum number of rows to return; `None` means no cap.
    pub limit: Option<usize>,
    /// Number of leading rows to skip. `None` and `Some(0)` are treated the
    /// same way: no `OFFSET` is emitted.
    pub offset: Option<usize>,
}
87
88pub fn execute(query: &QueryNode, conn: &Connection, opts: QueryOptions) -> Result<Vec<LogEntry>> {
98 let (sql, binds) = build_sql(query, opts, Utc::now())?;
99 run(conn, &sql, &binds)
100}
101
102pub fn execute_at(
106 query: &QueryNode,
107 conn: &Connection,
108 opts: QueryOptions,
109 now: DateTime<Utc>,
110) -> Result<Vec<LogEntry>> {
111 let (sql, binds) = build_sql(query, opts, now)?;
112 run(conn, &sql, &binds)
113}
114
/// One positional SQL parameter, bound to a `?` placeholder. Alias for
/// rusqlite's dynamically typed [`SqlValue`].
type Bind = SqlValue;
123
124fn build_sql(
125 query: &QueryNode,
126 opts: QueryOptions,
127 now: DateTime<Utc>,
128) -> Result<(String, Vec<Bind>)> {
129 let QueryNode::Or(groups) = query;
130
131 let mut group_sqls: Vec<String> = Vec::with_capacity(groups.len());
136 let mut binds: Vec<Bind> = Vec::new();
137
138 for group in groups {
139 let (group_sql, mut group_binds) = translate_and_group(group, now)?;
140 group_sqls.push(group_sql);
141 binds.append(&mut group_binds);
142 }
143
144 let where_sql = if group_sqls.is_empty() {
145 "1=1".to_string()
147 } else {
148 group_sqls.join(" OR ")
150 };
151
152 let mut sql = format!(
153 "SELECT timestamp, level, message, tag, fields, raw \
154 FROM log_entries \
155 WHERE {where_sql} \
156 ORDER BY timestamp DESC, id DESC"
157 );
158
159 match (opts.limit, opts.offset) {
162 (Some(lim), Some(off)) if off > 0 => {
163 sql.push_str(&format!(" LIMIT {lim} OFFSET {off}"));
164 }
165 (Some(lim), _) => {
166 sql.push_str(&format!(" LIMIT {lim}"));
167 }
168 (None, Some(off)) if off > 0 => {
169 sql.push_str(&format!(" LIMIT -1 OFFSET {off}"));
170 }
171 _ => {}
172 }
173
174 Ok((sql, binds))
175}
176
177fn translate_and_group(group: &AndGroup, now: DateTime<Utc>) -> Result<(String, Vec<Bind>)> {
183 let mut clause_sqls: Vec<String> = Vec::with_capacity(group.clauses.len());
184 let mut binds: Vec<Bind> = Vec::new();
185
186 for clause in &group.clauses {
187 let (sql, mut clause_binds) = translate_clause(clause, now)?;
188 clause_sqls.push(sql);
189 binds.append(&mut clause_binds);
190 }
191
192 let inner = if clause_sqls.is_empty() {
193 "1=1".to_string()
195 } else {
196 clause_sqls.join(" AND ")
197 };
198 Ok((format!("({inner})"), binds))
199}
200
201fn translate_clause(clause: &Clause, now: DateTime<Utc>) -> Result<(String, Vec<Bind>)> {
202 match clause {
203 Clause::Compare { field, op, value } => {
204 let (column_expr, bind) = if field == "level" {
207 let lowered = match value {
208 QueryValue::String(s) => SqlValue::Text(s.to_lowercase()),
209 other => value_to_bind(other),
210 };
211 ("lower(level)".to_string(), lowered)
212 } else {
213 (column_for_field(field)?, value_to_bind(value))
214 };
215 let sql = format!("{column_expr} {op} ?");
216 Ok((sql, vec![bind]))
217 }
218 Clause::Contains { field, value } => {
219 let (column_expr, normalised_value) = if field == "level" {
224 ("lower(level)".to_string(), value.to_lowercase())
225 } else {
226 (column_for_field(field)?, value.clone())
227 };
228 let escaped = escape_like(&normalised_value);
229 let pattern = format!("%{escaped}%");
230 let sql = format!("{column_expr} LIKE ? ESCAPE '\\'");
231 Ok((sql, vec![SqlValue::Text(pattern)]))
232 }
233 Clause::LastDuration(d) => {
234 let cutoff = compute_last_cutoff(*d, now);
235 Ok((
236 "timestamp >= ?".to_string(),
237 vec![SqlValue::Text(cutoff.to_rfc3339())],
238 ))
239 }
240 Clause::SinceDatetime(s) => {
241 let dt = parse_datetime(s)?;
242 Ok((
243 "timestamp >= ?".to_string(),
244 vec![SqlValue::Text(dt.to_rfc3339())],
245 ))
246 }
247 Clause::Group(inner) => {
248 let QueryNode::Or(groups) = inner.as_ref();
254 let mut group_sqls: Vec<String> = Vec::with_capacity(groups.len());
255 let mut binds: Vec<Bind> = Vec::new();
256 for group in groups {
257 let (gsql, mut gbinds) = translate_and_group(group, now)?;
258 group_sqls.push(gsql);
259 binds.append(&mut gbinds);
260 }
261 let sql = if group_sqls.len() == 1 {
262 group_sqls.into_iter().next().unwrap()
263 } else {
264 format!("({})", group_sqls.join(" OR "))
265 };
266 Ok((sql, binds))
267 }
268 }
269}
270
271fn column_for_field(field: &str) -> Result<String> {
278 if LogEntry::KNOWN_KEYS.contains(&field) {
279 Ok(field.to_string())
280 } else {
281 if !is_safe_json_path_segment(field) {
282 return Err(LogdiveError::UnsafeFieldName(field.to_string()));
283 }
284 Ok(format!("json_extract(fields, '$.{field}')"))
285 }
286}
287
/// Returns `true` when `s` can be spliced into a `'$.<s>'` JSON path literal.
///
/// Accepted names:
/// * start with an ASCII letter or `_`;
/// * contain only ASCII letters, digits, `_` and `.`;
/// * have no empty dot-separated segment, so `a..b` and `a.` are rejected.
///   An empty segment produces a malformed JSON path that SQLite rejects only
///   when the query runs.
///
/// Quotes, spaces, hyphens and non-ASCII characters are all rejected, which
/// keeps the name from terminating the surrounding SQL string literal.
fn is_safe_json_path_segment(s: &str) -> bool {
    let starts_like_identifier =
        matches!(s.chars().next(), Some(c) if c.is_ascii_alphabetic() || c == '_');

    starts_like_identifier
        && s.split('.').all(|segment| {
            !segment.is_empty()
                && segment
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || c == '_')
        })
}
300
301fn value_to_bind(v: &QueryValue) -> Bind {
302 match v {
303 QueryValue::String(s) => SqlValue::Text(s.clone()),
304 QueryValue::Integer(n) => SqlValue::Integer(*n),
305 QueryValue::Float(f) => SqlValue::Real(*f),
306 QueryValue::Bool(b) => SqlValue::Integer(if *b { 1 } else { 0 }),
307 }
308}
309
/// Escapes the `LIKE` metacharacters `%` and `_`, and the escape character
/// `\` itself, by prefixing each with a backslash.
///
/// The result is meant for use in a pattern evaluated with `ESCAPE '\'`.
fn escape_like(input: &str) -> String {
    input
        .chars()
        .fold(String::with_capacity(input.len()), |mut escaped, ch| {
            if matches!(ch, '\\' | '%' | '_') {
                escaped.push('\\');
            }
            escaped.push(ch);
            escaped
        })
}
325
326fn compute_last_cutoff(d: Duration, now: DateTime<Utc>) -> DateTime<Utc> {
327 let amount_i64 = i64::try_from(d.amount).unwrap_or(i64::MAX);
330 let secs = amount_i64.saturating_mul(d.unit.seconds());
331 let delta = chrono::Duration::seconds(secs);
332 now.checked_sub_signed(delta).unwrap_or_else(|| {
333 Utc.timestamp_opt(0, 0)
334 .single()
335 .expect("unix epoch is valid")
336 })
337}
338
339fn parse_datetime(s: &str) -> Result<DateTime<Utc>> {
344 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
345 return Ok(dt.with_timezone(&Utc));
346 }
347 for fmt in &["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"] {
348 if let Ok(ndt) = NaiveDateTime::parse_from_str(s, fmt) {
349 return Ok(Utc.from_utc_datetime(&ndt));
350 }
351 }
352 if let Ok(nd) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
353 let ndt = nd.and_hms_opt(0, 0, 0).expect("00:00:00 is valid");
354 return Ok(Utc.from_utc_datetime(&ndt));
355 }
356 Err(LogdiveError::InvalidDatetime {
357 input: s.to_string(),
358 reason: "expected RFC3339, `YYYY-MM-DD HH:MM:SS`, or `YYYY-MM-DD`".to_string(),
359 })
360}
361
362fn run(conn: &Connection, sql: &str, binds: &[Bind]) -> Result<Vec<LogEntry>> {
367 let mut stmt = conn.prepare(sql)?;
368 let rows = stmt.query_map(params_from_iter(binds.iter()), |row| {
369 let timestamp: Option<String> = row.get(0)?;
370 let level: Option<String> = row.get(1)?;
371 let message: Option<String> = row.get(2)?;
372 let tag: Option<String> = row.get(3)?;
373 let fields_json: String = row.get(4)?;
374 let raw: String = row.get(5)?;
375 Ok((timestamp, level, message, tag, fields_json, raw))
378 })?;
379
380 let mut out = Vec::new();
381 for row in rows {
382 let (timestamp, level, message, tag, fields_json, raw) = row?;
383 let fields: Map<String, Value> =
384 serde_json::from_str(&fields_json).map_err(LogdiveError::CorruptFieldsJson)?;
385 out.push(LogEntry {
386 timestamp,
387 level,
388 message,
389 tag,
390 fields,
391 raw,
392 });
393 }
394 Ok(out)
395}
396
// Tests come in two flavours: "shape" tests that inspect the SQL text and
// bind values returned by `build_sql`, and "round trip" tests that execute
// queries against an in-memory index built by the fixtures below.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::indexer::Indexer;
    use crate::query::parse;
    use std::collections::HashSet;

    /// Parses `q` and executes it with default options and the real clock.
    fn run_query(conn: &Connection, q: &str) -> Vec<LogEntry> {
        let ast = parse(q).expect("test queries are well-formed");
        execute(&ast, conn, QueryOptions::default()).expect("execute")
    }

    /// Parses `q` and executes it with explicit pagination options.
    fn run_query_opts(conn: &Connection, q: &str, opts: QueryOptions) -> Vec<LogEntry> {
        let ast = parse(q).expect("test queries are well-formed");
        execute(&ast, conn, opts).expect("execute")
    }

    /// Parses `q` and executes it with a fixed `now`, for `last …` clauses.
    fn run_query_at(conn: &Connection, q: &str, now: DateTime<Utc>) -> Vec<LogEntry> {
        let ast = parse(q).expect("test queries are well-formed");
        execute_at(&ast, conn, QueryOptions::default(), now).expect("execute")
    }

    /// Builds an entry whose `raw` text is a JSON line holding the same
    /// timestamp, level and message that are set on the struct.
    fn make_entry(ts: &str, level: &str, message: &str) -> LogEntry {
        let raw = format!(r#"{{"timestamp":"{ts}","level":"{level}","message":"{message}"}}"#);
        let mut e = LogEntry::new(raw);
        e.timestamp = Some(ts.to_string());
        e.level = Some(level.to_string());
        e.message = Some(message.to_string());
        e
    }

    /// Three entries on 2026-04-20:
    /// * 10:00 error "payment failed"     tag=api service=payments req_id=100
    /// * 11:00 info  "health check"       tag=api service=payments req_id=200
    /// * 12:00 error "timeout on db call" (no tag) service=users   req_id=300
    fn fixture() -> Indexer {
        let mut idx = Indexer::open_in_memory().unwrap();
        let mut a = make_entry("2026-04-20T10:00:00Z", "error", "payment failed");
        a.tag = Some("api".into());
        a.fields
            .insert("service".into(), Value::String("payments".into()));
        a.fields.insert("req_id".into(), Value::from(100));

        let mut b = make_entry("2026-04-20T11:00:00Z", "info", "health check");
        b.tag = Some("api".into());
        b.fields
            .insert("service".into(), Value::String("payments".into()));
        b.fields.insert("req_id".into(), Value::from(200));

        let mut c = make_entry("2026-04-20T12:00:00Z", "error", "timeout on db call");
        c.fields
            .insert("service".into(), Value::String("users".into()));
        c.fields.insert("req_id".into(), Value::from(300));

        idx.insert_batch(&[a, b, c]).unwrap();
        idx
    }

    /// One entry per level (error 09:00, warn 10:00, info 11:00), with no
    /// extra fields; used by the OR tests.
    fn three_level_fixture() -> Indexer {
        let mut idx = Indexer::open_in_memory().unwrap();
        let a = make_entry("2026-04-20T09:00:00Z", "error", "boom");
        let b = make_entry("2026-04-20T10:00:00Z", "warn", "slow query");
        let c = make_entry("2026-04-20T11:00:00Z", "info", "ok");
        idx.insert_batch(&[a, b, c]).unwrap();
        idx
    }

    // ---- SQL shape: single clauses and bind types ----

    #[test]
    fn compare_on_known_field_binds_value_not_interpolates() {
        let ast = parse("level=error").unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(sql.contains("WHERE (lower(level) = ?)"));
        // The user value must appear only as a bind, never in the SQL text.
        assert!(!sql.contains("error"));
        assert_eq!(binds.len(), 1);
        match &binds[0] {
            SqlValue::Text(s) => assert_eq!(s, "error"),
            other => panic!("expected text bind, got {other:?}"),
        }
    }

    #[test]
    fn compare_on_unknown_field_uses_json_extract() {
        let ast = parse("service=payments").unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(sql.contains("json_extract(fields, '$.service')"));
        assert_eq!(binds.len(), 1);
    }

    #[test]
    fn contains_uses_like_with_escape_and_wildcards() {
        let ast = parse(r#"message contains "timeout""#).unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(sql.contains("LIKE ? ESCAPE '\\'"));
        match &binds[0] {
            SqlValue::Text(s) => assert_eq!(s, "%timeout%"),
            other => panic!("expected text bind, got {other:?}"),
        }
    }

    #[test]
    fn contains_escapes_like_metacharacters() {
        let ast = parse(r#"message contains "50%""#).unwrap();
        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        match &binds[0] {
            // The user's `%` is escaped; the outer `%…%` are the wildcards.
            SqlValue::Text(s) => assert_eq!(s, r"%50\%%"),
            other => panic!("unexpected bind: {other:?}"),
        }
    }

    #[test]
    fn last_duration_produces_timestamp_lower_bound() {
        let ast = parse("last 2h").unwrap();
        let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 0, 0).unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), now).unwrap();
        assert!(sql.contains("timestamp >= ?"));
        match &binds[0] {
            // Only the prefix is checked, so the offset suffix is not pinned.
            SqlValue::Text(s) => assert!(s.starts_with("2026-04-20T10:00:00")),
            other => panic!("unexpected bind: {other:?}"),
        }
    }

    #[test]
    fn since_accepts_rfc3339() {
        let ast = parse(r#"since "2024-01-01T10:00:00Z""#).unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(sql.contains("timestamp >= ?"));
        match &binds[0] {
            SqlValue::Text(s) => assert!(s.starts_with("2024-01-01T10:00:00")),
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[test]
    fn since_accepts_bare_date() {
        let ast = parse("since 2024-06-15").unwrap();
        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        match &binds[0] {
            // A bare date means midnight UTC of that day.
            SqlValue::Text(s) => assert!(s.starts_with("2024-06-15T00:00:00")),
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[test]
    fn since_rejects_garbage() {
        // The query parses; the bad datetime is only detected at SQL build time.
        let ast = parse("since not-a-date").unwrap();
        let err = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap_err();
        assert!(matches!(err, LogdiveError::InvalidDatetime { .. }));
    }

    #[test]
    fn and_chain_joins_with_and_inside_a_single_group() {
        let ast = parse("level=error AND service=payments").unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(sql.contains("lower(level) = ?"));
        assert!(sql.contains("json_extract(fields, '$.service') = ?"));
        assert!(sql.contains(" AND "));
        assert!(!sql.contains(" OR "));
        assert!(sql.contains("WHERE (lower(level) = ? AND json_extract(fields, '$.service') = ?)"));
        assert_eq!(binds.len(), 2);
        match (&binds[0], &binds[1]) {
            (SqlValue::Text(a), SqlValue::Text(b)) => {
                assert_eq!(a, "error");
                assert_eq!(b, "payments");
            }
            other => panic!("unexpected binds: {other:?}"),
        }
    }

    #[test]
    fn integer_binds_as_integer_not_text() {
        let ast = parse("req_id > 100").unwrap();
        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        match &binds[0] {
            SqlValue::Integer(n) => assert_eq!(*n, 100),
            other => panic!("expected integer bind, got {other:?}"),
        }
    }

    #[test]
    fn bool_binds_as_integer_zero_or_one() {
        let ast = parse("ok=true").unwrap();
        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(matches!(binds[0], SqlValue::Integer(1)));

        let ast = parse("ok=false").unwrap();
        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(matches!(binds[0], SqlValue::Integer(0)));
    }

    #[test]
    fn float_binds_as_real() {
        let ast = parse("duration < 1.5").unwrap();
        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        match &binds[0] {
            SqlValue::Real(f) => assert!((f - 1.5).abs() < 1e-9),
            other => panic!("expected real bind, got {other:?}"),
        }
    }

    // ---- SQL shape: LIMIT / OFFSET suffix ----

    #[test]
    fn limit_only_appends_limit_clause() {
        let ast = parse("level=error").unwrap();
        let opts = QueryOptions {
            limit: Some(50),
            offset: None,
        };
        let (sql, _) = build_sql(&ast, opts, Utc::now()).unwrap();
        assert!(sql.ends_with("LIMIT 50"), "sql: {sql}");
    }

    #[test]
    fn offset_zero_treated_as_absent_no_suffix() {
        let ast = parse("level=error").unwrap();
        let opts = QueryOptions {
            limit: None,
            offset: Some(0),
        };
        let (sql, _) = build_sql(&ast, opts, Utc::now()).unwrap();
        assert!(
            sql.ends_with("DESC"),
            "offset=0 must not append any suffix, sql: {sql}"
        );
    }

    #[test]
    fn offset_without_limit_uses_limit_neg1() {
        let ast = parse("level=error").unwrap();
        let opts = QueryOptions {
            limit: None,
            offset: Some(10),
        };
        let (sql, _) = build_sql(&ast, opts, Utc::now()).unwrap();
        assert!(sql.ends_with("LIMIT -1 OFFSET 10"), "sql: {sql}");
    }

    #[test]
    fn limit_and_offset_both_appended() {
        let ast = parse("level=error").unwrap();
        let opts = QueryOptions {
            limit: Some(25),
            offset: Some(50),
        };
        let (sql, _) = build_sql(&ast, opts, Utc::now()).unwrap();
        assert!(sql.ends_with("LIMIT 25 OFFSET 50"), "sql: {sql}");
    }

    #[test]
    fn no_limit_no_offset_produces_no_suffix() {
        let ast = parse("level=error").unwrap();
        let (sql, _) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(sql.ends_with("DESC"), "sql: {sql}");
    }

    // ---- SQL shape: OR between AND-groups ----

    #[test]
    fn or_emits_two_parenthesized_groups_joined_by_or() {
        let ast = parse("level=error OR level=warn").unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(sql.contains("WHERE (lower(level) = ?) OR (lower(level) = ?)"));
        match (&binds[0], &binds[1]) {
            (SqlValue::Text(a), SqlValue::Text(b)) => {
                assert_eq!(a, "error");
                assert_eq!(b, "warn");
            }
            other => panic!("unexpected binds: {other:?}"),
        }
    }

    #[test]
    fn or_with_three_groups_joins_with_two_or_keywords() {
        let ast = parse("level=error OR level=warn OR level=fatal").unwrap();
        let (sql, _binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert_eq!(sql.matches(" OR ").count(), 2);
        assert!(sql.contains("(lower(level) = ?) OR (lower(level) = ?) OR (lower(level) = ?)"));
    }

    #[test]
    fn or_with_and_inside_each_group_emits_correct_shape() {
        // AND binds tighter than OR: (error AND payments) OR (warn).
        let ast = parse("level=error AND service=payments OR level=warn").unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(sql.contains(
            "WHERE (lower(level) = ? AND json_extract(fields, '$.service') = ?) \
             OR (lower(level) = ?)"
        ));
        assert_eq!(binds.len(), 3);
        match (&binds[0], &binds[1], &binds[2]) {
            (SqlValue::Text(a), SqlValue::Text(b), SqlValue::Text(c)) => {
                assert_eq!(a, "error");
                assert_eq!(b, "payments");
                assert_eq!(c, "warn");
            }
            other => panic!("unexpected binds: {other:?}"),
        }
    }

    #[test]
    fn or_with_and_on_both_sides_preserves_bind_ordering() {
        let ast = parse("a=1 AND b=2 OR c=3 AND d=4").unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert_eq!(sql.matches(" OR ").count(), 1);
        assert_eq!(binds.len(), 4);
        // Binds must follow placeholder order, left to right across groups.
        match (&binds[0], &binds[1], &binds[2], &binds[3]) {
            (
                SqlValue::Integer(a),
                SqlValue::Integer(b),
                SqlValue::Integer(c),
                SqlValue::Integer(d),
            ) => {
                assert_eq!(*a, 1);
                assert_eq!(*b, 2);
                assert_eq!(*c, 3);
                assert_eq!(*d, 4);
            }
            other => panic!("unexpected binds: {other:?}"),
        }
    }

    #[test]
    fn or_with_mixed_clause_kinds_in_each_group() {
        let ast = parse(r#"level=error OR message contains "timeout" OR last 30m"#).unwrap();
        let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 0, 0).unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), now).unwrap();
        assert_eq!(sql.matches(" OR ").count(), 2);
        assert_eq!(binds.len(), 3);

        assert!(matches!(&binds[0], SqlValue::Text(s) if s == "error"));
        assert!(matches!(&binds[1], SqlValue::Text(s) if s == "%timeout%"));
        match &binds[2] {
            SqlValue::Text(s) => assert!(s.starts_with("2026-04-20T11:30:00")),
            other => panic!("expected text cutoff, got {other:?}"),
        }
    }

    #[test]
    fn limit_applies_to_or_query_too() {
        let ast = parse("level=error OR level=warn").unwrap();
        let opts = QueryOptions {
            limit: Some(25),
            offset: None,
        };
        let (sql, _) = build_sql(&ast, opts, Utc::now()).unwrap();
        assert!(sql.ends_with("LIMIT 25"));
        assert!(sql.contains(" ORDER BY "));
    }

    // ---- SQL shape: parenthesised groups ----

    #[test]
    fn paren_single_clause_sql_shape_and_binds() {
        // Explicit parentheses add one nesting level to the SQL but must not
        // change the binds.
        let paren_ast = parse("(level=error)").unwrap();
        let plain_ast = parse("level=error").unwrap();
        let (paren_sql, paren_binds) =
            build_sql(&paren_ast, QueryOptions::default(), Utc::now()).unwrap();
        let (plain_sql, plain_binds) =
            build_sql(&plain_ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(
            paren_sql.contains("WHERE ((lower(level) = ?))"),
            "paren form: {paren_sql}"
        );
        assert!(
            plain_sql.contains("WHERE (lower(level) = ?)"),
            "plain form: {plain_sql}"
        );
        assert_eq!(paren_binds, plain_binds);
    }

    #[test]
    fn paren_or_inside_and_emits_nested_parens() {
        let ast = parse("(level=error OR level=warn) AND service=payments").unwrap();
        let (sql, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert!(sql.contains("((lower(level) = ?) OR (lower(level) = ?))"));
        assert!(sql.contains("json_extract(fields, '$.service') = ?"));
        assert_eq!(binds.len(), 3);
        match (&binds[0], &binds[1], &binds[2]) {
            (SqlValue::Text(a), SqlValue::Text(b), SqlValue::Text(c)) => {
                assert_eq!(a, "error");
                assert_eq!(b, "warn");
                assert_eq!(c, "payments");
            }
            other => panic!("unexpected binds: {other:?}"),
        }
    }

    #[test]
    fn paren_group_binds_are_in_clause_order() {
        let ast = parse("(a=1 OR b=2) AND c=3").unwrap();
        let (_, binds) = build_sql(&ast, QueryOptions::default(), Utc::now()).unwrap();
        assert_eq!(binds.len(), 3);
        match (&binds[0], &binds[1], &binds[2]) {
            (SqlValue::Integer(a), SqlValue::Integer(b), SqlValue::Integer(c)) => {
                assert_eq!(*a, 1);
                assert_eq!(*b, 2);
                assert_eq!(*c, 3);
            }
            other => panic!("unexpected binds: {other:?}"),
        }
    }

    // ---- Round trips: single groups against `fixture()` ----

    #[test]
    fn round_trip_known_field_equality() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level=error");
        assert_eq!(rows.len(), 2);
        assert!(rows.iter().all(|e| e.level.as_deref() == Some("error")));
    }

    #[test]
    fn round_trip_unknown_field_via_json_extract() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "service=payments");
        assert_eq!(rows.len(), 2);
        assert!(
            rows.iter()
                .all(|e| e.fields.get("service") == Some(&Value::String("payments".into())))
        );
    }

    #[test]
    fn round_trip_and_chain() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level=error AND service=payments");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].message.as_deref(), Some("payment failed"));
    }

    #[test]
    fn round_trip_contains_substring_match() {
        let idx = fixture();
        let rows = run_query(idx.connection(), r#"message contains "timeout""#);
        assert_eq!(rows.len(), 1);
        assert!(rows[0].message.as_deref().unwrap().contains("timeout"));
    }

    #[test]
    fn round_trip_numeric_comparison_on_json_field() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "req_id > 150");
        assert_eq!(rows.len(), 2);
        let ids: HashSet<i64> = rows
            .iter()
            .map(|e| e.fields.get("req_id").and_then(|v| v.as_i64()).unwrap())
            .collect();
        assert_eq!(ids, HashSet::from([200, 300]));
    }

    #[test]
    fn round_trip_last_duration_uses_now() {
        let idx = fixture();
        let now = Utc.with_ymd_and_hms(2026, 4, 20, 13, 0, 0).unwrap();
        // Cutoff 10:00 — includes all three fixture rows.
        let rows = run_query_at(idx.connection(), "last 3h", now);
        assert_eq!(rows.len(), 3);

        // Cutoff 11:50 — only the 12:00 row remains.
        let rows = run_query_at(idx.connection(), "last 70m", now);
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].timestamp.as_deref(), Some("2026-04-20T12:00:00Z"));
    }

    #[test]
    fn round_trip_since_datetime() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "since 2026-04-20T11:00:00Z");
        assert_eq!(rows.len(), 2);
    }

    #[test]
    fn round_trip_results_ordered_newest_first() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level=error");
        assert!(rows[0].timestamp > rows[1].timestamp);
    }

    #[test]
    fn round_trip_not_equal_operator() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level!=error");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].level.as_deref(), Some("info"));
    }

    #[test]
    fn round_trip_contains_with_wildcard_character_is_literal() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let a = make_entry("2026-04-20T10:00:00Z", "info", "discount 50% today");
        let b = make_entry("2026-04-20T11:00:00Z", "info", "no special char here");
        idx.insert_batch(&[a, b]).unwrap();

        // If `%` were not escaped it would act as a wildcard and the pattern
        // could match more than the intended row.
        let rows = run_query(idx.connection(), r#"message contains "50%""#);
        assert_eq!(rows.len(), 1);
        assert!(rows[0].message.as_deref().unwrap().contains("50%"));
    }

    #[test]
    fn round_trip_empty_result_is_empty_vec_not_error() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level=nonsense");
        assert!(rows.is_empty());
    }

    #[test]
    fn round_trip_level_compare_is_case_insensitive() {
        let idx = fixture();
        let rows_upper = run_query(idx.connection(), "level=ERROR");
        let rows_mixed = run_query(idx.connection(), "level=Error");
        let rows_lower = run_query(idx.connection(), "level=error");
        assert_eq!(rows_upper.len(), rows_lower.len());
        assert_eq!(rows_mixed.len(), rows_lower.len());
        assert!(
            rows_lower
                .iter()
                .all(|e| e.level.as_deref() == Some("error"))
        );
    }

    #[test]
    fn round_trip_level_contains_is_case_insensitive() {
        let idx = fixture();
        let rows = run_query(idx.connection(), r#"level contains "ERR""#);
        assert!(!rows.is_empty());
        assert!(rows.iter().all(|e| e.level.as_deref() == Some("error")));
    }

    #[test]
    fn round_trip_reconstructs_fields_map() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "level=error AND service=payments");
        assert_eq!(rows.len(), 1);
        let e = &rows[0];
        assert_eq!(
            e.fields.get("service"),
            Some(&Value::String("payments".into()))
        );
        assert_eq!(e.fields.get("req_id").and_then(|v| v.as_i64()), Some(100));
    }

    // ---- Round trips: pagination ----
    // `level=error OR level=info` matches all three fixture rows.

    #[test]
    fn round_trip_limit_caps_result_count() {
        let idx = fixture();
        let rows = run_query_opts(
            idx.connection(),
            "level=error OR level=info",
            QueryOptions {
                limit: Some(2),
                offset: None,
            },
        );
        assert_eq!(rows.len(), 2);
    }

    #[test]
    fn round_trip_offset_skips_leading_rows() {
        let idx = fixture();
        let all = run_query(idx.connection(), "level=error OR level=info");
        assert_eq!(all.len(), 3);

        let paged = run_query_opts(
            idx.connection(),
            "level=error OR level=info",
            QueryOptions {
                limit: None,
                offset: Some(1),
            },
        );
        assert_eq!(paged.len(), 2);
        assert_eq!(paged[0].timestamp, all[1].timestamp);
        assert_eq!(paged[1].timestamp, all[2].timestamp);
    }

    #[test]
    fn round_trip_limit_and_offset_returns_page() {
        let idx = fixture();
        let rows = run_query_opts(
            idx.connection(),
            "level=error OR level=info",
            QueryOptions {
                limit: Some(1),
                offset: Some(1),
            },
        );
        assert_eq!(rows.len(), 1);
        // Newest-first order is 12:00, 11:00, 10:00; skipping one yields 11:00.
        assert_eq!(rows[0].timestamp.as_deref(), Some("2026-04-20T11:00:00Z"));
    }

    #[test]
    fn round_trip_offset_beyond_result_set_returns_empty() {
        let idx = fixture();
        let rows = run_query_opts(
            idx.connection(),
            "level=error OR level=info",
            QueryOptions {
                limit: None,
                offset: Some(100),
            },
        );
        assert!(rows.is_empty());
    }

    // ---- Round trips: OR ----

    #[test]
    fn round_trip_or_two_groups_returns_union() {
        let idx = three_level_fixture();
        let rows = run_query(idx.connection(), "level=error OR level=warn");
        assert_eq!(rows.len(), 2);

        let levels: HashSet<String> = rows
            .iter()
            .map(|e| e.level.clone().unwrap_or_default())
            .collect();
        assert_eq!(
            levels,
            HashSet::from(["error".to_string(), "warn".to_string()])
        );
    }

    #[test]
    fn round_trip_or_three_groups_returns_full_union() {
        let idx = three_level_fixture();
        let rows = run_query(idx.connection(), "level=error OR level=warn OR level=info");
        assert_eq!(rows.len(), 3);
    }

    #[test]
    fn round_trip_or_does_not_double_count_overlapping_rows() {
        let idx = three_level_fixture();
        // The single "boom" row satisfies both sides of the OR.
        let rows = run_query(
            idx.connection(),
            r#"level=error OR message contains "boom""#,
        );
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].level.as_deref(), Some("error"));
    }

    #[test]
    fn round_trip_or_respects_and_precedence() {
        let mut idx = fixture();
        // Add a warn row so the right-hand side of the OR has a match.
        let mut warn = make_entry("2026-04-20T13:00:00Z", "warn", "almost full");
        warn.fields
            .insert("service".into(), Value::String("orders".into()));
        warn.fields.insert("req_id".into(), Value::from(400));
        idx.insert_batch(&[warn]).unwrap();

        let rows = run_query(
            idx.connection(),
            "level=error AND service=payments OR level=warn",
        );
        assert_eq!(rows.len(), 2);
        let messages: HashSet<String> = rows
            .iter()
            .map(|e| e.message.clone().unwrap_or_default())
            .collect();
        assert!(messages.contains("payment failed"));
        assert!(messages.contains("almost full"));
    }

    #[test]
    fn round_trip_or_with_time_range() {
        let idx = fixture();
        // `last 30m` from 12:30 covers only the 12:00 row, which is also an
        // error; together with the 10:00 error that makes two distinct rows.
        let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 30, 0).unwrap();
        let rows = run_query_at(idx.connection(), "level=error OR last 30m", now);
        assert_eq!(rows.len(), 2);
    }

    #[test]
    fn round_trip_or_yields_results_ordered_newest_first() {
        let idx = three_level_fixture();
        let rows = run_query(idx.connection(), "level=error OR level=info");
        assert_eq!(rows.len(), 2);
        assert!(rows[0].timestamp > rows[1].timestamp);
        assert_eq!(rows[0].level.as_deref(), Some("info"));
        assert_eq!(rows[1].level.as_deref(), Some("error"));
    }

    #[test]
    fn round_trip_or_with_zero_matches_in_one_group_still_returns_other() {
        let idx = three_level_fixture();
        let rows = run_query(idx.connection(), "level=fatal OR level=warn");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].level.as_deref(), Some("warn"));
    }

    // ---- Round trips: parenthesised groups ----

    #[test]
    fn round_trip_paren_single_clause_same_result_as_unwrapped() {
        let idx = fixture();
        let paren = run_query(idx.connection(), "(level=error)");
        let plain = run_query(idx.connection(), "level=error");
        assert_eq!(paren.len(), plain.len());
        let paren_ts: HashSet<_> = paren.iter().map(|e| e.timestamp.clone()).collect();
        let plain_ts: HashSet<_> = plain.iter().map(|e| e.timestamp.clone()).collect();
        assert_eq!(paren_ts, plain_ts);
    }

    #[test]
    fn round_trip_paren_or_inside_and_filters_correctly() {
        let idx = fixture();
        // All three rows are error or info, but only two have service=payments.
        let rows = run_query(
            idx.connection(),
            "(level=error OR level=info) AND service=payments",
        );
        assert_eq!(rows.len(), 2);
        let messages: HashSet<String> = rows
            .iter()
            .map(|e| e.message.clone().unwrap_or_default())
            .collect();
        assert!(messages.contains("payment failed"));
        assert!(messages.contains("health check"));
        assert!(!messages.contains("timeout on db call"));
    }

    #[test]
    fn round_trip_paren_changes_precedence_vs_no_paren() {
        let idx = fixture();
        // Without parentheses the query reads
        // `level=error OR (level=info AND service=payments)`.
        let without_paren = run_query(
            idx.connection(),
            "level=error OR level=info AND service=payments",
        );
        let with_paren = run_query(
            idx.connection(),
            "(level=error OR level=info) AND service=payments",
        );
        assert_eq!(
            without_paren.len(),
            3,
            "no parens: all error rows + info/payments"
        );
        assert_eq!(with_paren.len(), 2, "parens: only payments-service rows");
    }

    #[test]
    fn round_trip_nested_parens_execute_correctly() {
        let idx = fixture();
        let rows = run_query(
            idx.connection(),
            "((level=error) OR level=info) AND service=payments",
        );
        assert_eq!(rows.len(), 2);
    }

    // ---- Field-name validation (SQL injection guard) ----

    #[test]
    fn unsafe_field_name_is_rejected_at_executor() {
        let result = column_for_field("service; DROP TABLE log_entries--");
        assert!(matches!(result, Err(LogdiveError::UnsafeFieldName(_))));
    }

    #[test]
    fn is_safe_json_path_segment_rejects_single_quote() {
        assert!(!is_safe_json_path_segment("service'"));
    }

    #[test]
    fn is_safe_json_path_segment_rejects_space() {
        assert!(!is_safe_json_path_segment("ser vice"));
    }

    #[test]
    fn is_safe_json_path_segment_rejects_empty_string() {
        assert!(!is_safe_json_path_segment(""));
    }

    #[test]
    fn is_safe_json_path_segment_allows_dotted() {
        assert!(is_safe_json_path_segment("user.id"));
    }

    #[test]
    fn column_for_field_rejects_hyphen_payload() {
        let err = column_for_field("svc-name").unwrap_err();
        assert!(matches!(err, LogdiveError::UnsafeFieldName(_)));
    }

    #[test]
    fn column_for_field_rejects_unicode_non_ascii() {
        // U+2019 is a typographic right single quotation mark.
        let err = column_for_field("svc\u{2019}").unwrap_err();
        assert!(matches!(err, LogdiveError::UnsafeFieldName(_)));
    }

    // ---- Time-range edge cases ----

    #[test]
    fn since_accepts_naive_datetime_space_separator() {
        let idx = fixture();
        let rows = run_query(idx.connection(), r#"since "2026-04-20 11:00:00""#);
        assert_eq!(
            rows.len(),
            2,
            "space-separated naive datetime must filter rows"
        );
    }

    #[test]
    fn since_boundary_row_at_cutoff_is_included() {
        let idx = fixture();
        // The comparison is `>=`, so a row exactly at the cutoff matches.
        let rows = run_query(idx.connection(), "since 2026-04-20T12:00:00Z");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].timestamp.as_deref(), Some("2026-04-20T12:00:00Z"));
    }

    #[test]
    fn since_future_timestamp_returns_empty() {
        let idx = fixture();
        let rows = run_query(idx.connection(), "since 2030-01-01T00:00:00Z");
        assert!(rows.is_empty(), "future cutoff must return no rows");
    }

    #[test]
    fn last_very_large_amount_saturates_to_epoch_returns_all_rows() {
        let idx = fixture();
        // The span reaches before the earliest representable date, so the
        // cutoff falls back to the Unix epoch.
        let rows = run_query(idx.connection(), "last 9999999999h");
        assert_eq!(rows.len(), 3, "epoch-saturated cutoff must match all rows");
    }

    #[test]
    fn since_rfc3339_with_utc_offset_equivalent_to_z() {
        let idx = fixture();
        let rows_z = run_query(idx.connection(), "since 2026-04-20T11:00:00Z");
        let rows_offset = run_query(idx.connection(), r#"since "2026-04-20T11:00:00+00:00""#);
        assert_eq!(
            rows_z.len(),
            rows_offset.len(),
            "Z and +00:00 offsets must produce the same row count"
        );
    }
}