1use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
23use rusqlite::{Connection, params_from_iter, types::Value as SqlValue};
24use serde_json::{Map, Value};
25
26use crate::entry::LogEntry;
27use crate::error::{LogdiveError, Result};
28use crate::query::{Clause, Duration, QueryNode, QueryValue};
29
30pub fn execute(
40 query: &QueryNode,
41 conn: &Connection,
42 limit: Option<usize>,
43) -> Result<Vec<LogEntry>> {
44 let (sql, binds) = build_sql(query, limit, Utc::now())?;
45 run(conn, &sql, &binds)
46}
47
48pub fn execute_at(
52 query: &QueryNode,
53 conn: &Connection,
54 limit: Option<usize>,
55 now: DateTime<Utc>,
56) -> Result<Vec<LogEntry>> {
57 let (sql, binds) = build_sql(query, limit, now)?;
58 run(conn, &sql, &binds)
59}
60
61type Bind = SqlValue;
69
70fn build_sql(
71 query: &QueryNode,
72 limit: Option<usize>,
73 now: DateTime<Utc>,
74) -> Result<(String, Vec<Bind>)> {
75 let QueryNode::And(clauses) = query;
76
77 let mut where_parts: Vec<String> = Vec::with_capacity(clauses.len());
78 let mut binds: Vec<Bind> = Vec::with_capacity(clauses.len());
79
80 for clause in clauses {
81 let (sql, mut clause_binds) = translate_clause(clause, now)?;
82 where_parts.push(sql);
83 binds.append(&mut clause_binds);
84 }
85
86 let where_sql = if where_parts.is_empty() {
87 "1=1".to_string()
90 } else {
91 where_parts.join(" AND ")
92 };
93
94 let mut sql = format!(
95 "SELECT timestamp, level, message, tag, fields, raw \
96 FROM log_entries \
97 WHERE {where_sql} \
98 ORDER BY timestamp DESC, id DESC"
99 );
100 if let Some(n) = limit {
101 sql.push_str(&format!(" LIMIT {n}"));
102 }
103 Ok((sql, binds))
104}
105
106fn translate_clause(clause: &Clause, now: DateTime<Utc>) -> Result<(String, Vec<Bind>)> {
107 match clause {
108 Clause::Compare { field, op, value } => {
109 let column_expr = column_for_field(field)?;
110 let sql = format!("{column_expr} {op} ?");
111 Ok((sql, vec![value_to_bind(value)]))
112 }
113 Clause::Contains { field, value } => {
114 let column_expr = column_for_field(field)?;
115 let escaped = escape_like(value);
118 let pattern = format!("%{escaped}%");
119 let sql = format!("{column_expr} LIKE ? ESCAPE '\\'");
120 Ok((sql, vec![SqlValue::Text(pattern)]))
121 }
122 Clause::LastDuration(d) => {
123 let cutoff = compute_last_cutoff(*d, now);
124 Ok((
125 "timestamp >= ?".to_string(),
126 vec![SqlValue::Text(cutoff.to_rfc3339())],
127 ))
128 }
129 Clause::SinceDatetime(s) => {
130 let dt = parse_datetime(s)?;
131 Ok((
132 "timestamp >= ?".to_string(),
133 vec![SqlValue::Text(dt.to_rfc3339())],
134 ))
135 }
136 }
137}
138
139fn column_for_field(field: &str) -> Result<String> {
146 if LogEntry::KNOWN_KEYS.contains(&field) {
147 Ok(field.to_string())
148 } else {
149 if !is_safe_json_path_segment(field) {
150 return Err(LogdiveError::UnsafeFieldName(field.to_string()));
151 }
152 Ok(format!("json_extract(fields, '$.{field}')"))
153 }
154}
155
156fn is_safe_json_path_segment(s: &str) -> bool {
160 !s.is_empty()
161 && s.chars()
162 .next()
163 .map(|c| c.is_ascii_alphabetic() || c == '_')
164 .unwrap_or(false)
165 && s.chars()
166 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '.')
167}
168
169fn value_to_bind(v: &QueryValue) -> Bind {
170 match v {
171 QueryValue::String(s) => SqlValue::Text(s.clone()),
172 QueryValue::Integer(n) => SqlValue::Integer(*n),
173 QueryValue::Float(f) => SqlValue::Real(*f),
174 QueryValue::Bool(b) => SqlValue::Integer(if *b { 1 } else { 0 }),
175 }
176}
177
178fn escape_like(input: &str) -> String {
181 let mut out = String::with_capacity(input.len());
182 for ch in input.chars() {
183 match ch {
184 '\\' | '%' | '_' => {
185 out.push('\\');
186 out.push(ch);
187 }
188 _ => out.push(ch),
189 }
190 }
191 out
192}
193
194fn compute_last_cutoff(d: Duration, now: DateTime<Utc>) -> DateTime<Utc> {
195 let amount_i64 = i64::try_from(d.amount).unwrap_or(i64::MAX);
198 let secs = amount_i64.saturating_mul(d.unit.seconds());
199 let delta = chrono::Duration::seconds(secs);
200 now.checked_sub_signed(delta).unwrap_or_else(|| {
201 Utc.timestamp_opt(0, 0)
202 .single()
203 .expect("unix epoch is valid")
204 })
205}
206
207fn parse_datetime(s: &str) -> Result<DateTime<Utc>> {
212 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
213 return Ok(dt.with_timezone(&Utc));
214 }
215 for fmt in &["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"] {
216 if let Ok(ndt) = NaiveDateTime::parse_from_str(s, fmt) {
217 return Ok(Utc.from_utc_datetime(&ndt));
218 }
219 }
220 if let Ok(nd) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
221 let ndt = nd.and_hms_opt(0, 0, 0).expect("00:00:00 is valid");
222 return Ok(Utc.from_utc_datetime(&ndt));
223 }
224 Err(LogdiveError::InvalidDatetime {
225 input: s.to_string(),
226 reason: "expected RFC3339, `YYYY-MM-DD HH:MM:SS`, or `YYYY-MM-DD`".to_string(),
227 })
228}
229
230fn run(conn: &Connection, sql: &str, binds: &[Bind]) -> Result<Vec<LogEntry>> {
235 let mut stmt = conn.prepare(sql)?;
236 let rows = stmt.query_map(params_from_iter(binds.iter()), |row| {
237 let timestamp: Option<String> = row.get(0)?;
238 let level: Option<String> = row.get(1)?;
239 let message: Option<String> = row.get(2)?;
240 let tag: Option<String> = row.get(3)?;
241 let fields_json: String = row.get(4)?;
242 let raw: String = row.get(5)?;
243 Ok((timestamp, level, message, tag, fields_json, raw))
246 })?;
247
248 let mut out = Vec::new();
249 for row in rows {
250 let (timestamp, level, message, tag, fields_json, raw) = row?;
251 let fields: Map<String, Value> =
252 serde_json::from_str(&fields_json).map_err(LogdiveError::CorruptFieldsJson)?;
253 out.push(LogEntry {
254 timestamp,
255 level,
256 message,
257 tag,
258 fields,
259 raw,
260 });
261 }
262 Ok(out)
263}
264
265#[cfg(test)]
270mod tests {
271 use super::*;
272 use crate::indexer::Indexer;
273 use crate::query::parse;
274 use std::collections::HashSet;
275
276 fn run_query(conn: &Connection, q: &str) -> Vec<LogEntry> {
279 let ast = parse(q).expect("test queries are well-formed");
280 execute(&ast, conn, None).expect("execute")
281 }
282
283 fn run_query_at(conn: &Connection, q: &str, now: DateTime<Utc>) -> Vec<LogEntry> {
284 let ast = parse(q).expect("test queries are well-formed");
285 execute_at(&ast, conn, None, now).expect("execute")
286 }
287
288 fn make_entry(ts: &str, level: &str, message: &str) -> LogEntry {
289 let raw = format!(r#"{{"timestamp":"{ts}","level":"{level}","message":"{message}"}}"#);
290 let mut e = LogEntry::new(raw);
291 e.timestamp = Some(ts.to_string());
292 e.level = Some(level.to_string());
293 e.message = Some(message.to_string());
294 e
295 }
296
297 fn fixture() -> Indexer {
298 let mut idx = Indexer::open_in_memory().unwrap();
299 let mut a = make_entry("2026-04-20T10:00:00Z", "error", "payment failed");
300 a.tag = Some("api".into());
301 a.fields
302 .insert("service".into(), Value::String("payments".into()));
303 a.fields.insert("req_id".into(), Value::from(100));
304
305 let mut b = make_entry("2026-04-20T11:00:00Z", "info", "health check");
306 b.tag = Some("api".into());
307 b.fields
308 .insert("service".into(), Value::String("payments".into()));
309 b.fields.insert("req_id".into(), Value::from(200));
310
311 let mut c = make_entry("2026-04-20T12:00:00Z", "error", "timeout on db call");
312 c.fields
313 .insert("service".into(), Value::String("users".into()));
314 c.fields.insert("req_id".into(), Value::from(300));
315
316 idx.insert_batch(&[a, b, c]).unwrap();
317 idx
318 }
319
320 #[test]
323 fn compare_on_known_field_binds_value_not_interpolates() {
324 let ast = parse("level=error").unwrap();
325 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
326 assert!(sql.contains("WHERE level = ?"));
327 assert!(!sql.contains("error"));
328 assert_eq!(binds.len(), 1);
329 match &binds[0] {
330 SqlValue::Text(s) => assert_eq!(s, "error"),
331 other => panic!("expected text bind, got {other:?}"),
332 }
333 }
334
335 #[test]
336 fn compare_on_unknown_field_uses_json_extract() {
337 let ast = parse("service=payments").unwrap();
338 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
339 assert!(sql.contains("json_extract(fields, '$.service')"));
340 assert_eq!(binds.len(), 1);
341 }
342
343 #[test]
344 fn contains_uses_like_with_escape_and_wildcards() {
345 let ast = parse(r#"message contains "timeout""#).unwrap();
346 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
347 assert!(sql.contains("LIKE ? ESCAPE '\\'"));
348 match &binds[0] {
349 SqlValue::Text(s) => assert_eq!(s, "%timeout%"),
350 other => panic!("expected text bind, got {other:?}"),
351 }
352 }
353
354 #[test]
355 fn contains_escapes_like_metacharacters() {
356 let ast = parse(r#"message contains "50%""#).unwrap();
357 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
358 match &binds[0] {
359 SqlValue::Text(s) => assert_eq!(s, r"%50\%%"),
360 other => panic!("unexpected bind: {other:?}"),
361 }
362 }
363
364 #[test]
365 fn last_duration_produces_timestamp_lower_bound() {
366 let ast = parse("last 2h").unwrap();
367 let now = Utc.with_ymd_and_hms(2026, 4, 20, 12, 0, 0).unwrap();
368 let (sql, binds) = build_sql(&ast, None, now).unwrap();
369 assert!(sql.contains("timestamp >= ?"));
370 match &binds[0] {
371 SqlValue::Text(s) => assert!(s.starts_with("2026-04-20T10:00:00")),
372 other => panic!("unexpected bind: {other:?}"),
373 }
374 }
375
376 #[test]
377 fn since_accepts_rfc3339() {
378 let ast = parse(r#"since "2024-01-01T10:00:00Z""#).unwrap();
379 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
380 assert!(sql.contains("timestamp >= ?"));
381 match &binds[0] {
382 SqlValue::Text(s) => assert!(s.starts_with("2024-01-01T10:00:00")),
383 other => panic!("unexpected: {other:?}"),
384 }
385 }
386
387 #[test]
388 fn since_accepts_bare_date() {
389 let ast = parse("since 2024-06-15").unwrap();
390 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
391 match &binds[0] {
392 SqlValue::Text(s) => assert!(s.starts_with("2024-06-15T00:00:00")),
393 other => panic!("unexpected: {other:?}"),
394 }
395 }
396
397 #[test]
398 fn since_rejects_garbage() {
399 let ast = parse("since not-a-date").unwrap();
400 let err = build_sql(&ast, None, Utc::now()).unwrap_err();
401 assert!(matches!(err, LogdiveError::InvalidDatetime { .. }));
402 }
403
404 #[test]
405 fn and_chain_joins_with_and_and_preserves_bind_order() {
406 let ast = parse("level=error AND service=payments").unwrap();
407 let (sql, binds) = build_sql(&ast, None, Utc::now()).unwrap();
408 assert!(sql.contains("level = ?"));
409 assert!(sql.contains("json_extract(fields, '$.service') = ?"));
410 assert!(sql.contains(" AND "));
411 assert_eq!(binds.len(), 2);
412 match (&binds[0], &binds[1]) {
413 (SqlValue::Text(a), SqlValue::Text(b)) => {
414 assert_eq!(a, "error");
415 assert_eq!(b, "payments");
416 }
417 other => panic!("unexpected binds: {other:?}"),
418 }
419 }
420
421 #[test]
422 fn integer_binds_as_integer_not_text() {
423 let ast = parse("req_id > 100").unwrap();
424 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
425 match &binds[0] {
426 SqlValue::Integer(n) => assert_eq!(*n, 100),
427 other => panic!("expected integer bind, got {other:?}"),
428 }
429 }
430
431 #[test]
432 fn bool_binds_as_integer_zero_or_one() {
433 let ast = parse("ok=true").unwrap();
434 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
435 assert!(matches!(binds[0], SqlValue::Integer(1)));
436
437 let ast = parse("ok=false").unwrap();
438 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
439 assert!(matches!(binds[0], SqlValue::Integer(0)));
440 }
441
442 #[test]
443 fn float_binds_as_real() {
444 let ast = parse("duration < 1.5").unwrap();
445 let (_, binds) = build_sql(&ast, None, Utc::now()).unwrap();
446 match &binds[0] {
447 SqlValue::Real(f) => assert!((f - 1.5).abs() < 1e-9),
448 other => panic!("expected real bind, got {other:?}"),
449 }
450 }
451
452 #[test]
453 fn limit_appends_limit_clause() {
454 let ast = parse("level=error").unwrap();
455 let (sql, _) = build_sql(&ast, Some(50), Utc::now()).unwrap();
456 assert!(sql.ends_with("LIMIT 50"));
457 }
458
459 #[test]
462 fn round_trip_known_field_equality() {
463 let idx = fixture();
464 let rows = run_query(idx.connection(), "level=error");
465 assert_eq!(rows.len(), 2);
466 assert!(rows.iter().all(|e| e.level.as_deref() == Some("error")));
467 }
468
469 #[test]
470 fn round_trip_unknown_field_via_json_extract() {
471 let idx = fixture();
472 let rows = run_query(idx.connection(), "service=payments");
473 assert_eq!(rows.len(), 2);
474 assert!(
475 rows.iter()
476 .all(|e| e.fields.get("service") == Some(&Value::String("payments".into())))
477 );
478 }
479
480 #[test]
481 fn round_trip_and_chain() {
482 let idx = fixture();
483 let rows = run_query(idx.connection(), "level=error AND service=payments");
484 assert_eq!(rows.len(), 1);
485 assert_eq!(rows[0].message.as_deref(), Some("payment failed"));
486 }
487
488 #[test]
489 fn round_trip_contains_substring_match() {
490 let idx = fixture();
491 let rows = run_query(idx.connection(), r#"message contains "timeout""#);
492 assert_eq!(rows.len(), 1);
493 assert!(rows[0].message.as_deref().unwrap().contains("timeout"));
494 }
495
496 #[test]
497 fn round_trip_numeric_comparison_on_json_field() {
498 let idx = fixture();
499 let rows = run_query(idx.connection(), "req_id > 150");
500 assert_eq!(rows.len(), 2);
501 let ids: HashSet<i64> = rows
502 .iter()
503 .map(|e| e.fields.get("req_id").and_then(|v| v.as_i64()).unwrap())
504 .collect();
505 assert_eq!(ids, HashSet::from([200, 300]));
506 }
507
508 #[test]
509 fn round_trip_last_duration_uses_now() {
510 let idx = fixture();
511 let now = Utc.with_ymd_and_hms(2026, 4, 20, 13, 0, 0).unwrap();
512 let rows = run_query_at(idx.connection(), "last 3h", now);
513 assert_eq!(rows.len(), 3);
514
515 let rows = run_query_at(idx.connection(), "last 70m", now);
516 assert_eq!(rows.len(), 1);
517 assert_eq!(rows[0].timestamp.as_deref(), Some("2026-04-20T12:00:00Z"));
518 }
519
520 #[test]
521 fn round_trip_since_datetime() {
522 let idx = fixture();
523 let rows = run_query(idx.connection(), "since 2026-04-20T11:00:00Z");
524 assert_eq!(rows.len(), 2);
525 }
526
527 #[test]
528 fn round_trip_results_ordered_newest_first() {
529 let idx = fixture();
530 let rows = run_query(idx.connection(), "level=error");
531 assert!(rows[0].timestamp > rows[1].timestamp);
532 }
533
534 #[test]
535 fn round_trip_not_equal_operator() {
536 let idx = fixture();
537 let rows = run_query(idx.connection(), "level!=error");
538 assert_eq!(rows.len(), 1);
539 assert_eq!(rows[0].level.as_deref(), Some("info"));
540 }
541
542 #[test]
543 fn round_trip_contains_with_wildcard_character_is_literal() {
544 let mut idx = Indexer::open_in_memory().unwrap();
545 let a = make_entry("2026-04-20T10:00:00Z", "info", "discount 50% today");
546 let b = make_entry("2026-04-20T11:00:00Z", "info", "no special char here");
547 idx.insert_batch(&[a, b]).unwrap();
548
549 let rows = run_query(idx.connection(), r#"message contains "50%""#);
550 assert_eq!(rows.len(), 1);
551 assert!(rows[0].message.as_deref().unwrap().contains("50%"));
552 }
553
554 #[test]
555 fn round_trip_empty_result_is_empty_vec_not_error() {
556 let idx = fixture();
557 let rows = run_query(idx.connection(), "level=nonsense");
558 assert!(rows.is_empty());
559 }
560
561 #[test]
562 fn round_trip_reconstructs_fields_map() {
563 let idx = fixture();
564 let rows = run_query(idx.connection(), "level=error AND service=payments");
565 assert_eq!(rows.len(), 1);
566 let e = &rows[0];
567 assert_eq!(
568 e.fields.get("service"),
569 Some(&Value::String("payments".into()))
570 );
571 assert_eq!(e.fields.get("req_id").and_then(|v| v.as_i64()), Some(100));
572 }
573
574 #[test]
577 fn unsafe_field_name_is_rejected_at_executor() {
578 let result = column_for_field("service; DROP TABLE log_entries--");
579 assert!(matches!(result, Err(LogdiveError::UnsafeFieldName(_))));
580 }
581}