1use std::path::{Path, PathBuf};
29
30use rusqlite::{Connection, OpenFlags, params};
31
32use crate::entry::LogEntry;
33use crate::error::{LogdiveError, Result};
34
/// Number of rows written per transaction when inserting in batches.
pub const BATCH_SIZE: usize = 1000;

const DEFAULT_DB_FILENAME: &str = "index.db";
const LOGDIVE_HOME_DIRNAME: &str = ".logdive";

/// Resolve the path of the index database.
///
/// An explicit `override_path` is returned verbatim; otherwise the default
/// location is `$HOME/.logdive/index.db`, falling back to the current
/// directory when `HOME` is unset.
pub fn db_path(override_path: Option<&Path>) -> PathBuf {
    if let Some(p) = override_path {
        return p.to_path_buf();
    }
    // `var_os` (unlike `var`) does not error on non-Unicode values, so a
    // non-UTF-8 home directory is used as-is instead of silently being
    // replaced by ".".
    let home = std::env::var_os("HOME")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("."));
    home.join(LOGDIVE_HOME_DIRNAME).join(DEFAULT_DB_FILENAME)
}
61
/// Per-batch tallies of how each entry was handled during insertion.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct InsertStats {
    /// Rows newly written to the index.
    pub inserted: usize,
    /// Rows ignored because an identical raw line was already indexed.
    pub deduplicated: usize,
    /// Rows skipped because the entry carried no timestamp.
    pub skipped_no_timestamp: usize,
}

impl InsertStats {
    /// Fold another set of tallies into this one, field by field.
    fn extend(&mut self, other: InsertStats) {
        // Destructure so adding a field to the struct forces an update here.
        let InsertStats {
            inserted,
            deduplicated,
            skipped_no_timestamp,
        } = other;
        self.inserted += inserted;
        self.deduplicated += deduplicated;
        self.skipped_no_timestamp += skipped_no_timestamp;
    }
}
82
/// Aggregate information about the indexed entries, as produced by
/// [`Indexer::stats`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct Stats {
    /// Total number of rows in `log_entries`.
    pub entries: u64,
    /// Smallest stored timestamp (SQL `MIN` on a TEXT column, i.e. lexical);
    /// `None` when the index is empty.
    pub min_timestamp: Option<String>,
    /// Largest stored timestamp (lexical `MAX`); `None` when the index is empty.
    pub max_timestamp: Option<String>,
    /// Distinct tags in ascending order; a `None` element represents untagged
    /// entries and sorts first (SQLite orders NULL before values).
    pub tags: Vec<Option<String>>,
}
115
/// Handle to the SQLite-backed log index.
///
/// Construct via [`Indexer::open`], [`Indexer::open_in_memory`], or
/// [`Indexer::open_read_only`].
#[derive(Debug)]
pub struct Indexer {
    // Owned connection; all reads and writes go through it.
    conn: Connection,
}
121
122impl Indexer {
123 pub fn open(path: &Path) -> Result<Self> {
128 ensure_parent_dir(path)?;
129 let conn = Connection::open(path)?;
130 init_schema(&conn)?;
131 Ok(Self { conn })
132 }
133
134 pub fn open_in_memory() -> Result<Self> {
137 let conn = Connection::open_in_memory()?;
138 init_schema(&conn)?;
139 Ok(Self { conn })
140 }
141
142 pub fn open_read_only(path: &Path) -> Result<Self> {
156 let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
160 let conn = Connection::open_with_flags(path, flags)?;
161 Ok(Self { conn })
162 }
163
164 pub fn connection(&self) -> &Connection {
170 &self.conn
171 }
172
173 pub fn insert_batch(&mut self, entries: &[LogEntry]) -> Result<InsertStats> {
179 let mut total = InsertStats::default();
180 for chunk in entries.chunks(BATCH_SIZE) {
181 let stats = insert_one_chunk(&mut self.conn, chunk)?;
182 total.extend(stats);
183 }
184 Ok(total)
185 }
186
187 pub fn stats(&self) -> Result<Stats> {
197 let entries_i64: i64 =
199 self.conn
200 .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))?;
201 let entries = entries_i64 as u64;
202
203 let (min_timestamp, max_timestamp): (Option<String>, Option<String>) =
207 self.conn.query_row(
208 "SELECT MIN(timestamp), MAX(timestamp) FROM log_entries",
209 [],
210 |row| Ok((row.get(0)?, row.get(1)?)),
211 )?;
212
213 let mut stmt = self
216 .conn
217 .prepare("SELECT DISTINCT tag FROM log_entries ORDER BY tag")?;
218 let rows = stmt.query_map([], |row| row.get::<_, Option<String>>(0))?;
219 let mut tags: Vec<Option<String>> = Vec::new();
220 for row in rows {
221 tags.push(row?);
222 }
223
224 Ok(Stats {
225 entries,
226 min_timestamp,
227 max_timestamp,
228 tags,
229 })
230 }
231}
232
233fn ensure_parent_dir(path: &Path) -> Result<()> {
238 let Some(parent) = path.parent() else {
239 return Ok(());
240 };
241 if parent.as_os_str().is_empty() {
242 return Ok(());
244 }
245 std::fs::create_dir_all(parent).map_err(|io_err| LogdiveError::io_at(parent, io_err))
246}
247
248fn init_schema(conn: &Connection) -> Result<()> {
249 conn.execute_batch(
252 "CREATE TABLE IF NOT EXISTS log_entries (
253 id INTEGER PRIMARY KEY AUTOINCREMENT,
254 timestamp TEXT NOT NULL,
255 level TEXT,
256 message TEXT,
257 tag TEXT,
258 fields TEXT,
259 raw TEXT NOT NULL,
260 raw_hash TEXT NOT NULL UNIQUE,
261 ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
262 );
263 CREATE INDEX IF NOT EXISTS idx_level ON log_entries(level);
264 CREATE INDEX IF NOT EXISTS idx_tag ON log_entries(tag);
265 CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);",
266 )?;
267 Ok(())
268}
269
270fn insert_one_chunk(conn: &mut Connection, entries: &[LogEntry]) -> Result<InsertStats> {
271 let tx = conn.transaction()?;
272 let mut stats = InsertStats::default();
273
274 {
275 let mut stmt = tx.prepare(
276 "INSERT OR IGNORE INTO log_entries
277 (timestamp, level, message, tag, fields, raw, raw_hash)
278 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
279 )?;
280
281 for entry in entries {
282 let Some(ref ts) = entry.timestamp else {
284 stats.skipped_no_timestamp += 1;
285 continue;
286 };
287
288 let fields_json = serde_json::to_string(&entry.fields)
291 .expect("serializing serde_json::Map<String, Value> is infallible");
292 let raw_hash = blake3::hash(entry.raw.as_bytes()).to_hex().to_string();
293
294 let changes = stmt.execute(params![
295 ts,
296 entry.level,
297 entry.message,
298 entry.tag,
299 fields_json,
300 entry.raw,
301 raw_hash,
302 ])?;
303
304 if changes == 0 {
305 stats.deduplicated += 1;
306 } else {
307 stats.inserted += 1;
308 }
309 }
310 }
311
312 tx.commit()?;
313 Ok(stats)
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a `LogEntry` whose raw line encodes the same structured fields,
    /// so dedup-by-raw-hash behaves as it would for real parsed input.
    fn make_entry(ts: &str, level: &str, message: &str) -> LogEntry {
        let raw = format!(r#"{{"timestamp":"{ts}","level":"{level}","message":"{message}"}}"#);
        let mut e = LogEntry::new(raw);
        e.timestamp = Some(ts.to_string());
        e.level = Some(level.to_string());
        e.message = Some(message.to_string());
        e
    }

    #[test]
    fn open_in_memory_creates_table_and_three_indexes() {
        let idx = Indexer::open_in_memory().expect("open in-memory");
        // sqlite_master is SQLite's schema catalog: one row per object.
        let table_count: i64 = idx
            .connection()
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master \
                 WHERE type='table' AND name='log_entries'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(table_count, 1);

        let index_count: i64 = idx
            .connection()
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master \
                 WHERE type='index' AND name IN ('idx_level','idx_tag','idx_timestamp')",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(index_count, 3);
    }

    #[test]
    fn insert_batch_adds_rows_and_reports_stats() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let entries = vec![
            make_entry("2026-04-20T10:00:00Z", "info", "one"),
            make_entry("2026-04-20T10:00:01Z", "error", "two"),
        ];
        let stats = idx.insert_batch(&entries).unwrap();

        assert_eq!(stats.inserted, 2);
        assert_eq!(stats.deduplicated, 0);
        assert_eq!(stats.skipped_no_timestamp, 0);

        // Cross-check the reported stats against the actual row count.
        let count: i64 = idx
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 2);
    }

    #[test]
    fn reinsert_is_deduplicated_by_raw_hash() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let entries = vec![make_entry("2026-04-20T10:00:00Z", "info", "hello")];

        let first = idx.insert_batch(&entries).unwrap();
        assert_eq!(first.inserted, 1);
        assert_eq!(first.deduplicated, 0);

        // Identical raw line -> same blake3 hash -> UNIQUE conflict -> ignored.
        let second = idx.insert_batch(&entries).unwrap();
        assert_eq!(second.inserted, 0);
        assert_eq!(second.deduplicated, 1);

        let count: i64 = idx
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn entries_without_timestamp_are_skipped_not_fabricated() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let mut no_ts = LogEntry::new(r#"{"level":"info"}"#);
        no_ts.level = Some("info".to_string());

        let stats = idx.insert_batch(&[no_ts]).unwrap();
        assert_eq!(stats.inserted, 0);
        assert_eq!(stats.skipped_no_timestamp, 1);

        // Nothing should have been written for the skipped entry.
        let count: i64 = idx
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    #[test]
    fn mixed_batch_counts_each_outcome_category() {
        let mut idx = Indexer::open_in_memory().unwrap();
        // Pre-seed "first" so the same entry in the batch below dedups.
        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "first")])
            .unwrap();

        let mut no_ts = LogEntry::new(r#"{"level":"warn"}"#);
        no_ts.level = Some("warn".to_string());

        // One duplicate, one new, one timestamp-less: each hits a different path.
        let mixed = vec![
            make_entry("2026-04-20T10:00:00Z", "info", "first"),
            make_entry("2026-04-20T10:00:05Z", "error", "second"),
            no_ts,
        ];
        let stats = idx.insert_batch(&mixed).unwrap();
        assert_eq!(stats.inserted, 1);
        assert_eq!(stats.deduplicated, 1);
        assert_eq!(stats.skipped_no_timestamp, 1);
    }

    #[test]
    fn fields_are_stored_as_json_queryable_via_json_extract() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let mut e = make_entry("2026-04-20T10:00:00Z", "info", "hi");
        e.fields.insert("service".to_string(), json!("payments"));
        e.fields.insert("req_id".to_string(), json!(42));
        idx.insert_batch(&[e]).unwrap();

        // The fields column is serialized JSON, so SQLite's json_extract works.
        let service: String = idx
            .connection()
            .query_row(
                "SELECT json_extract(fields, '$.service') FROM log_entries",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(service, "payments");

        let req_id: i64 = idx
            .connection()
            .query_row(
                "SELECT json_extract(fields, '$.req_id') FROM log_entries",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(req_id, 42);
    }

    #[test]
    fn empty_fields_round_trip_as_empty_json_object_not_null() {
        let mut idx = Indexer::open_in_memory().unwrap();
        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "x")])
            .unwrap();

        let stored: String = idx
            .connection()
            .query_row("SELECT fields FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(stored, "{}");
    }

    #[test]
    fn raw_hash_is_a_64_char_hex_blake3_digest() {
        let mut idx = Indexer::open_in_memory().unwrap();
        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "hash me")])
            .unwrap();

        // blake3 produces 32 bytes -> 64 lowercase hex characters.
        let stored_hash: String = idx
            .connection()
            .query_row("SELECT raw_hash FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(stored_hash.len(), 64);
        assert!(stored_hash.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn chunking_handles_batches_larger_than_batch_size() {
        let mut idx = Indexer::open_in_memory().unwrap();
        // One full chunk plus a partial one exercises the chunk loop boundary.
        let total = BATCH_SIZE + 337;
        let entries: Vec<_> = (0..total)
            .map(|i| make_entry("2026-04-20T10:00:00Z", "info", &format!("message-{i}")))
            .collect();

        let stats = idx.insert_batch(&entries).unwrap();
        assert_eq!(stats.inserted, total);
        assert_eq!(stats.deduplicated, 0);

        let count: i64 = idx
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, total as i64);
    }

    #[test]
    fn db_path_returns_override_verbatim() {
        let p = Path::new("/tmp/logdive-test/override.db");
        assert_eq!(
            db_path(Some(p)),
            PathBuf::from("/tmp/logdive-test/override.db")
        );
    }

    #[test]
    fn db_path_default_ends_with_standard_location() {
        let default = db_path(None);
        assert!(default.ends_with(".logdive/index.db"));
    }

    #[test]
    fn open_creates_parent_directory_and_is_idempotent_across_opens() {
        let dir = tempfile::tempdir().unwrap();
        // Two levels of missing parents exercise create_dir_all.
        let db = dir.path().join("sub").join("dir").join("index.db");

        {
            let mut idx = Indexer::open(&db).expect("first open");
            idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "persist me")])
                .unwrap();
        }

        // Reopening must not recreate the schema or lose data.
        {
            let idx = Indexer::open(&db).expect("second open");
            let count: i64 = idx
                .connection()
                .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
                .unwrap();
            assert_eq!(count, 1);
        }
    }

    #[test]
    fn io_error_variant_attaches_parent_path() {
        let dir = tempfile::tempdir().unwrap();
        // A regular file where a directory is needed forces create_dir_all to fail.
        let blocker = dir.path().join("blocker");
        std::fs::write(&blocker, b"not a directory").unwrap();
        let bad_db = blocker.join("child").join("index.db");

        let err = Indexer::open(&bad_db).unwrap_err();
        match err {
            LogdiveError::Io { path, .. } => {
                assert!(path.starts_with(dir.path()));
            }
            other => panic!("expected Io variant, got {other:?}"),
        }
    }

    #[test]
    fn stats_empty_database_returns_zeroed_values() {
        let idx = Indexer::open_in_memory().unwrap();
        let stats = idx.stats().unwrap();

        assert_eq!(stats.entries, 0);
        assert_eq!(stats.min_timestamp, None);
        assert_eq!(stats.max_timestamp, None);
        assert!(stats.tags.is_empty());
    }

    #[test]
    fn stats_counts_entries() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let entries: Vec<_> = (0..5)
            .map(|i| make_entry("2026-04-20T10:00:00Z", "info", &format!("msg-{i}")))
            .collect();
        idx.insert_batch(&entries).unwrap();

        let stats = idx.stats().unwrap();
        assert_eq!(stats.entries, 5);
    }

    #[test]
    fn stats_timestamp_range_uses_lexical_min_and_max() {
        let mut idx = Indexer::open_in_memory().unwrap();
        // Inserted out of order on purpose: MIN/MAX must not depend on
        // insertion order.
        idx.insert_batch(&[
            make_entry("2026-04-22T15:30:00Z", "error", "second"),
            make_entry("2026-04-20T10:00:00Z", "info", "first"),
            make_entry("2026-04-21T12:00:00Z", "warn", "third"),
        ])
        .unwrap();

        let stats = idx.stats().unwrap();
        assert_eq!(stats.min_timestamp.as_deref(), Some("2026-04-20T10:00:00Z"));
        assert_eq!(stats.max_timestamp.as_deref(), Some("2026-04-22T15:30:00Z"));
    }

    #[test]
    fn stats_distinct_tags_place_untagged_first_then_alphabetical() {
        let mut idx = Indexer::open_in_memory().unwrap();

        // make_entry leaves tag as None -> stored as SQL NULL.
        let untagged = make_entry("2026-04-20T10:00:00Z", "info", "untagged-msg");

        // Two entries share "api" to prove DISTINCT collapses them.
        let mut api1 = make_entry("2026-04-20T10:00:01Z", "info", "api-msg-1");
        api1.tag = Some("api".to_string());
        let mut api2 = make_entry("2026-04-20T10:00:02Z", "info", "api-msg-2");
        api2.tag = Some("api".to_string());

        let mut payments = make_entry("2026-04-20T10:00:03Z", "info", "payments-msg");
        payments.tag = Some("payments".to_string());

        idx.insert_batch(&[untagged, api1, api2, payments]).unwrap();

        let stats = idx.stats().unwrap();
        assert_eq!(stats.tags.len(), 3);
        // SQLite sorts NULL before any value under ORDER BY.
        assert_eq!(stats.tags[0], None);
        assert_eq!(stats.tags[1], Some("api".to_string()));
        assert_eq!(stats.tags[2], Some("payments".to_string()));
    }

    #[test]
    fn stats_entries_count_respects_dedup() {
        let mut idx = Indexer::open_in_memory().unwrap();
        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "dup")])
            .unwrap();
        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "dup")])
            .unwrap();

        let stats = idx.stats().unwrap();
        assert_eq!(stats.entries, 1);
    }

    #[test]
    fn stats_entries_count_excludes_timestamp_less_entries() {
        let mut idx = Indexer::open_in_memory().unwrap();

        let mut no_ts = LogEntry::new(r#"{"level":"info"}"#);
        no_ts.level = Some("info".to_string());

        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "present"), no_ts])
            .unwrap();

        let stats = idx.stats().unwrap();
        assert_eq!(stats.entries, 1);
    }

    #[test]
    fn open_read_only_errors_when_file_is_missing() {
        let dir = tempfile::tempdir().unwrap();
        let missing = dir.path().join("does-not-exist.db");
        // Read-only open must not create the file; SQLite reports the error.
        let err = Indexer::open_read_only(&missing).unwrap_err();
        assert!(matches!(err, LogdiveError::Sqlite(_)));
    }

    #[test]
    fn open_read_only_can_read_existing_rows() {
        let dir = tempfile::tempdir().unwrap();
        let db = dir.path().join("ro.db");

        // Seed via a writable handle, then drop it to release the file.
        {
            let mut idx = Indexer::open(&db).unwrap();
            idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "visible")])
                .unwrap();
        }

        let ro = Indexer::open_read_only(&db).unwrap();
        let count: i64 = ro
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 1);

        let stats = ro.stats().unwrap();
        assert_eq!(stats.entries, 1);
    }

    #[test]
    fn open_read_only_rejects_writes_at_sqlite_level() {
        let dir = tempfile::tempdir().unwrap();
        let db = dir.path().join("ro-reject.db");

        // Create a valid database file, then reopen it read-only.
        {
            let _ = Indexer::open(&db).unwrap();
        }

        let ro = Indexer::open_read_only(&db).unwrap();
        let result = ro.connection().execute(
            "INSERT INTO log_entries (timestamp, raw, raw_hash) VALUES ('x', 'y', 'z')",
            [],
        );
        assert!(result.is_err(), "read-only connection must reject writes");
    }

    #[test]
    fn open_read_only_does_not_run_schema_migrations() {
        let dir = tempfile::tempdir().unwrap();
        let db = dir.path().join("bare.db");

        // Create a valid but schema-less SQLite file without going through
        // Indexer::open (which would initialize the schema).
        {
            let c = Connection::open(&db).unwrap();
            c.execute_batch("PRAGMA user_version = 0;").unwrap();
        }

        // Opening read-only succeeds even though the schema is missing...
        let ro = Indexer::open_read_only(&db).expect("open ro on bare db");

        // ...but querying the absent table must fail, proving no migration ran.
        let err = ro
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| {
                row.get::<_, i64>(0)
            });
        assert!(err.is_err());
    }
}