Skip to main content

logdive_core/
indexer.rs

1//! SQLite-backed index for ingested log entries.
2//!
3//! This module owns the persistent storage side of logdive: schema creation,
4//! row-level deduplication via `blake3`, batched inserts of 1000 rows per
5//! transaction (per the decisions log entry dated 2026-04-19), and time-based
6//! retention via [`Indexer::prune`]. The schema is reproduced verbatim from
7//! the project doc's "SQLite schema" section with `IF NOT EXISTS` added so
8//! opening an existing database is idempotent.
9//!
10//! `Indexer` is an owning handle over a `rusqlite::Connection`. It can be
11//! constructed against a filesystem path via [`Indexer::open`] or against an
12//! in-memory database via [`Indexer::open_in_memory`] — the latter is used
13//! by the unit tests below and will also serve ad-hoc one-shot scenarios.
//! For read-only consumers (the HTTP API in milestone 8),
//! [`Indexer::open_read_only`] opens an existing database without the schema
//! init or directory-creation side effects of [`Indexer::open`], and enforces
//! read-only semantics at the SQLite level via `SQLITE_OPEN_READ_ONLY`.
18//!
19//! # Timestamp NOT NULL policy
20//!
21//! The schema declares `timestamp TEXT NOT NULL`, but the parser produces
22//! `LogEntry::timestamp = None` for lines that omit the key. Rather than
23//! fabricating a fallback (which would falsely anchor those rows to
24//! ingestion time and confuse `last Nh` queries), the indexer *skips* such
25//! rows and reports them in [`InsertStats::skipped_no_timestamp`]. This
26//! mirrors the parser's "graceful skip" philosophy — bad data is counted
27//! and dropped, never manufactured.
28
29use std::path::{Path, PathBuf};
30
31use rusqlite::{Connection, OpenFlags, params};
32
33use crate::entry::LogEntry;
34use crate::error::{LogdiveError, Result};
35
/// Number of rows written per insert transaction.
///
/// Fixed at 1000 by the decisions log entry of 2026-04-19
/// ("batch insert per 1000 lines").
pub const BATCH_SIZE: usize = 1_000;
39
const DEFAULT_DB_FILENAME: &str = "index.db";
const LOGDIVE_HOME_DIRNAME: &str = ".logdive";

/// Resolve the path to the index database.
///
/// * `override_path` — when `Some`, returned verbatim (this is what the CLI's
///   `--db` flag wires into). When `None`, the default
///   `$HOME/.logdive/index.db` is returned, per the "Default index location"
///   decision in the project doc.
///
/// If `HOME` is unset, the current directory (`.`) stands in for the home
/// directory, so the result is `./.logdive/index.db`.
///
/// Does not touch the filesystem; the only ambient input is the `HOME`
/// environment variable.
pub fn db_path(override_path: Option<&Path>) -> PathBuf {
    if let Some(explicit) = override_path {
        return explicit.to_path_buf();
    }
    // POSIX-centric: logdive's Phase 4 release targets are Linux and macOS,
    // both of which expose HOME. Fall back to CWD if it is unset (containers,
    // stripped CI environments) rather than panicking.
    //
    // `var_os` rather than `var`: `var` reports a HOME that is not valid
    // Unicode as an error, which would silently redirect the index to the
    // CWD even though the directory is perfectly usable as a path.
    let home = std::env::var_os("HOME").map_or_else(|| PathBuf::from("."), PathBuf::from);
    home.join(LOGDIVE_HOME_DIRNAME).join(DEFAULT_DB_FILENAME)
}
62
/// Per-call tally of what happened to the entries handed to an insert.
///
/// Surfaced to the CLI for its progress output ("lines ingested / lines
/// skipped per second", per milestone 6).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct InsertStats {
    /// Entries that became new rows in the index.
    pub inserted: usize,
    /// Entries dropped by `INSERT OR IGNORE` because a row with the same
    /// `raw_hash` was already present — the dedup path per the decisions log.
    pub deduplicated: usize,
    /// Entries dropped because they carried no `timestamp`. See module docs.
    pub skipped_no_timestamp: usize,
}

impl InsertStats {
    /// Add every counter of `other` onto the matching counter of `self`.
    fn extend(&mut self, other: InsertStats) {
        let InsertStats {
            inserted,
            deduplicated,
            skipped_no_timestamp,
        } = other;
        self.inserted += inserted;
        self.deduplicated += deduplicated;
        self.skipped_no_timestamp += skipped_no_timestamp;
    }
}
83
/// Result of an [`Indexer::prune`] call, reported by the CLI's `prune`
/// subcommand in its completion summary.
///
/// `#[non_exhaustive]` leaves room for later milestones to add fields (for
/// example the bytes reclaimed by the `VACUUM`) without breaking the public
/// API.
#[non_exhaustive]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct PruneStats {
    /// How many rows the prune removed.
    pub deleted: u64,
}
95
/// Summary of what an index currently holds.
///
/// Built by [`Indexer::stats`]; consumed by the CLI `stats` subcommand
/// (milestone 7) and the `GET /stats` HTTP endpoint (milestone 8). The
/// struct is deliberately small and purely structural — formatting for humans
/// or machines is left to the CLI and HTTP layers.
///
/// Ordering of `tags`: the `None` slot (untagged rows) comes first, followed
/// by the non-null tags in ascending alphabetical order. SQLite produces this
/// order itself (`ORDER BY tag` sorts NULL first when ascending); nothing is
/// re-sorted in Rust. The CLI renders the `None` slot as "(untagged)".
///
/// `#[non_exhaustive]` allows further summary fields (e.g. distinct level
/// counts) to be added in later milestones without breaking the public API.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Stats {
    /// Row count of the `log_entries` table.
    pub entries: u64,
    /// Lexically smallest stored `timestamp`, or `None` when the database is
    /// empty. Lexical order is correct for ISO-8601 timestamps; see the
    /// "live design decisions" section of the project handoff.
    pub min_timestamp: Option<String>,
    /// Lexically largest stored `timestamp`, or `None` when the database is
    /// empty.
    pub max_timestamp: Option<String>,
    /// Every distinct tag value seen across all rows. `None` stands for rows
    /// without a tag (SQL NULL) and, when present, is always the first
    /// element; the non-null tags follow in ascending alphabetical order.
    pub tags: Vec<Option<String>>,
}
128
129/// Owning handle over a SQLite connection to a logdive index.
130#[derive(Debug)]
131pub struct Indexer {
132    conn: Connection,
133}
134
135impl Indexer {
136    /// Open (or create) a logdive index at `path`.
137    ///
138    /// Creates the parent directory if it does not already exist, opens the
139    /// SQLite database, and runs idempotent schema migrations.
140    pub fn open(path: &Path) -> Result<Self> {
141        ensure_parent_dir(path)?;
142        let conn = Connection::open(path)?;
143        init_schema(&conn)?;
144        Ok(Self { conn })
145    }
146
147    /// Open an in-memory index. Used by tests; also usable for one-shot
148    /// scenarios that don't need persistence.
149    pub fn open_in_memory() -> Result<Self> {
150        let conn = Connection::open_in_memory()?;
151        init_schema(&conn)?;
152        Ok(Self { conn })
153    }
154
155    /// Open an existing logdive index at `path` in read-only mode.
156    ///
157    /// Unlike [`Indexer::open`], this method:
158    ///   1. Does **not** create the database file if it is missing (the
159    ///      `SQLITE_OPEN_READ_ONLY` flag fails rather than creates),
160    ///   2. Does **not** create the parent directory,
161    ///   3. Does **not** run schema migrations — the caller is promising
162    ///      that `path` already points at a valid logdive index.
163    ///
164    /// Enforcement of read-only semantics is at the SQLite level: any
165    /// attempted write through the returned connection raises a runtime
166    /// error. This is defense-in-depth for the HTTP API (milestone 8),
167    /// whose surface is exclusively read.
168    pub fn open_read_only(path: &Path) -> Result<Self> {
169        // `SQLITE_OPEN_URI` is included because it's the safe default
170        // documented by rusqlite; it only affects parsing of `file:...`
171        // URIs, which we never pass in.
172        let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
173        let conn = Connection::open_with_flags(path, flags)?;
174        Ok(Self { conn })
175    }
176
177    /// Borrow the underlying connection.
178    ///
179    /// Exposed so the query executor can run reads without an extra
180    /// abstraction layer. Read-only borrow keeps ingestion and querying
181    /// from contending over `&mut`.
182    pub fn connection(&self) -> &Connection {
183        &self.conn
184    }
185
186    /// Insert a slice of entries into the index, chunking internally into
187    /// transactions of [`BATCH_SIZE`] rows each.
188    ///
189    /// Returns aggregate stats across all chunks. Entry ordering within
190    /// the index is not guaranteed.
191    pub fn insert_batch(&mut self, entries: &[LogEntry]) -> Result<InsertStats> {
192        let mut total = InsertStats::default();
193        for chunk in entries.chunks(BATCH_SIZE) {
194            let stats = insert_one_chunk(&mut self.conn, chunk)?;
195            total.extend(stats);
196        }
197        Ok(total)
198    }
199
200    /// Delete every entry whose `timestamp` is strictly older than `cutoff`,
201    /// then `VACUUM` to reclaim the freed disk space.
202    ///
203    /// `cutoff` is compared lexically against the stored `timestamp` TEXT
204    /// column. This is correct for ISO-8601 / RFC3339 timestamps, which sort
205    /// chronologically as text — the same comparison contract the query
206    /// executor's `last` / `since` clauses rely on. A non-ISO-shaped cutoff
207    /// (or non-ISO timestamps in the index) will compare incorrectly, the
208    /// same known limitation that applies to time-range queries.
209    ///
210    /// The comparison is strict `<`: a row whose timestamp exactly equals
211    /// `cutoff` is **kept**, not deleted.
212    ///
213    /// Returns the number of rows deleted in [`PruneStats::deleted`].
214    ///
215    /// # VACUUM and transactions
216    ///
217    /// SQLite refuses to run `VACUUM` inside an explicit transaction, so this
218    /// method issues the `DELETE` and the `VACUUM` as two separate autocommit
219    /// statements rather than wrapping them in `conn.transaction()`. The
220    /// `DELETE` is a single statement and therefore atomic on its own; a
221    /// crash between the two would leave the rows deleted but the file not
222    /// yet compacted — harmless, since any later `VACUUM` reclaims the space.
223    pub fn prune(&mut self, cutoff: &str) -> Result<PruneStats> {
224        let deleted = self.conn.execute(
225            "DELETE FROM log_entries WHERE timestamp < ?1",
226            params![cutoff],
227        )?;
228        // VACUUM cannot run inside a transaction — issue it on its own.
229        self.conn.execute_batch("VACUUM")?;
230        Ok(PruneStats {
231            deleted: deleted as u64,
232        })
233    }
234
235    /// Read aggregate metadata about the index.
236    ///
237    /// Runs three read-only queries:
238    /// 1. `COUNT(*)` for the row count,
239    /// 2. `MIN(timestamp), MAX(timestamp)` for the time range,
240    /// 3. `SELECT DISTINCT tag ... ORDER BY tag` for the tag list.
241    ///
242    /// On an empty database, returns `entries = 0`, both timestamp bounds
243    /// as `None`, and an empty `tags` vector — not an error.
244    pub fn stats(&self) -> Result<Stats> {
245        // COUNT(*) is always non-negative; cast i64 → u64 is well-defined.
246        let entries_i64: i64 =
247            self.conn
248                .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))?;
249        let entries = entries_i64 as u64;
250
251        // Aggregates without GROUP BY always yield exactly one row; MIN/MAX
252        // on an empty table return (NULL, NULL), which maps cleanly to
253        // (None, None) via rusqlite's Option<T> FromSql impl.
254        let (min_timestamp, max_timestamp): (Option<String>, Option<String>) =
255            self.conn.query_row(
256                "SELECT MIN(timestamp), MAX(timestamp) FROM log_entries",
257                [],
258                |row| Ok((row.get(0)?, row.get(1)?)),
259            )?;
260
261        // SQLite's `ORDER BY tag` (default ascending) places NULLs first,
262        // which is exactly the ordering contract advertised on `Stats.tags`.
263        let mut stmt = self
264            .conn
265            .prepare("SELECT DISTINCT tag FROM log_entries ORDER BY tag")?;
266        let rows = stmt.query_map([], |row| row.get::<_, Option<String>>(0))?;
267        let mut tags: Vec<Option<String>> = Vec::new();
268        for row in rows {
269            tags.push(row?);
270        }
271
272        Ok(Stats {
273            entries,
274            min_timestamp,
275            max_timestamp,
276            tags,
277        })
278    }
279}
280
281// ---------------------------------------------------------------------------
282// Internals
283// ---------------------------------------------------------------------------
284
285fn ensure_parent_dir(path: &Path) -> Result<()> {
286    let Some(parent) = path.parent() else {
287        return Ok(());
288    };
289    if parent.as_os_str().is_empty() {
290        // Relative filename with no directory component ("index.db").
291        return Ok(());
292    }
293    std::fs::create_dir_all(parent).map_err(|io_err| LogdiveError::io_at(parent, io_err))
294}
295
296fn init_schema(conn: &Connection) -> Result<()> {
297    // Schema taken verbatim from the project doc's "SQLite schema" section,
298    // with `IF NOT EXISTS` added on every statement so open() is idempotent.
299    conn.execute_batch(
300        "CREATE TABLE IF NOT EXISTS log_entries (
301            id          INTEGER PRIMARY KEY AUTOINCREMENT,
302            timestamp   TEXT NOT NULL,
303            level       TEXT,
304            message     TEXT,
305            tag         TEXT,
306            fields      TEXT,
307            raw         TEXT NOT NULL,
308            raw_hash    TEXT NOT NULL UNIQUE,
309            ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
310        );
311        CREATE INDEX IF NOT EXISTS idx_level     ON log_entries(level);
312        CREATE INDEX IF NOT EXISTS idx_tag       ON log_entries(tag);
313        CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);",
314    )?;
315    Ok(())
316}
317
318fn insert_one_chunk(conn: &mut Connection, entries: &[LogEntry]) -> Result<InsertStats> {
319    let tx = conn.transaction()?;
320    let mut stats = InsertStats::default();
321
322    {
323        let mut stmt = tx.prepare(
324            "INSERT OR IGNORE INTO log_entries
325             (timestamp, level, message, tag, fields, raw, raw_hash)
326             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
327        )?;
328
329        for entry in entries {
330            // NOT NULL enforcement — see module-level docs.
331            let Some(ref ts) = entry.timestamp else {
332                stats.skipped_no_timestamp += 1;
333                continue;
334            };
335
336            // Serializing a `Map<String, Value>` via serde_json is infallible:
337            // every `Value` variant has a defined JSON representation.
338            let fields_json = serde_json::to_string(&entry.fields)
339                .expect("serializing serde_json::Map<String, Value> is infallible");
340            let raw_hash = blake3::hash(entry.raw.as_bytes()).to_hex().to_string();
341
342            let changes = stmt.execute(params![
343                ts,
344                entry.level,
345                entry.message,
346                entry.tag,
347                fields_json,
348                entry.raw,
349                raw_hash,
350            ])?;
351
352            if changes == 0 {
353                stats.deduplicated += 1;
354            } else {
355                stats.inserted += 1;
356            }
357        }
358    }
359
360    tx.commit()?;
361    Ok(stats)
362}
363
364// ---------------------------------------------------------------------------
365// Tests
366// ---------------------------------------------------------------------------
367
368#[cfg(test)]
369mod tests {
370    use super::*;
371    use serde_json::json;
372
373    /// Build a LogEntry whose `raw` is unique per input tuple, guaranteeing
374    /// a distinct `raw_hash` across calls (critical for the chunking test
375    /// where we insert thousands of entries).
376    fn make_entry(ts: &str, level: &str, message: &str) -> LogEntry {
377        let raw = format!(r#"{{"timestamp":"{ts}","level":"{level}","message":"{message}"}}"#);
378        let mut e = LogEntry::new(raw);
379        e.timestamp = Some(ts.to_string());
380        e.level = Some(level.to_string());
381        e.message = Some(message.to_string());
382        e
383    }
384
385    #[test]
386    fn open_in_memory_creates_table_and_three_indexes() {
387        let idx = Indexer::open_in_memory().expect("open in-memory");
388        let table_count: i64 = idx
389            .connection()
390            .query_row(
391                "SELECT COUNT(*) FROM sqlite_master \
392                 WHERE type='table' AND name='log_entries'",
393                [],
394                |row| row.get(0),
395            )
396            .unwrap();
397        assert_eq!(table_count, 1);
398
399        let index_count: i64 = idx
400            .connection()
401            .query_row(
402                "SELECT COUNT(*) FROM sqlite_master \
403                 WHERE type='index' AND name IN ('idx_level','idx_tag','idx_timestamp')",
404                [],
405                |row| row.get(0),
406            )
407            .unwrap();
408        assert_eq!(index_count, 3);
409    }
410
411    #[test]
412    fn insert_batch_adds_rows_and_reports_stats() {
413        let mut idx = Indexer::open_in_memory().unwrap();
414        let entries = vec![
415            make_entry("2026-04-20T10:00:00Z", "info", "one"),
416            make_entry("2026-04-20T10:00:01Z", "error", "two"),
417        ];
418        let stats = idx.insert_batch(&entries).unwrap();
419
420        assert_eq!(stats.inserted, 2);
421        assert_eq!(stats.deduplicated, 0);
422        assert_eq!(stats.skipped_no_timestamp, 0);
423
424        let count: i64 = idx
425            .connection()
426            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
427            .unwrap();
428        assert_eq!(count, 2);
429    }
430
431    #[test]
432    fn reinsert_is_deduplicated_by_raw_hash() {
433        let mut idx = Indexer::open_in_memory().unwrap();
434        let entries = vec![make_entry("2026-04-20T10:00:00Z", "info", "hello")];
435
436        let first = idx.insert_batch(&entries).unwrap();
437        assert_eq!(first.inserted, 1);
438        assert_eq!(first.deduplicated, 0);
439
440        let second = idx.insert_batch(&entries).unwrap();
441        assert_eq!(second.inserted, 0);
442        assert_eq!(second.deduplicated, 1);
443
444        let count: i64 = idx
445            .connection()
446            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
447            .unwrap();
448        assert_eq!(count, 1);
449    }
450
451    #[test]
452    fn entries_without_timestamp_are_skipped_not_fabricated() {
453        let mut idx = Indexer::open_in_memory().unwrap();
454        let mut no_ts = LogEntry::new(r#"{"level":"info"}"#);
455        no_ts.level = Some("info".to_string());
456
457        let stats = idx.insert_batch(&[no_ts]).unwrap();
458        assert_eq!(stats.inserted, 0);
459        assert_eq!(stats.skipped_no_timestamp, 1);
460
461        let count: i64 = idx
462            .connection()
463            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
464            .unwrap();
465        assert_eq!(count, 0);
466    }
467
468    #[test]
469    fn mixed_batch_counts_each_outcome_category() {
470        let mut idx = Indexer::open_in_memory().unwrap();
471        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "first")])
472            .unwrap();
473
474        let mut no_ts = LogEntry::new(r#"{"level":"warn"}"#);
475        no_ts.level = Some("warn".to_string());
476
477        let mixed = vec![
478            make_entry("2026-04-20T10:00:00Z", "info", "first"),
479            make_entry("2026-04-20T10:00:05Z", "error", "second"),
480            no_ts,
481        ];
482        let stats = idx.insert_batch(&mixed).unwrap();
483        assert_eq!(stats.inserted, 1);
484        assert_eq!(stats.deduplicated, 1);
485        assert_eq!(stats.skipped_no_timestamp, 1);
486    }
487
488    #[test]
489    fn fields_are_stored_as_json_queryable_via_json_extract() {
490        let mut idx = Indexer::open_in_memory().unwrap();
491        let mut e = make_entry("2026-04-20T10:00:00Z", "info", "hi");
492        e.fields.insert("service".to_string(), json!("payments"));
493        e.fields.insert("req_id".to_string(), json!(42));
494        idx.insert_batch(&[e]).unwrap();
495
496        let service: String = idx
497            .connection()
498            .query_row(
499                "SELECT json_extract(fields, '$.service') FROM log_entries",
500                [],
501                |row| row.get(0),
502            )
503            .unwrap();
504        assert_eq!(service, "payments");
505
506        let req_id: i64 = idx
507            .connection()
508            .query_row(
509                "SELECT json_extract(fields, '$.req_id') FROM log_entries",
510                [],
511                |row| row.get(0),
512            )
513            .unwrap();
514        assert_eq!(req_id, 42);
515    }
516
517    #[test]
518    fn empty_fields_round_trip_as_empty_json_object_not_null() {
519        let mut idx = Indexer::open_in_memory().unwrap();
520        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "x")])
521            .unwrap();
522
523        let stored: String = idx
524            .connection()
525            .query_row("SELECT fields FROM log_entries", [], |row| row.get(0))
526            .unwrap();
527        assert_eq!(stored, "{}");
528    }
529
530    #[test]
531    fn raw_hash_is_a_64_char_hex_blake3_digest() {
532        let mut idx = Indexer::open_in_memory().unwrap();
533        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "hash me")])
534            .unwrap();
535
536        let stored_hash: String = idx
537            .connection()
538            .query_row("SELECT raw_hash FROM log_entries", [], |row| row.get(0))
539            .unwrap();
540        assert_eq!(stored_hash.len(), 64);
541        assert!(stored_hash.chars().all(|c| c.is_ascii_hexdigit()));
542    }
543
544    #[test]
545    fn chunking_handles_batches_larger_than_batch_size() {
546        let mut idx = Indexer::open_in_memory().unwrap();
547        let total = BATCH_SIZE + 337;
548        let entries: Vec<_> = (0..total)
549            .map(|i| make_entry("2026-04-20T10:00:00Z", "info", &format!("message-{i}")))
550            .collect();
551
552        let stats = idx.insert_batch(&entries).unwrap();
553        assert_eq!(stats.inserted, total);
554        assert_eq!(stats.deduplicated, 0);
555
556        let count: i64 = idx
557            .connection()
558            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
559            .unwrap();
560        assert_eq!(count, total as i64);
561    }
562
563    #[test]
564    fn db_path_returns_override_verbatim() {
565        let p = Path::new("/tmp/logdive-test/override.db");
566        assert_eq!(
567            db_path(Some(p)),
568            PathBuf::from("/tmp/logdive-test/override.db")
569        );
570    }
571
572    #[test]
573    fn db_path_default_ends_with_standard_location() {
574        let default = db_path(None);
575        assert!(default.ends_with(".logdive/index.db"));
576    }
577
578    #[test]
579    fn open_creates_parent_directory_and_is_idempotent_across_opens() {
580        let dir = tempfile::tempdir().unwrap();
581        let db = dir.path().join("sub").join("dir").join("index.db");
582
583        {
584            let mut idx = Indexer::open(&db).expect("first open");
585            idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "persist me")])
586                .unwrap();
587        }
588
589        {
590            let idx = Indexer::open(&db).expect("second open");
591            let count: i64 = idx
592                .connection()
593                .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
594                .unwrap();
595            assert_eq!(count, 1);
596        }
597    }
598
599    #[test]
600    fn io_error_variant_attaches_parent_path() {
601        // If the parent directory cannot be created (e.g. because it lives
602        // under a regular file), we should get LogdiveError::Io with the
603        // offending path, not a SqliteFailure.
604        let dir = tempfile::tempdir().unwrap();
605        let blocker = dir.path().join("blocker");
606        std::fs::write(&blocker, b"not a directory").unwrap();
607        let bad_db = blocker.join("child").join("index.db");
608
609        let err = Indexer::open(&bad_db).unwrap_err();
610        match err {
611            LogdiveError::Io { path, .. } => {
612                assert!(path.starts_with(dir.path()));
613            }
614            other => panic!("expected Io variant, got {other:?}"),
615        }
616    }
617
618    // -----------------------------------------------------------------
619    // stats()
620    // -----------------------------------------------------------------
621
622    #[test]
623    fn stats_empty_database_returns_zeroed_values() {
624        let idx = Indexer::open_in_memory().unwrap();
625        let stats = idx.stats().unwrap();
626
627        assert_eq!(stats.entries, 0);
628        assert_eq!(stats.min_timestamp, None);
629        assert_eq!(stats.max_timestamp, None);
630        assert!(stats.tags.is_empty());
631    }
632
633    #[test]
634    fn stats_counts_entries() {
635        let mut idx = Indexer::open_in_memory().unwrap();
636        let entries: Vec<_> = (0..5)
637            .map(|i| make_entry("2026-04-20T10:00:00Z", "info", &format!("msg-{i}")))
638            .collect();
639        idx.insert_batch(&entries).unwrap();
640
641        let stats = idx.stats().unwrap();
642        assert_eq!(stats.entries, 5);
643    }
644
645    #[test]
646    fn stats_timestamp_range_uses_lexical_min_and_max() {
647        let mut idx = Indexer::open_in_memory().unwrap();
648        // Insert intentionally out-of-order to confirm MIN/MAX, not insertion
649        // order, drives the bounds.
650        idx.insert_batch(&[
651            make_entry("2026-04-22T15:30:00Z", "error", "second"),
652            make_entry("2026-04-20T10:00:00Z", "info", "first"),
653            make_entry("2026-04-21T12:00:00Z", "warn", "third"),
654        ])
655        .unwrap();
656
657        let stats = idx.stats().unwrap();
658        assert_eq!(stats.min_timestamp.as_deref(), Some("2026-04-20T10:00:00Z"));
659        assert_eq!(stats.max_timestamp.as_deref(), Some("2026-04-22T15:30:00Z"));
660    }
661
662    #[test]
663    fn stats_distinct_tags_place_untagged_first_then_alphabetical() {
664        let mut idx = Indexer::open_in_memory().unwrap();
665
666        // One untagged row.
667        let untagged = make_entry("2026-04-20T10:00:00Z", "info", "untagged-msg");
668
669        // Two distinct rows sharing tag "api" — must collapse via DISTINCT.
670        let mut api1 = make_entry("2026-04-20T10:00:01Z", "info", "api-msg-1");
671        api1.tag = Some("api".to_string());
672        let mut api2 = make_entry("2026-04-20T10:00:02Z", "info", "api-msg-2");
673        api2.tag = Some("api".to_string());
674
675        // One row with tag "payments".
676        let mut payments = make_entry("2026-04-20T10:00:03Z", "info", "payments-msg");
677        payments.tag = Some("payments".to_string());
678
679        idx.insert_batch(&[untagged, api1, api2, payments]).unwrap();
680
681        let stats = idx.stats().unwrap();
682        assert_eq!(stats.tags.len(), 3);
683        // NULL comes first in SQLite's ascending sort.
684        assert_eq!(stats.tags[0], None);
685        assert_eq!(stats.tags[1], Some("api".to_string()));
686        assert_eq!(stats.tags[2], Some("payments".to_string()));
687    }
688
689    #[test]
690    fn stats_entries_count_respects_dedup() {
691        let mut idx = Indexer::open_in_memory().unwrap();
692        // Two batches of the same entry — second is deduplicated away.
693        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "dup")])
694            .unwrap();
695        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "dup")])
696            .unwrap();
697
698        let stats = idx.stats().unwrap();
699        assert_eq!(stats.entries, 1);
700    }
701
702    #[test]
703    fn stats_entries_count_excludes_timestamp_less_entries() {
704        let mut idx = Indexer::open_in_memory().unwrap();
705
706        let mut no_ts = LogEntry::new(r#"{"level":"info"}"#);
707        no_ts.level = Some("info".to_string());
708
709        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "present"), no_ts])
710            .unwrap();
711
712        let stats = idx.stats().unwrap();
713        assert_eq!(stats.entries, 1);
714    }
715
716    // -----------------------------------------------------------------
717    // open_read_only()
718    // -----------------------------------------------------------------
719
720    #[test]
721    fn open_read_only_errors_when_file_is_missing() {
722        let dir = tempfile::tempdir().unwrap();
723        let missing = dir.path().join("does-not-exist.db");
724        let err = Indexer::open_read_only(&missing).unwrap_err();
725        // SQLite returns "unable to open database file" for missing paths in
726        // read-only mode; surfaced through `LogdiveError::Sqlite`.
727        assert!(matches!(err, LogdiveError::Sqlite(_)));
728    }
729
730    #[test]
731    fn open_read_only_can_read_existing_rows() {
732        let dir = tempfile::tempdir().unwrap();
733        let db = dir.path().join("ro.db");
734
735        // Populate via the writable opener.
736        {
737            let mut idx = Indexer::open(&db).unwrap();
738            idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "visible")])
739                .unwrap();
740        }
741
742        // Re-open read-only and read back.
743        let ro = Indexer::open_read_only(&db).unwrap();
744        let count: i64 = ro
745            .connection()
746            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
747            .unwrap();
748        assert_eq!(count, 1);
749
750        let stats = ro.stats().unwrap();
751        assert_eq!(stats.entries, 1);
752    }
753
754    #[test]
755    fn open_read_only_rejects_writes_at_sqlite_level() {
756        let dir = tempfile::tempdir().unwrap();
757        let db = dir.path().join("ro-reject.db");
758
759        // Create and close.
760        {
761            let _ = Indexer::open(&db).unwrap();
762        }
763
764        // Re-open RO and attempt a write via raw SQL — SQLite should block it.
765        let ro = Indexer::open_read_only(&db).unwrap();
766        let result = ro.connection().execute(
767            "INSERT INTO log_entries (timestamp, raw, raw_hash) VALUES ('x', 'y', 'z')",
768            [],
769        );
770        assert!(result.is_err(), "read-only connection must reject writes");
771    }
772
773    #[test]
774    fn open_read_only_does_not_run_schema_migrations() {
775        // If `open_read_only` tried to CREATE IF NOT EXISTS anything, it
776        // would error against a read-only connection. Opening an empty DB
777        // that's NOT been initialized demonstrates open_read_only doesn't
778        // attempt writes of any kind.
779        let dir = tempfile::tempdir().unwrap();
780        let db = dir.path().join("bare.db");
781
782        // Create a totally empty SQLite file (no schema).
783        {
784            let c = Connection::open(&db).unwrap();
785            // Ensure the file exists without creating the log_entries table.
786            c.execute_batch("PRAGMA user_version = 0;").unwrap();
787        }
788
789        // open_read_only must succeed (no migration attempt).
790        let ro = Indexer::open_read_only(&db).expect("open ro on bare db");
791
792        // Table is absent, so a SELECT errors — proving we didn't create it.
793        let err = ro
794            .connection()
795            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| {
796                row.get::<_, i64>(0)
797            });
798        assert!(err.is_err());
799    }
800
801    // -----------------------------------------------------------------
802    // prune()
803    // -----------------------------------------------------------------
804
805    #[test]
806    fn prune_deletes_entries_strictly_older_than_cutoff() {
807        let mut idx = Indexer::open_in_memory().unwrap();
808        idx.insert_batch(&[
809            make_entry("2026-04-01T00:00:00Z", "info", "old one"),
810            make_entry("2026-04-10T00:00:00Z", "info", "old two"),
811            make_entry("2026-04-20T00:00:00Z", "info", "kept"),
812        ])
813        .unwrap();
814
815        let stats = idx.prune("2026-04-15T00:00:00Z").unwrap();
816        assert_eq!(stats.deleted, 2);
817
818        let count: i64 = idx
819            .connection()
820            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
821            .unwrap();
822        assert_eq!(count, 1);
823
824        // The surviving row is the one newer than the cutoff.
825        let surviving: String = idx
826            .connection()
827            .query_row("SELECT message FROM log_entries", [], |row| row.get(0))
828            .unwrap();
829        assert_eq!(surviving, "kept");
830    }
831
832    #[test]
833    fn prune_keeps_entry_exactly_at_cutoff() {
834        // The comparison is strict `<`, so a row whose timestamp equals the
835        // cutoff is retained, not deleted.
836        let mut idx = Indexer::open_in_memory().unwrap();
837        idx.insert_batch(&[make_entry("2026-04-15T00:00:00Z", "info", "boundary")])
838            .unwrap();
839
840        let stats = idx.prune("2026-04-15T00:00:00Z").unwrap();
841        assert_eq!(stats.deleted, 0);
842
843        let count: i64 = idx
844            .connection()
845            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
846            .unwrap();
847        assert_eq!(count, 1);
848    }
849
850    #[test]
851    fn prune_on_empty_database_deletes_nothing() {
852        let mut idx = Indexer::open_in_memory().unwrap();
853        let stats = idx.prune("2026-04-15T00:00:00Z").unwrap();
854        assert_eq!(stats.deleted, 0);
855    }
856
857    #[test]
858    fn prune_with_cutoff_before_all_entries_deletes_nothing() {
859        let mut idx = Indexer::open_in_memory().unwrap();
860        idx.insert_batch(&[
861            make_entry("2026-04-20T00:00:00Z", "info", "a"),
862            make_entry("2026-04-21T00:00:00Z", "info", "b"),
863        ])
864        .unwrap();
865
866        let stats = idx.prune("2026-01-01T00:00:00Z").unwrap();
867        assert_eq!(stats.deleted, 0);
868
869        let count: i64 = idx
870            .connection()
871            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
872            .unwrap();
873        assert_eq!(count, 2);
874    }
875
876    #[test]
877    fn prune_with_cutoff_after_all_entries_deletes_all() {
878        let mut idx = Indexer::open_in_memory().unwrap();
879        idx.insert_batch(&[
880            make_entry("2026-04-20T00:00:00Z", "info", "a"),
881            make_entry("2026-04-21T00:00:00Z", "info", "b"),
882            make_entry("2026-04-22T00:00:00Z", "info", "c"),
883        ])
884        .unwrap();
885
886        let stats = idx.prune("2027-01-01T00:00:00Z").unwrap();
887        assert_eq!(stats.deleted, 3);
888
889        let count: i64 = idx
890            .connection()
891            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
892            .unwrap();
893        assert_eq!(count, 0);
894    }
895
896    #[test]
897    fn prune_returns_accurate_deleted_count() {
898        let mut idx = Indexer::open_in_memory().unwrap();
899        // Ten entries, one per day from the 1st to the 10th.
900        let entries: Vec<_> = (1..=10)
901            .map(|day| {
902                make_entry(
903                    &format!("2026-04-{day:02}T00:00:00Z"),
904                    "info",
905                    &format!("day-{day}"),
906                )
907            })
908            .collect();
909        idx.insert_batch(&entries).unwrap();
910
911        // Cutoff at the 6th deletes days 1-5 (strictly older): 5 rows.
912        let stats = idx.prune("2026-04-06T00:00:00Z").unwrap();
913        assert_eq!(stats.deleted, 5);
914
915        let count: i64 = idx
916            .connection()
917            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
918            .unwrap();
919        assert_eq!(count, 5);
920    }
921
922    #[test]
923    fn prune_then_stats_reflects_deletion() {
924        let mut idx = Indexer::open_in_memory().unwrap();
925        idx.insert_batch(&[
926            make_entry("2026-04-01T00:00:00Z", "info", "gone"),
927            make_entry("2026-04-20T00:00:00Z", "info", "stays"),
928        ])
929        .unwrap();
930
931        idx.prune("2026-04-10T00:00:00Z").unwrap();
932
933        let stats = idx.stats().unwrap();
934        assert_eq!(stats.entries, 1);
935        assert_eq!(stats.min_timestamp.as_deref(), Some("2026-04-20T00:00:00Z"));
936        assert_eq!(stats.max_timestamp.as_deref(), Some("2026-04-20T00:00:00Z"));
937    }
938
939    #[test]
940    fn prune_works_on_disk_backed_index() {
941        // VACUUM exercises a different code path on-disk than in-memory;
942        // run the real on-disk path to confirm DELETE + VACUUM both succeed.
943        let dir = tempfile::tempdir().unwrap();
944        let db = dir.path().join("prune.db");
945        let mut idx = Indexer::open(&db).unwrap();
946        idx.insert_batch(&[
947            make_entry("2026-04-01T00:00:00Z", "info", "old"),
948            make_entry("2026-04-20T00:00:00Z", "info", "new"),
949        ])
950        .unwrap();
951
952        let stats = idx.prune("2026-04-10T00:00:00Z").unwrap();
953        assert_eq!(stats.deleted, 1);
954
955        let count: i64 = idx
956            .connection()
957            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
958            .unwrap();
959        assert_eq!(count, 1);
960    }
961}