1use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
39use rusqlite::{Connection, params_from_iter, types::Value as SqlValue};
40use serde_json::{Map, Value};
41
42use crate::entry::LogEntry;
43use crate::error::{LogdiveError, Result};
44use crate::query::{AndGroup, Clause, Duration, QueryNode, QueryValue};
45
46pub fn execute(
56 query: &QueryNode,
57 conn: &Connection,
58 limit: Option<usize>,
59) -> Result<Vec<LogEntry>> {
60 let (sql, binds) = build_sql(query, limit, Utc::now())?;
61 run(conn, &sql, &binds)
62}
63
64pub fn execute_at(
68 query: &QueryNode,
69 conn: &Connection,
70 limit: Option<usize>,
71 now: DateTime<Utc>,
72) -> Result<Vec<LogEntry>> {
73 let (sql, binds) = build_sql(query, limit, now)?;
74 run(conn, &sql, &binds)
75}
76
77type Bind = SqlValue;
85
86fn build_sql(
87 query: &QueryNode,
88 limit: Option<usize>,
89 now: DateTime<Utc>,
90) -> Result<(String, Vec<Bind>)> {
91 let QueryNode::Or(groups) = query;
92
93 let mut group_sqls: Vec<String> = Vec::with_capacity(groups.len());
98 let mut binds: Vec<Bind> = Vec::new();
99
100 for group in groups {
101 let (group_sql, mut group_binds) = translate_and_group(group, now)?;
102 group_sqls.push(group_sql);
103 binds.append(&mut group_binds);
104 }
105
106 let where_sql = if group_sqls.is_empty() {
107 "1=1".to_string()
109 } else {
110 group_sqls.join(" OR ")
112 };
113
114 let mut sql = format!(
115 "SELECT timestamp, level, message, tag, fields, raw \
116 FROM log_entries \
117 WHERE {where_sql} \
118 ORDER BY timestamp DESC, id DESC"
119 );
120 if let Some(n) = limit {
121 sql.push_str(&format!(" LIMIT {n}"));
122 }
123 Ok((sql, binds))
124}
125
126fn translate_and_group(group: &AndGroup, now: DateTime<Utc>) -> Result<(String, Vec<Bind>)> {
132 let mut clause_sqls: Vec<String> = Vec::with_capacity(group.clauses.len());
133 let mut binds: Vec<Bind> = Vec::new();
134
135 for clause in &group.clauses {
136 let (sql, mut clause_binds) = translate_clause(clause, now)?;
137 clause_sqls.push(sql);
138 binds.append(&mut clause_binds);
139 }
140
141 let inner = if clause_sqls.is_empty() {
142 "1=1".to_string()
144 } else {
145 clause_sqls.join(" AND ")
146 };
147 Ok((format!("({inner})"), binds))
148}
149
150fn translate_clause(clause: &Clause, now: DateTime<Utc>) -> Result<(String, Vec<Bind>)> {
151 match clause {
152 Clause::Compare { field, op, value } => {
153 let column_expr = column_for_field(field)?;
154 let sql = format!("{column_expr} {op} ?");
155 Ok((sql, vec![value_to_bind(value)]))
156 }
157 Clause::Contains { field, value } => {
158 let column_expr = column_for_field(field)?;
159 let escaped = escape_like(value);
162 let pattern = format!("%{escaped}%");
163 let sql = format!("{column_expr} LIKE ? ESCAPE '\\'");
164 Ok((sql, vec![SqlValue::Text(pattern)]))
165 }
166 Clause::LastDuration(d) => {
167 let cutoff = compute_last_cutoff(*d, now);
168 Ok((
169 "timestamp >= ?".to_string(),
170 vec![SqlValue::Text(cutoff.to_rfc3339())],
171 ))
172 }
173 Clause::SinceDatetime(s) => {
174 let dt = parse_datetime(s)?;
175 Ok((
176 "timestamp >= ?".to_string(),
177 vec![SqlValue::Text(dt.to_rfc3339())],
178 ))
179 }
180 }
181}
182
183fn column_for_field(field: &str) -> Result<String> {
190 if LogEntry::KNOWN_KEYS.contains(&field) {
191 Ok(field.to_string())
192 } else {
193 if !is_safe_json_path_segment(field) {
194 return Err(LogdiveError::UnsafeFieldName(field.to_string()));
195 }
196 Ok(format!("json_extract(fields, '$.{field}')"))
197 }
198}
199
200fn is_safe_json_path_segment(s: &str) -> bool {
204 !s.is_empty()
205 && s.chars()
206 .next()
207 .map(|c| c.is_ascii_alphabetic() || c == '_')
208 .unwrap_or(false)
209 && s.chars()
210 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '.')
211}
212
213fn value_to_bind(v: &QueryValue) -> Bind {
214 match v {
215 QueryValue::String(s) => SqlValue::Text(s.clone()),
216 QueryValue::Integer(n) => SqlValue::Integer(*n),
217 QueryValue::Float(f) => SqlValue::Real(*f),
218 QueryValue::Bool(b) => SqlValue::Integer(if *b { 1 } else { 0 }),
219 }
220}
221
222fn escape_like(input: &str) -> String {
225 let mut out = String::with_capacity(input.len());
226 for ch in input.chars() {
227 match ch {
228 '\\' | '%' | '_' => {
229 out.push('\\');
230 out.push(ch);
231 }
232 _ => out.push(ch),
233 }
234 }
235 out
236}
237
238fn compute_last_cutoff(d: Duration, now: DateTime<Utc>) -> DateTime<Utc> {
239 let amount_i64 = i64::try_from(d.amount).unwrap_or(i64::MAX);
242 let secs = amount_i64.saturating_mul(d.unit.seconds());
243 let delta = chrono::Duration::seconds(secs);
244 now.checked_sub_signed(delta).unwrap_or_else(|| {
245 Utc.timestamp_opt(0, 0)
246 .single()
247 .expect("unix epoch is valid")
248 })
249}
250
251fn parse_datetime(s: &str) -> Result<DateTime<Utc>> {
256 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
257 return Ok(dt.with_timezone(&Utc));
258 }
259 for fmt in &["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"] {
260 if let Ok(ndt) = NaiveDateTime::parse_from_str(s, fmt) {
261 return Ok(Utc.from_utc_datetime(&ndt));
262 }
263 }
264 if let Ok(nd) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
265 let ndt = nd.and_hms_opt(0, 0, 0).expect("00:00:00 is valid");
266 return Ok(Utc.from_utc_datetime(&ndt));
267 }
268 Err(LogdiveError::InvalidDatetime {
269 input: s.to_string(),
270 reason: "expected RFC3339, `YYYY-MM-DD HH:MM:SS`, or `YYYY-MM-DD`".to_string(),
271 })
272}
273
274fn run(conn: &Connection, sql: &str, binds: &[Bind]) -> Result<Vec<LogEntry>> {
279 let mut stmt = conn.prepare(sql)?;
280 let rows = stmt.query_map(params_from_iter(binds.iter()), |row| {
281 let timestamp: Option<String> = row.get(0)?;
282 let level: Option<String> = row.get(1)?;
283 let message: Option<String> = row.get(2)?;
284 let tag: Option<String> = row.get(3)?;
285 let fields_json: String = row.get(4)?;
286 let raw: String = row.get(5)?;
287 Ok((timestamp, level, message, tag, fields_json, raw))
290 })?;
291
292 let mut out = Vec::new();
293 for row in rows {
294 let (timestamp, level, message, tag, fields_json, raw) = row?;
295 let fields: Map<String, Value> =
296 serde_json::from_str(&fields_json).map_err(LogdiveError::CorruptFieldsJson)?;
297 out.push(LogEntry {
298 timestamp,
299 level,
300 message,
301 tag,
302 fields,
303 raw,
304 });
305 }
306 Ok(out)
307}
308
309#[cfg(test)]
314mod tests {
315 use super::*;
316 use crate::indexer::Indexer;
317 use crate::query::parse;
318 use std::collections::HashSet;
319
320 fn run_query(conn: &Connection, q: &str) -> Vec<LogEntry> {
323 let ast = parse(q).expect("test queries are well-formed");
324 execute(&ast, conn, None).expect("execute")
325 }
326
327 fn run_query_at(conn: &Connection, q: &str, now: DateTime<Utc>) -> Vec<LogEntry> {
328 let ast = parse(q).expect("test queries are well-formed");
329 execute_at(&ast, conn, None, now).expect("execute")
330 }
331
332 fn make_entry(ts: &str, level: &str, message: &str) -> LogEntry {
333 let raw = format!(r#"{{"timestamp":"{ts}","level":"{level}","message":"{message}"}}"#);
334 let mut e = LogEntry::new(raw);
335 e.timestamp = Some(ts.to_string());
336 e.level = Some(level.to_string());
337 e.message = Some(message.to_string());
338 e
339 }
340
341 fn fixture() -> Indexer {
342 let mut idx = Indexer::open_in_memory().unwrap();
343 let mut a = make_entry("2026-04-20T10:00:00Z", "error", "payment failed");
344 a.tag = Some("api".into());
345 a.fields
346 .insert("service".into(), Value::String("payments".into()));
347 a.fields.insert("req_id".into(), Value::from(100));
348
349 let mut b = make_entry("2026-04-20T11:00:00Z", "info", "health check");
350 b.tag = Some("api".into());
351 b.fields
352 .insert("service".into(), Value::String("payments".into()));
353 b.fields.insert("req_id".into(), Value::from(200));
354
355 let mut c = make_entry("2026-04-20T12:00:00Z", "error", "timeout on db call");
356 c.fields
357 .insert("service".into(), Value::String("users".into()));
358 c.fields.insert("req_id".into(), Value::from(300));
359
360 idx.insert_batch(&[a, b, c]).unwrap();
361 idx
362 }
363
364 fn three_level_fixture() -> Indexer {
368 let mut idx = Indexer::open_in_memory().unwrap();
369 let a = make_entry("2026-04-20T09:00:00Z", "error", "boom");
370 let b = make_entry("2026-04-20T10:00:00Z", "warn", "slow query");
371 let c = make_entry("2026-04-20T11:00:00Z", "info", "ok");
372 idx.insert_batch(&[a, b, c]).unwrap();
373 idx
374 }
375
376 #[test]
381 fn compare_on_known_field_binds_value_not_interpolates() {
382 let ast = parse("level=error").unwrap();
383 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
384 assert!(sql.contains("WHERE (level = ?)"));
387 assert!(!sql.contains("error"));
388 assert_eq!(binds.len(), 1);
389 match &binds[0] {
390 SqlValue::Text(s) => assert_eq!(s, "error"),
391 other => panic!("expected text bind, got {other:?}"),
392 }
393 }
394
395 #[test]
396 fn compare_on_unknown_field_uses_json_extract() {
397 let ast = parse("service=payments").unwrap();
398 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
399 assert!(sql.contains("json_extract(fields, '$.service')"));
400 assert_eq!(binds.len(), 1);
401 }
402
403 #[test]
404 fn contains_uses_like_with_escape_and_wildcards() {
405 let ast = parse(r#"message contains "timeout""#).unwrap();
406 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
407 assert!(sql.contains("LIKE ? ESCAPE '\\'"));
408 match &binds[0] {
409 SqlValue::Text(s) => assert_eq!(s, "%timeout%"),
410 other => panic!("expected text bind, got {other:?}"),
411 }
412 }
413
414 #[test]
415 fn contains_escapes_like_metacharacters() {
416 let ast = parse(r#"message contains "50%""#).unwrap();
417 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
418 match &binds[0] {
419 SqlValue::Text(s) => assert_eq!(s, r"%50\%%"),
420 other => panic!("unexpected bind: {other:?}"),
421 }
422 }
423
424 #[test]
425 fn last_duration_produces_timestamp_lower_bound() {
426 let ast = parse("last 2h").unwrap();
427 let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 0, 0).unwrap();
428 let (sql, binds) = build_sql(&ast, None, now).unwrap();
429 assert!(sql.contains("timestamp >= ?"));
430 match &binds[0] {
431 SqlValue::Text(s) => assert!(s.starts_with("2026-04-20T10:00:00")),
432 other => panic!("unexpected bind: {other:?}"),
433 }
434 }
435
436 #[test]
437 fn since_accepts_rfc3339() {
438 let ast = parse(r#"since "2024-01-01T10:00:00Z""#).unwrap();
439 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
440 assert!(sql.contains("timestamp >= ?"));
441 match &binds[0] {
442 SqlValue::Text(s) => assert!(s.starts_with("2024-01-01T10:00:00")),
443 other => panic!("unexpected: {other:?}"),
444 }
445 }
446
447 #[test]
448 fn since_accepts_bare_date() {
449 let ast = parse("since 2024-06-15").unwrap();
450 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
451 match &binds[0] {
452 SqlValue::Text(s) => assert!(s.starts_with("2024-06-15T00:00:00")),
453 other => panic!("unexpected: {other:?}"),
454 }
455 }
456
457 #[test]
458 fn since_rejects_garbage() {
459 let ast = parse("since not-a-date").unwrap();
460 let err = build_sql(&ast, None, Utc::now()).unwrap_err();
461 assert!(matches!(err, LogdiveError::InvalidDatetime { .. }));
462 }
463
464 #[test]
465 fn and_chain_joins_with_and_inside_a_single_group() {
466 let ast = parse("level=error AND service=payments").unwrap();
467 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
468 assert!(sql.contains("level = ?"));
469 assert!(sql.contains("json_extract(fields, '$.service') = ?"));
470 assert!(sql.contains(" AND "));
472 assert!(!sql.contains(" OR "));
473 assert!(sql.contains("WHERE (level = ? AND json_extract(fields, '$.service') = ?)"));
475 assert_eq!(binds.len(), 2);
476 match (&binds[0], &binds[1]) {
477 (SqlValue::Text(a), SqlValue::Text(b)) => {
478 assert_eq!(a, "error");
479 assert_eq!(b, "payments");
480 }
481 other => panic!("unexpected binds: {other:?}"),
482 }
483 }
484
485 #[test]
486 fn integer_binds_as_integer_not_text() {
487 let ast = parse("req_id > 100").unwrap();
488 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
489 match &binds[0] {
490 SqlValue::Integer(n) => assert_eq!(*n, 100),
491 other => panic!("expected integer bind, got {other:?}"),
492 }
493 }
494
495 #[test]
496 fn bool_binds_as_integer_zero_or_one() {
497 let ast = parse("ok=true").unwrap();
498 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
499 assert!(matches!(binds[0], SqlValue::Integer(1)));
500
501 let ast = parse("ok=false").unwrap();
502 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
503 assert!(matches!(binds[0], SqlValue::Integer(0)));
504 }
505
506 #[test]
507 fn float_binds_as_real() {
508 let ast = parse("duration < 1.5").unwrap();
509 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
510 match &binds[0] {
511 SqlValue::Real(f) => assert!((f - 1.5).abs() < 1e-9),
512 other => panic!("expected real bind, got {other:?}"),
513 }
514 }
515
516 #[test]
517 fn limit_appends_limit_clause() {
518 let ast = parse("level=error").unwrap();
519 let (sql, _) = build_sql(&ast, Some(50), Utc::now()).unwrap();
520 assert!(sql.ends_with("LIMIT 50"));
521 }
522
523 #[test]
528 fn or_emits_two_parenthesized_groups_joined_by_or() {
529 let ast = parse("level=error OR level=warn").unwrap();
530 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
531 assert!(sql.contains("WHERE (level = ?) OR (level = ?)"));
533 match (&binds[0], &binds[1]) {
535 (SqlValue::Text(a), SqlValue::Text(b)) => {
536 assert_eq!(a, "error");
537 assert_eq!(b, "warn");
538 }
539 other => panic!("unexpected binds: {other:?}"),
540 }
541 }
542
543 #[test]
544 fn or_with_three_groups_joins_with_two_or_keywords() {
545 let ast = parse("level=error OR level=warn OR level=fatal").unwrap();
546 let (sql, _binds) = build_sql(&ast, None, Utc::now()).unwrap();
547 assert_eq!(sql.matches(" OR ").count(), 2);
549 assert!(sql.contains("(level = ?) OR (level = ?) OR (level = ?)"));
552 }
553
554 #[test]
555 fn or_with_and_inside_each_group_emits_correct_shape() {
556 let ast = parse("level=error AND service=payments OR level=warn").unwrap();
558 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
559 assert!(sql.contains(
560 "WHERE (level = ? AND json_extract(fields, '$.service') = ?) OR (level = ?)"
561 ));
562 assert_eq!(binds.len(), 3);
564 match (&binds[0], &binds[1], &binds[2]) {
565 (SqlValue::Text(a), SqlValue::Text(b), SqlValue::Text(c)) => {
566 assert_eq!(a, "error");
567 assert_eq!(b, "payments");
568 assert_eq!(c, "warn");
569 }
570 other => panic!("unexpected binds: {other:?}"),
571 }
572 }
573
574 #[test]
575 fn or_with_and_on_both_sides_preserves_bind_ordering() {
576 let ast = parse("a=1 AND b=2 OR c=3 AND d=4").unwrap();
579 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
580 assert_eq!(sql.matches(" OR ").count(), 1);
581 assert_eq!(binds.len(), 4);
582 match (&binds[0], &binds[1], &binds[2], &binds[3]) {
583 (
584 SqlValue::Integer(a),
585 SqlValue::Integer(b),
586 SqlValue::Integer(c),
587 SqlValue::Integer(d),
588 ) => {
589 assert_eq!(*a, 1);
590 assert_eq!(*b, 2);
591 assert_eq!(*c, 3);
592 assert_eq!(*d, 4);
593 }
594 other => panic!("unexpected binds: {other:?}"),
595 }
596 }
597
598 #[test]
599 fn or_with_mixed_clause_kinds_in_each_group() {
600 let ast = parse(r#"level=error OR message contains "timeout" OR last 30m"#).unwrap();
603 let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 0, 0).unwrap();
604 let (sql, binds) = build_sql(&ast, None, now).unwrap();
605 assert_eq!(sql.matches(" OR ").count(), 2);
606 assert_eq!(binds.len(), 3);
607
608 assert!(matches!(&binds[0], SqlValue::Text(s) if s == "error"));
610 assert!(matches!(&binds[1], SqlValue::Text(s) if s == "%timeout%"));
612 match &binds[2] {
614 SqlValue::Text(s) => assert!(s.starts_with("2026-04-20T11:30:00")),
615 other => panic!("expected text cutoff, got {other:?}"),
616 }
617 }
618
619 #[test]
620 fn limit_applies_to_or_query_too() {
621 let ast = parse("level=error OR level=warn").unwrap();
622 let (sql, _) = build_sql(&ast, Some(25), Utc::now()).unwrap();
623 assert!(sql.ends_with("LIMIT 25"));
624 assert!(sql.contains(" ORDER BY "));
626 }
627
628 #[test]
633 fn round_trip_known_field_equality() {
634 let idx = fixture();
635 let rows = run_query(idx.connection(), "level=error");
636 assert_eq!(rows.len(), 2);
637 assert!(rows.iter().all(|e| e.level.as_deref() == Some("error")));
638 }
639
640 #[test]
641 fn round_trip_unknown_field_via_json_extract() {
642 let idx = fixture();
643 let rows = run_query(idx.connection(), "service=payments");
644 assert_eq!(rows.len(), 2);
645 assert!(
646 rows.iter()
647 .all(|e| e.fields.get("service") == Some(&Value::String("payments".into())))
648 );
649 }
650
651 #[test]
652 fn round_trip_and_chain() {
653 let idx = fixture();
654 let rows = run_query(idx.connection(), "level=error AND service=payments");
655 assert_eq!(rows.len(), 1);
656 assert_eq!(rows[0].message.as_deref(), Some("payment failed"));
657 }
658
659 #[test]
660 fn round_trip_contains_substring_match() {
661 let idx = fixture();
662 let rows = run_query(idx.connection(), r#"message contains "timeout""#);
663 assert_eq!(rows.len(), 1);
664 assert!(rows[0].message.as_deref().unwrap().contains("timeout"));
665 }
666
667 #[test]
668 fn round_trip_numeric_comparison_on_json_field() {
669 let idx = fixture();
670 let rows = run_query(idx.connection(), "req_id > 150");
671 assert_eq!(rows.len(), 2);
672 let ids: HashSet<i64> = rows
673 .iter()
674 .map(|e| e.fields.get("req_id").and_then(|v| v.as_i64()).unwrap())
675 .collect();
676 assert_eq!(ids, HashSet::from([200, 300]));
677 }
678
679 #[test]
680 fn round_trip_last_duration_uses_now() {
681 let idx = fixture();
682 let now = Utc.with_ymd_and_hms(2026, 4, 20, 13, 0, 0).unwrap();
683 let rows = run_query_at(idx.connection(), "last 3h", now);
684 assert_eq!(rows.len(), 3);
685
686 let rows = run_query_at(idx.connection(), "last 70m", now);
687 assert_eq!(rows.len(), 1);
688 assert_eq!(rows[0].timestamp.as_deref(), Some("2026-04-20T12:00:00Z"));
689 }
690
691 #[test]
692 fn round_trip_since_datetime() {
693 let idx = fixture();
694 let rows = run_query(idx.connection(), "since 2026-04-20T11:00:00Z");
695 assert_eq!(rows.len(), 2);
696 }
697
698 #[test]
699 fn round_trip_results_ordered_newest_first() {
700 let idx = fixture();
701 let rows = run_query(idx.connection(), "level=error");
702 assert!(rows[0].timestamp > rows[1].timestamp);
703 }
704
705 #[test]
706 fn round_trip_not_equal_operator() {
707 let idx = fixture();
708 let rows = run_query(idx.connection(), "level!=error");
709 assert_eq!(rows.len(), 1);
710 assert_eq!(rows[0].level.as_deref(), Some("info"));
711 }
712
713 #[test]
714 fn round_trip_contains_with_wildcard_character_is_literal() {
715 let mut idx = Indexer::open_in_memory().unwrap();
716 let a = make_entry("2026-04-20T10:00:00Z", "info", "discount 50% today");
717 let b = make_entry("2026-04-20T11:00:00Z", "info", "no special char here");
718 idx.insert_batch(&[a, b]).unwrap();
719
720 let rows = run_query(idx.connection(), r#"message contains "50%""#);
721 assert_eq!(rows.len(), 1);
722 assert!(rows[0].message.as_deref().unwrap().contains("50%"));
723 }
724
725 #[test]
726 fn round_trip_empty_result_is_empty_vec_not_error() {
727 let idx = fixture();
728 let rows = run_query(idx.connection(), "level=nonsense");
729 assert!(rows.is_empty());
730 }
731
732 #[test]
733 fn round_trip_reconstructs_fields_map() {
734 let idx = fixture();
735 let rows = run_query(idx.connection(), "level=error AND service=payments");
736 assert_eq!(rows.len(), 1);
737 let e = &rows[0];
738 assert_eq!(
739 e.fields.get("service"),
740 Some(&Value::String("payments".into()))
741 );
742 assert_eq!(e.fields.get("req_id").and_then(|v| v.as_i64()), Some(100));
743 }
744
745 #[test]
750 fn round_trip_or_two_groups_returns_union() {
751 let idx = three_level_fixture();
752 let rows = run_query(idx.connection(), "level=error OR level=warn");
753 assert_eq!(rows.len(), 2);
754
755 let levels: HashSet<String> = rows
756 .iter()
757 .map(|e| e.level.clone().unwrap_or_default())
758 .collect();
759 assert_eq!(
760 levels,
761 HashSet::from(["error".to_string(), "warn".to_string()])
762 );
763 }
764
765 #[test]
766 fn round_trip_or_three_groups_returns_full_union() {
767 let idx = three_level_fixture();
768 let rows = run_query(idx.connection(), "level=error OR level=warn OR level=info");
769 assert_eq!(rows.len(), 3);
770 }
771
772 #[test]
773 fn round_trip_or_does_not_double_count_overlapping_rows() {
774 let idx = three_level_fixture();
778 let rows = run_query(
780 idx.connection(),
781 r#"level=error OR message contains "boom""#,
782 );
783 assert_eq!(rows.len(), 1);
784 assert_eq!(rows[0].level.as_deref(), Some("error"));
785 }
786
787 #[test]
788 fn round_trip_or_respects_and_precedence() {
789 let mut idx = fixture();
794 let mut warn = make_entry("2026-04-20T13:00:00Z", "warn", "almost full");
795 warn.fields
796 .insert("service".into(), Value::String("orders".into()));
797 warn.fields.insert("req_id".into(), Value::from(400));
798 idx.insert_batch(&[warn]).unwrap();
799
800 let rows = run_query(
801 idx.connection(),
802 "level=error AND service=payments OR level=warn",
803 );
804 assert_eq!(rows.len(), 2);
806 let messages: HashSet<String> = rows
807 .iter()
808 .map(|e| e.message.clone().unwrap_or_default())
809 .collect();
810 assert!(messages.contains("payment failed"));
811 assert!(messages.contains("almost full"));
812 }
813
814 #[test]
815 fn round_trip_or_with_time_range() {
816 let idx = fixture();
819 let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 30, 0).unwrap();
820 let rows = run_query_at(idx.connection(), "level=error OR last 30m", now);
821 assert_eq!(rows.len(), 2);
824 }
825
826 #[test]
827 fn round_trip_or_yields_results_ordered_newest_first() {
828 let idx = three_level_fixture();
830 let rows = run_query(idx.connection(), "level=error OR level=info");
831 assert_eq!(rows.len(), 2);
832 assert!(rows[0].timestamp > rows[1].timestamp);
833 assert_eq!(rows[0].level.as_deref(), Some("info"));
835 assert_eq!(rows[1].level.as_deref(), Some("error"));
836 }
837
838 #[test]
839 fn round_trip_or_with_zero_matches_in_one_group_still_returns_other() {
840 let idx = three_level_fixture();
841 let rows = run_query(idx.connection(), "level=fatal OR level=warn");
843 assert_eq!(rows.len(), 1);
844 assert_eq!(rows[0].level.as_deref(), Some("warn"));
845 }
846
847 #[test]
852 fn unsafe_field_name_is_rejected_at_executor() {
853 let result = column_for_field("service; DROP TABLE log_entries--");
854 assert!(matches!(result, Err(LogdiveError::UnsafeFieldName(_))));
855 }
856}