Skip to main content

logdive_core/
indexer.rs

1//! SQLite-backed index for ingested log entries.
2//!
3//! This module owns the persistent storage side of logdive: schema creation,
4//! row-level deduplication via `blake3`, batched inserts of 1000 rows per
5//! transaction (per the decisions log entry dated 2026-04-19), and time-based
6//! retention via [`Indexer::prune`]. The schema is reproduced verbatim from
7//! the project doc's "SQLite schema" section with `IF NOT EXISTS` added so
8//! opening an existing database is idempotent.
9//!
10//! `Indexer` is an owning handle over a `rusqlite::Connection`. It can be
11//! constructed against a filesystem path via [`Indexer::open`] or against an
12//! in-memory database via [`Indexer::open_in_memory`] — the latter is used
13//! by the unit tests below and will also serve ad-hoc one-shot scenarios.
14//! For read-only consumers (the HTTP API in milestone 8),
15//! [`Indexer::open_read_only`] opens an existing database without the schema
16//! init or directory-creation side effects of [`Indexer::open`], and enforces
17//! read-only semantics at the SQLite level via `SQLITE_OPEN_READ_ONLY`.
18//!
19//! # Timestamp NOT NULL policy
20//!
21//! The schema declares `timestamp TEXT NOT NULL`, but the parser produces
22//! `LogEntry::timestamp = None` for lines that omit the key. Rather than
23//! fabricating a fallback (which would falsely anchor those rows to
24//! ingestion time and confuse `last Nh` queries), the indexer *skips* such
25//! rows and reports them in [`InsertStats::skipped_no_timestamp`]. This
26//! mirrors the parser's "graceful skip" philosophy — bad data is counted
27//! and dropped, never manufactured.
28
29use std::path::{Path, PathBuf};
30
31use rusqlite::{Connection, OpenFlags, params};
32
33use crate::entry::LogEntry;
34use crate::error::{LogdiveError, Result};
35
/// Size of a single insert transaction, per the decisions log
/// (2026-04-19: "batch insert per 1000 lines").
pub const BATCH_SIZE: usize = 1000;

/// File name of the index database inside the logdive home directory.
const DEFAULT_DB_FILENAME: &str = "index.db";
/// Name of the directory (created under `$HOME`) that holds the index.
const LOGDIVE_HOME_DIRNAME: &str = ".logdive";

/// Resolve the path to the index database.
///
/// When `override_path` is `Some`, it is returned verbatim — this is what the
/// CLI's `--db` flag wires into. Otherwise the default `~/.logdive/index.db`
/// is returned per the "Default index location" decision in the project doc.
///
/// `HOME` is read as an `OsString`, so a home directory whose path is not
/// valid UTF-8 is honoured rather than being treated as if `HOME` were unset.
/// If `HOME` is genuinely unset, the path is resolved relative to the current
/// working directory (`./.logdive/index.db`).
///
/// Reads the environment but does not touch the filesystem.
pub fn db_path(override_path: Option<&Path>) -> PathBuf {
    if let Some(p) = override_path {
        return p.to_path_buf();
    }
    // POSIX-centric: logdive's Phase 4 release targets are Linux and macOS,
    // both of which expose HOME. Fall back to CWD if it is unset (containers,
    // stripped CI environments) rather than panicking. `var_os` is used
    // instead of `var` because POSIX paths are arbitrary bytes: `var` would
    // report a non-UTF-8 HOME as an error and silently relocate the index.
    let home = std::env::var_os("HOME")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("."));
    home.join(LOGDIVE_HOME_DIRNAME).join(DEFAULT_DB_FILENAME)
}
62
/// Tally of what happened to the rows handed to an insert call, surfaced to
/// the CLI for progress output ("lines ingested / lines skipped per second",
/// per milestone 6).
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct InsertStats {
    /// Count of rows that were newly written to the index.
    pub inserted: usize,
    /// Count of rows that `INSERT OR IGNORE` dropped because a row with the
    /// same `raw_hash` was already present — the dedup path per the
    /// decisions log.
    pub deduplicated: usize,
    /// Count of rows dropped because they carried no `timestamp`. See the
    /// module docs.
    pub skipped_no_timestamp: usize,
}

impl InsertStats {
    /// Fold `other` into `self`, adding the three counters pairwise.
    fn extend(&mut self, other: InsertStats) {
        let InsertStats {
            inserted,
            deduplicated,
            skipped_no_timestamp,
        } = other;
        self.inserted += inserted;
        self.deduplicated += deduplicated;
        self.skipped_no_timestamp += skipped_no_timestamp;
    }
}
83
/// Result of an [`Indexer::prune`] call, handed to the CLI's `prune`
/// subcommand so it can print a completion summary.
///
/// The type is `#[non_exhaustive]` so that later milestones may grow it
/// (e.g. bytes reclaimed by the `VACUUM`) without breaking the public API.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[non_exhaustive]
pub struct PruneStats {
    /// How many rows the prune deleted.
    pub deleted: u64,
}
95
/// Summary metadata describing what an index currently holds.
///
/// Built by [`Indexer::stats`]; consumed by the CLI `stats` subcommand
/// (milestone 7) and the `GET /stats` HTTP endpoint (milestone 8). The shape
/// is deliberately small and structural — presentation for humans or
/// machines is left to the CLI and HTTP layers.
///
/// Ordering of `tags`: the `None` slot (untagged rows) comes first, followed
/// by the non-null tag strings in ascending alphabetical order. SQLite
/// produces this ordering itself (`ORDER BY tag` sorts NULL first when
/// ascending); nothing is re-sorted in Rust. The CLI renders the `None` slot
/// as "(untagged)".
///
/// The type is `#[non_exhaustive]` so further summary fields (e.g. distinct
/// level counts) can be added in later milestones without breaking the
/// public API.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Stats {
    /// Number of rows currently stored in the `log_entries` table.
    pub entries: u64,
    /// Lexically smallest stored `timestamp`, or `None` when the database is
    /// empty. Lexical ordering is correct for ISO-8601 timestamps; see the
    /// "live design decisions" section of the project handoff.
    pub min_timestamp: Option<String>,
    /// Lexically largest stored `timestamp`, or `None` when the database is
    /// empty.
    pub max_timestamp: Option<String>,
    /// Distinct tag values seen across all rows. `None` stands for rows with
    /// no tag (SQL NULL) and, when present, is always the first element;
    /// non-null tags follow in ascending alphabetical order.
    pub tags: Vec<Option<String>>,
}
128
129/// Owning handle over a SQLite connection to a logdive index.
130#[derive(Debug)]
131pub struct Indexer {
132    conn: Connection,
133}
134
135impl Indexer {
136    /// Open (or create) a logdive index at `path`.
137    ///
138    /// Creates the parent directory if it does not already exist, opens the
139    /// SQLite database, and runs idempotent schema migrations.
140    pub fn open(path: &Path) -> Result<Self> {
141        ensure_parent_dir(path)?;
142        let conn = Connection::open(path)?;
143        init_schema(&conn)?;
144        Ok(Self { conn })
145    }
146
147    /// Open an in-memory index. Used by tests; also usable for one-shot
148    /// scenarios that don't need persistence.
149    pub fn open_in_memory() -> Result<Self> {
150        let conn = Connection::open_in_memory()?;
151        init_schema(&conn)?;
152        Ok(Self { conn })
153    }
154
155    /// Open an existing logdive index at `path` in read-only mode.
156    ///
157    /// Unlike [`Indexer::open`], this method:
158    ///   1. Does **not** create the database file if it is missing (the
159    ///      `SQLITE_OPEN_READ_ONLY` flag fails rather than creates),
160    ///   2. Does **not** create the parent directory,
161    ///   3. Does **not** run schema migrations — the caller is promising
162    ///      that `path` already points at a valid logdive index.
163    ///
164    /// Enforcement of read-only semantics is at the SQLite level: any
165    /// attempted write through the returned connection raises a runtime
166    /// error. This is defense-in-depth for the HTTP API (milestone 8),
167    /// whose surface is exclusively read.
168    pub fn open_read_only(path: &Path) -> Result<Self> {
169        // `SQLITE_OPEN_URI` is included because it's the safe default
170        // documented by rusqlite; it only affects parsing of `file:...`
171        // URIs, which we never pass in.
172        let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
173        let conn = Connection::open_with_flags(path, flags)?;
174        Ok(Self { conn })
175    }
176
177    /// Borrow the underlying connection.
178    ///
179    /// Exposed so the query executor can run reads without an extra
180    /// abstraction layer. Read-only borrow keeps ingestion and querying
181    /// from contending over `&mut`.
182    pub fn connection(&self) -> &Connection {
183        &self.conn
184    }
185
186    /// Insert a slice of entries into the index, chunking internally into
187    /// transactions of [`BATCH_SIZE`] rows each.
188    ///
189    /// Returns aggregate stats across all chunks. Entry ordering within
190    /// the index is not guaranteed.
191    pub fn insert_batch(&mut self, entries: &[LogEntry]) -> Result<InsertStats> {
192        let mut total = InsertStats::default();
193        for chunk in entries.chunks(BATCH_SIZE) {
194            let stats = insert_one_chunk(&mut self.conn, chunk)?;
195            total.extend(stats);
196        }
197        Ok(total)
198    }
199
200    /// Delete every entry whose `timestamp` is strictly older than `cutoff`,
201    /// then `VACUUM` to reclaim the freed disk space.
202    ///
203    /// `cutoff` is compared lexically against the stored `timestamp` TEXT
204    /// column. This is correct for ISO-8601 / RFC3339 timestamps, which sort
205    /// chronologically as text — the same comparison contract the query
206    /// executor's `last` / `since` clauses rely on. A non-ISO-shaped cutoff
207    /// (or non-ISO timestamps in the index) will compare incorrectly, the
208    /// same known limitation that applies to time-range queries.
209    ///
210    /// The comparison is strict `<`: a row whose timestamp exactly equals
211    /// `cutoff` is **kept**, not deleted.
212    ///
213    /// Returns the number of rows deleted in [`PruneStats::deleted`].
214    ///
215    /// # VACUUM and transactions
216    ///
217    /// SQLite refuses to run `VACUUM` inside an explicit transaction, so this
218    /// method issues the `DELETE` and the `VACUUM` as two separate autocommit
219    /// statements rather than wrapping them in `conn.transaction()`. The
220    /// `DELETE` is a single statement and therefore atomic on its own; a
221    /// crash between the two would leave the rows deleted but the file not
222    /// yet compacted — harmless, since any later `VACUUM` reclaims the space.
223    pub fn prune(&mut self, cutoff: &str) -> Result<PruneStats> {
224        let deleted = self.conn.execute(
225            "DELETE FROM log_entries WHERE timestamp < ?1",
226            params![cutoff],
227        )?;
228        // VACUUM cannot run inside a transaction — issue it on its own.
229        self.conn.execute_batch("VACUUM")?;
230        Ok(PruneStats {
231            deleted: deleted as u64,
232        })
233    }
234
235    /// Read aggregate metadata about the index.
236    ///
237    /// Runs three read-only queries:
238    /// 1. `COUNT(*)` for the row count,
239    /// 2. `MIN(timestamp), MAX(timestamp)` for the time range,
240    /// 3. `SELECT DISTINCT tag ... ORDER BY tag` for the tag list.
241    ///
242    /// On an empty database, returns `entries = 0`, both timestamp bounds
243    /// as `None`, and an empty `tags` vector — not an error.
244    pub fn stats(&self) -> Result<Stats> {
245        // COUNT(*) is always non-negative; cast i64 → u64 is well-defined.
246        let entries_i64: i64 =
247            self.conn
248                .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))?;
249        let entries = entries_i64 as u64;
250
251        // Aggregates without GROUP BY always yield exactly one row; MIN/MAX
252        // on an empty table return (NULL, NULL), which maps cleanly to
253        // (None, None) via rusqlite's Option<T> FromSql impl.
254        let (min_timestamp, max_timestamp): (Option<String>, Option<String>) =
255            self.conn.query_row(
256                "SELECT MIN(timestamp), MAX(timestamp) FROM log_entries",
257                [],
258                |row| Ok((row.get(0)?, row.get(1)?)),
259            )?;
260
261        // SQLite's `ORDER BY tag` (default ascending) places NULLs first,
262        // which is exactly the ordering contract advertised on `Stats.tags`.
263        let mut stmt = self
264            .conn
265            .prepare("SELECT DISTINCT tag FROM log_entries ORDER BY tag")?;
266        let rows = stmt.query_map([], |row| row.get::<_, Option<String>>(0))?;
267        let mut tags: Vec<Option<String>> = Vec::new();
268        for row in rows {
269            tags.push(row?);
270        }
271
272        Ok(Stats {
273            entries,
274            min_timestamp,
275            max_timestamp,
276            tags,
277        })
278    }
279}
280
281// ---------------------------------------------------------------------------
282// Internals
283// ---------------------------------------------------------------------------
284
285fn ensure_parent_dir(path: &Path) -> Result<()> {
286    let Some(parent) = path.parent() else {
287        return Ok(());
288    };
289    if parent.as_os_str().is_empty() {
290        // Relative filename with no directory component ("index.db").
291        return Ok(());
292    }
293    std::fs::create_dir_all(parent).map_err(|io_err| LogdiveError::io_at(parent, io_err))
294}
295
296fn init_schema(conn: &Connection) -> Result<()> {
297    // Schema taken verbatim from the project doc's "SQLite schema" section,
298    // with `IF NOT EXISTS` added on every statement so open() is idempotent.
299    conn.execute_batch(
300        "CREATE TABLE IF NOT EXISTS log_entries (
301            id          INTEGER PRIMARY KEY AUTOINCREMENT,
302            timestamp   TEXT NOT NULL,
303            level       TEXT,
304            message     TEXT,
305            tag         TEXT,
306            fields      TEXT,
307            raw         TEXT NOT NULL,
308            raw_hash    TEXT NOT NULL UNIQUE,
309            ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
310        );
311        CREATE INDEX IF NOT EXISTS idx_level      ON log_entries(level);
312        CREATE INDEX IF NOT EXISTS idx_tag        ON log_entries(tag);
313        CREATE INDEX IF NOT EXISTS idx_timestamp  ON log_entries(timestamp);
314        CREATE INDEX IF NOT EXISTS idx_level_norm ON log_entries(lower(level));",
315    )?;
316    Ok(())
317}
318
319fn insert_one_chunk(conn: &mut Connection, entries: &[LogEntry]) -> Result<InsertStats> {
320    let tx = conn.transaction()?;
321    let mut stats = InsertStats::default();
322
323    {
324        let mut stmt = tx.prepare(
325            "INSERT OR IGNORE INTO log_entries
326             (timestamp, level, message, tag, fields, raw, raw_hash)
327             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
328        )?;
329
330        for entry in entries {
331            // NOT NULL enforcement — see module-level docs.
332            let Some(ref ts) = entry.timestamp else {
333                stats.skipped_no_timestamp += 1;
334                continue;
335            };
336
337            // Serializing a `Map<String, Value>` via serde_json is infallible:
338            // every `Value` variant has a defined JSON representation.
339            let fields_json = serde_json::to_string(&entry.fields)
340                .expect("serializing serde_json::Map<String, Value> is infallible");
341            let raw_hash = blake3::hash(entry.raw.as_bytes()).to_hex().to_string();
342
343            let changes = stmt.execute(params![
344                ts,
345                entry.level,
346                entry.message,
347                entry.tag,
348                fields_json,
349                entry.raw,
350                raw_hash,
351            ])?;
352
353            if changes == 0 {
354                stats.deduplicated += 1;
355            } else {
356                stats.inserted += 1;
357            }
358        }
359    }
360
361    tx.commit()?;
362    Ok(stats)
363}
364
365// ---------------------------------------------------------------------------
366// Tests
367// ---------------------------------------------------------------------------
368
369#[cfg(test)]
370mod tests {
371    use super::*;
372    use serde_json::json;
373
374    /// Build a LogEntry whose `raw` is unique per input tuple, guaranteeing
375    /// a distinct `raw_hash` across calls (critical for the chunking test
376    /// where we insert thousands of entries).
377    fn make_entry(ts: &str, level: &str, message: &str) -> LogEntry {
378        let raw = format!(r#"{{"timestamp":"{ts}","level":"{level}","message":"{message}"}}"#);
379        let mut e = LogEntry::new(raw);
380        e.timestamp = Some(ts.to_string());
381        e.level = Some(level.to_string());
382        e.message = Some(message.to_string());
383        e
384    }
385
386    #[test]
387    fn open_in_memory_creates_table_and_three_indexes() {
388        let idx = Indexer::open_in_memory().expect("open in-memory");
389        let table_count: i64 = idx
390            .connection()
391            .query_row(
392                "SELECT COUNT(*) FROM sqlite_master \
393                 WHERE type='table' AND name='log_entries'",
394                [],
395                |row| row.get(0),
396            )
397            .unwrap();
398        assert_eq!(table_count, 1);
399
400        let index_count: i64 = idx
401            .connection()
402            .query_row(
403                "SELECT COUNT(*) FROM sqlite_master \
404                 WHERE type='index' AND name IN \
405                 ('idx_level','idx_tag','idx_timestamp','idx_level_norm')",
406                [],
407                |row| row.get(0),
408            )
409            .unwrap();
410        assert_eq!(index_count, 4);
411    }
412
413    #[test]
414    fn insert_batch_adds_rows_and_reports_stats() {
415        let mut idx = Indexer::open_in_memory().unwrap();
416        let entries = vec![
417            make_entry("2026-04-20T10:00:00Z", "info", "one"),
418            make_entry("2026-04-20T10:00:01Z", "error", "two"),
419        ];
420        let stats = idx.insert_batch(&entries).unwrap();
421
422        assert_eq!(stats.inserted, 2);
423        assert_eq!(stats.deduplicated, 0);
424        assert_eq!(stats.skipped_no_timestamp, 0);
425
426        let count: i64 = idx
427            .connection()
428            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
429            .unwrap();
430        assert_eq!(count, 2);
431    }
432
433    #[test]
434    fn reinsert_is_deduplicated_by_raw_hash() {
435        let mut idx = Indexer::open_in_memory().unwrap();
436        let entries = vec![make_entry("2026-04-20T10:00:00Z", "info", "hello")];
437
438        let first = idx.insert_batch(&entries).unwrap();
439        assert_eq!(first.inserted, 1);
440        assert_eq!(first.deduplicated, 0);
441
442        let second = idx.insert_batch(&entries).unwrap();
443        assert_eq!(second.inserted, 0);
444        assert_eq!(second.deduplicated, 1);
445
446        let count: i64 = idx
447            .connection()
448            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
449            .unwrap();
450        assert_eq!(count, 1);
451    }
452
453    #[test]
454    fn entries_without_timestamp_are_skipped_not_fabricated() {
455        let mut idx = Indexer::open_in_memory().unwrap();
456        let mut no_ts = LogEntry::new(r#"{"level":"info"}"#);
457        no_ts.level = Some("info".to_string());
458
459        let stats = idx.insert_batch(&[no_ts]).unwrap();
460        assert_eq!(stats.inserted, 0);
461        assert_eq!(stats.skipped_no_timestamp, 1);
462
463        let count: i64 = idx
464            .connection()
465            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
466            .unwrap();
467        assert_eq!(count, 0);
468    }
469
470    #[test]
471    fn mixed_batch_counts_each_outcome_category() {
472        let mut idx = Indexer::open_in_memory().unwrap();
473        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "first")])
474            .unwrap();
475
476        let mut no_ts = LogEntry::new(r#"{"level":"warn"}"#);
477        no_ts.level = Some("warn".to_string());
478
479        let mixed = vec![
480            make_entry("2026-04-20T10:00:00Z", "info", "first"),
481            make_entry("2026-04-20T10:00:05Z", "error", "second"),
482            no_ts,
483        ];
484        let stats = idx.insert_batch(&mixed).unwrap();
485        assert_eq!(stats.inserted, 1);
486        assert_eq!(stats.deduplicated, 1);
487        assert_eq!(stats.skipped_no_timestamp, 1);
488    }
489
490    #[test]
491    fn fields_are_stored_as_json_queryable_via_json_extract() {
492        let mut idx = Indexer::open_in_memory().unwrap();
493        let mut e = make_entry("2026-04-20T10:00:00Z", "info", "hi");
494        e.fields.insert("service".to_string(), json!("payments"));
495        e.fields.insert("req_id".to_string(), json!(42));
496        idx.insert_batch(&[e]).unwrap();
497
498        let service: String = idx
499            .connection()
500            .query_row(
501                "SELECT json_extract(fields, '$.service') FROM log_entries",
502                [],
503                |row| row.get(0),
504            )
505            .unwrap();
506        assert_eq!(service, "payments");
507
508        let req_id: i64 = idx
509            .connection()
510            .query_row(
511                "SELECT json_extract(fields, '$.req_id') FROM log_entries",
512                [],
513                |row| row.get(0),
514            )
515            .unwrap();
516        assert_eq!(req_id, 42);
517    }
518
519    #[test]
520    fn empty_fields_round_trip_as_empty_json_object_not_null() {
521        let mut idx = Indexer::open_in_memory().unwrap();
522        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "x")])
523            .unwrap();
524
525        let stored: String = idx
526            .connection()
527            .query_row("SELECT fields FROM log_entries", [], |row| row.get(0))
528            .unwrap();
529        assert_eq!(stored, "{}");
530    }
531
532    #[test]
533    fn raw_hash_is_a_64_char_hex_blake3_digest() {
534        let mut idx = Indexer::open_in_memory().unwrap();
535        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "hash me")])
536            .unwrap();
537
538        let stored_hash: String = idx
539            .connection()
540            .query_row("SELECT raw_hash FROM log_entries", [], |row| row.get(0))
541            .unwrap();
542        assert_eq!(stored_hash.len(), 64);
543        assert!(stored_hash.chars().all(|c| c.is_ascii_hexdigit()));
544    }
545
546    #[test]
547    fn chunking_handles_batches_larger_than_batch_size() {
548        let mut idx = Indexer::open_in_memory().unwrap();
549        let total = BATCH_SIZE + 337;
550        let entries: Vec<_> = (0..total)
551            .map(|i| make_entry("2026-04-20T10:00:00Z", "info", &format!("message-{i}")))
552            .collect();
553
554        let stats = idx.insert_batch(&entries).unwrap();
555        assert_eq!(stats.inserted, total);
556        assert_eq!(stats.deduplicated, 0);
557
558        let count: i64 = idx
559            .connection()
560            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
561            .unwrap();
562        assert_eq!(count, total as i64);
563    }
564
565    #[test]
566    fn db_path_returns_override_verbatim() {
567        let p = Path::new("/tmp/logdive-test/override.db");
568        assert_eq!(
569            db_path(Some(p)),
570            PathBuf::from("/tmp/logdive-test/override.db")
571        );
572    }
573
574    #[test]
575    fn db_path_default_ends_with_standard_location() {
576        let default = db_path(None);
577        assert!(default.ends_with(".logdive/index.db"));
578    }
579
580    #[test]
581    fn open_creates_parent_directory_and_is_idempotent_across_opens() {
582        let dir = tempfile::tempdir().unwrap();
583        let db = dir.path().join("sub").join("dir").join("index.db");
584
585        {
586            let mut idx = Indexer::open(&db).expect("first open");
587            idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "persist me")])
588                .unwrap();
589        }
590
591        {
592            let idx = Indexer::open(&db).expect("second open");
593            let count: i64 = idx
594                .connection()
595                .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
596                .unwrap();
597            assert_eq!(count, 1);
598        }
599    }
600
601    #[test]
602    fn io_error_variant_attaches_parent_path() {
603        // If the parent directory cannot be created (e.g. because it lives
604        // under a regular file), we should get LogdiveError::Io with the
605        // offending path, not a SqliteFailure.
606        let dir = tempfile::tempdir().unwrap();
607        let blocker = dir.path().join("blocker");
608        std::fs::write(&blocker, b"not a directory").unwrap();
609        let bad_db = blocker.join("child").join("index.db");
610
611        let err = Indexer::open(&bad_db).unwrap_err();
612        match err {
613            LogdiveError::Io { path, .. } => {
614                assert!(path.starts_with(dir.path()));
615            }
616            other => panic!("expected Io variant, got {other:?}"),
617        }
618    }
619
620    // -----------------------------------------------------------------
621    // stats()
622    // -----------------------------------------------------------------
623
624    #[test]
625    fn stats_empty_database_returns_zeroed_values() {
626        let idx = Indexer::open_in_memory().unwrap();
627        let stats = idx.stats().unwrap();
628
629        assert_eq!(stats.entries, 0);
630        assert_eq!(stats.min_timestamp, None);
631        assert_eq!(stats.max_timestamp, None);
632        assert!(stats.tags.is_empty());
633    }
634
635    #[test]
636    fn stats_counts_entries() {
637        let mut idx = Indexer::open_in_memory().unwrap();
638        let entries: Vec<_> = (0..5)
639            .map(|i| make_entry("2026-04-20T10:00:00Z", "info", &format!("msg-{i}")))
640            .collect();
641        idx.insert_batch(&entries).unwrap();
642
643        let stats = idx.stats().unwrap();
644        assert_eq!(stats.entries, 5);
645    }
646
647    #[test]
648    fn stats_timestamp_range_uses_lexical_min_and_max() {
649        let mut idx = Indexer::open_in_memory().unwrap();
650        // Insert intentionally out-of-order to confirm MIN/MAX, not insertion
651        // order, drives the bounds.
652        idx.insert_batch(&[
653            make_entry("2026-04-22T15:30:00Z", "error", "second"),
654            make_entry("2026-04-20T10:00:00Z", "info", "first"),
655            make_entry("2026-04-21T12:00:00Z", "warn", "third"),
656        ])
657        .unwrap();
658
659        let stats = idx.stats().unwrap();
660        assert_eq!(stats.min_timestamp.as_deref(), Some("2026-04-20T10:00:00Z"));
661        assert_eq!(stats.max_timestamp.as_deref(), Some("2026-04-22T15:30:00Z"));
662    }
663
664    #[test]
665    fn stats_distinct_tags_place_untagged_first_then_alphabetical() {
666        let mut idx = Indexer::open_in_memory().unwrap();
667
668        // One untagged row.
669        let untagged = make_entry("2026-04-20T10:00:00Z", "info", "untagged-msg");
670
671        // Two distinct rows sharing tag "api" — must collapse via DISTINCT.
672        let mut api1 = make_entry("2026-04-20T10:00:01Z", "info", "api-msg-1");
673        api1.tag = Some("api".to_string());
674        let mut api2 = make_entry("2026-04-20T10:00:02Z", "info", "api-msg-2");
675        api2.tag = Some("api".to_string());
676
677        // One row with tag "payments".
678        let mut payments = make_entry("2026-04-20T10:00:03Z", "info", "payments-msg");
679        payments.tag = Some("payments".to_string());
680
681        idx.insert_batch(&[untagged, api1, api2, payments]).unwrap();
682
683        let stats = idx.stats().unwrap();
684        assert_eq!(stats.tags.len(), 3);
685        // NULL comes first in SQLite's ascending sort.
686        assert_eq!(stats.tags[0], None);
687        assert_eq!(stats.tags[1], Some("api".to_string()));
688        assert_eq!(stats.tags[2], Some("payments".to_string()));
689    }
690
691    #[test]
692    fn stats_entries_count_respects_dedup() {
693        let mut idx = Indexer::open_in_memory().unwrap();
694        // Two batches of the same entry — second is deduplicated away.
695        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "dup")])
696            .unwrap();
697        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "dup")])
698            .unwrap();
699
700        let stats = idx.stats().unwrap();
701        assert_eq!(stats.entries, 1);
702    }
703
704    #[test]
705    fn stats_entries_count_excludes_timestamp_less_entries() {
706        let mut idx = Indexer::open_in_memory().unwrap();
707
708        let mut no_ts = LogEntry::new(r#"{"level":"info"}"#);
709        no_ts.level = Some("info".to_string());
710
711        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "present"), no_ts])
712            .unwrap();
713
714        let stats = idx.stats().unwrap();
715        assert_eq!(stats.entries, 1);
716    }
717
718    // -----------------------------------------------------------------
719    // open_read_only()
720    // -----------------------------------------------------------------
721
722    #[test]
723    fn open_read_only_errors_when_file_is_missing() {
724        let dir = tempfile::tempdir().unwrap();
725        let missing = dir.path().join("does-not-exist.db");
726        let err = Indexer::open_read_only(&missing).unwrap_err();
727        // SQLite returns "unable to open database file" for missing paths in
728        // read-only mode; surfaced through `LogdiveError::Sqlite`.
729        assert!(matches!(err, LogdiveError::Sqlite(_)));
730    }
731
732    #[test]
733    fn open_read_only_can_read_existing_rows() {
734        let dir = tempfile::tempdir().unwrap();
735        let db = dir.path().join("ro.db");
736
737        // Populate via the writable opener.
738        {
739            let mut idx = Indexer::open(&db).unwrap();
740            idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "visible")])
741                .unwrap();
742        }
743
744        // Re-open read-only and read back.
745        let ro = Indexer::open_read_only(&db).unwrap();
746        let count: i64 = ro
747            .connection()
748            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
749            .unwrap();
750        assert_eq!(count, 1);
751
752        let stats = ro.stats().unwrap();
753        assert_eq!(stats.entries, 1);
754    }
755
756    #[test]
757    fn open_read_only_rejects_writes_at_sqlite_level() {
758        let dir = tempfile::tempdir().unwrap();
759        let db = dir.path().join("ro-reject.db");
760
761        // Create and close.
762        {
763            let _ = Indexer::open(&db).unwrap();
764        }
765
766        // Re-open RO and attempt a write via raw SQL — SQLite should block it.
767        let ro = Indexer::open_read_only(&db).unwrap();
768        let result = ro.connection().execute(
769            "INSERT INTO log_entries (timestamp, raw, raw_hash) VALUES ('x', 'y', 'z')",
770            [],
771        );
772        assert!(result.is_err(), "read-only connection must reject writes");
773    }
774
775    #[test]
776    fn open_read_only_rejects_update() {
777        let dir = tempfile::tempdir().unwrap();
778        let db = dir.path().join("ro-update.db");
779        {
780            let _ = Indexer::open(&db).unwrap();
781        }
782        let ro = Indexer::open_read_only(&db).unwrap();
783        let result = ro
784            .connection()
785            .execute("UPDATE log_entries SET level = 'x' WHERE 1=0", []);
786        assert!(result.is_err(), "read-only connection must reject UPDATE");
787    }
788
789    #[test]
790    fn open_read_only_rejects_delete() {
791        let dir = tempfile::tempdir().unwrap();
792        let db = dir.path().join("ro-delete.db");
793        {
794            let _ = Indexer::open(&db).unwrap();
795        }
796        let ro = Indexer::open_read_only(&db).unwrap();
797        let result = ro
798            .connection()
799            .execute("DELETE FROM log_entries WHERE 1=0", []);
800        assert!(result.is_err(), "read-only connection must reject DELETE");
801    }
802
803    #[test]
804    fn open_read_only_rejects_create_table() {
805        let dir = tempfile::tempdir().unwrap();
806        let db = dir.path().join("ro-ddl.db");
807        {
808            let _ = Indexer::open(&db).unwrap();
809        }
810        let ro = Indexer::open_read_only(&db).unwrap();
811        let result = ro
812            .connection()
813            .execute_batch("CREATE TABLE sec_test (x TEXT)");
814        assert!(
815            result.is_err(),
816            "read-only connection must reject CREATE TABLE"
817        );
818    }
819
820    #[test]
821    fn open_read_only_rejects_pragma_user_version_write() {
822        let dir = tempfile::tempdir().unwrap();
823        let db = dir.path().join("ro-pragma.db");
824        {
825            let _ = Indexer::open(&db).unwrap();
826        }
827        let ro = Indexer::open_read_only(&db).unwrap();
828        let result = ro.connection().execute_batch("PRAGMA user_version = 42");
829        assert!(
830            result.is_err(),
831            "read-only connection must reject PRAGMA writes"
832        );
833    }
834
835    #[test]
836    fn open_read_only_does_not_run_schema_migrations() {
837        // If `open_read_only` tried to CREATE IF NOT EXISTS anything, it
838        // would error against a read-only connection. Opening an empty DB
839        // that's NOT been initialized demonstrates open_read_only doesn't
840        // attempt writes of any kind.
841        let dir = tempfile::tempdir().unwrap();
842        let db = dir.path().join("bare.db");
843
844        // Create a totally empty SQLite file (no schema).
845        {
846            let c = Connection::open(&db).unwrap();
847            // Ensure the file exists without creating the log_entries table.
848            c.execute_batch("PRAGMA user_version = 0;").unwrap();
849        }
850
851        // open_read_only must succeed (no migration attempt).
852        let ro = Indexer::open_read_only(&db).expect("open ro on bare db");
853
854        // Table is absent, so a SELECT errors — proving we didn't create it.
855        let err = ro
856            .connection()
857            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| {
858                row.get::<_, i64>(0)
859            });
860        assert!(err.is_err());
861    }
862
863    // -----------------------------------------------------------------
864    // prune()
865    // -----------------------------------------------------------------
866
867    #[test]
868    fn prune_deletes_entries_strictly_older_than_cutoff() {
869        let mut idx = Indexer::open_in_memory().unwrap();
870        idx.insert_batch(&[
871            make_entry("2026-04-01T00:00:00Z", "info", "old one"),
872            make_entry("2026-04-10T00:00:00Z", "info", "old two"),
873            make_entry("2026-04-20T00:00:00Z", "info", "kept"),
874        ])
875        .unwrap();
876
877        let stats = idx.prune("2026-04-15T00:00:00Z").unwrap();
878        assert_eq!(stats.deleted, 2);
879
880        let count: i64 = idx
881            .connection()
882            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
883            .unwrap();
884        assert_eq!(count, 1);
885
886        // The surviving row is the one newer than the cutoff.
887        let surviving: String = idx
888            .connection()
889            .query_row("SELECT message FROM log_entries", [], |row| row.get(0))
890            .unwrap();
891        assert_eq!(surviving, "kept");
892    }
893
894    #[test]
895    fn prune_keeps_entry_exactly_at_cutoff() {
896        // The comparison is strict `<`, so a row whose timestamp equals the
897        // cutoff is retained, not deleted.
898        let mut idx = Indexer::open_in_memory().unwrap();
899        idx.insert_batch(&[make_entry("2026-04-15T00:00:00Z", "info", "boundary")])
900            .unwrap();
901
902        let stats = idx.prune("2026-04-15T00:00:00Z").unwrap();
903        assert_eq!(stats.deleted, 0);
904
905        let count: i64 = idx
906            .connection()
907            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
908            .unwrap();
909        assert_eq!(count, 1);
910    }
911
912    #[test]
913    fn prune_on_empty_database_deletes_nothing() {
914        let mut idx = Indexer::open_in_memory().unwrap();
915        let stats = idx.prune("2026-04-15T00:00:00Z").unwrap();
916        assert_eq!(stats.deleted, 0);
917    }
918
919    #[test]
920    fn prune_with_cutoff_before_all_entries_deletes_nothing() {
921        let mut idx = Indexer::open_in_memory().unwrap();
922        idx.insert_batch(&[
923            make_entry("2026-04-20T00:00:00Z", "info", "a"),
924            make_entry("2026-04-21T00:00:00Z", "info", "b"),
925        ])
926        .unwrap();
927
928        let stats = idx.prune("2026-01-01T00:00:00Z").unwrap();
929        assert_eq!(stats.deleted, 0);
930
931        let count: i64 = idx
932            .connection()
933            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
934            .unwrap();
935        assert_eq!(count, 2);
936    }
937
938    #[test]
939    fn prune_with_cutoff_after_all_entries_deletes_all() {
940        let mut idx = Indexer::open_in_memory().unwrap();
941        idx.insert_batch(&[
942            make_entry("2026-04-20T00:00:00Z", "info", "a"),
943            make_entry("2026-04-21T00:00:00Z", "info", "b"),
944            make_entry("2026-04-22T00:00:00Z", "info", "c"),
945        ])
946        .unwrap();
947
948        let stats = idx.prune("2027-01-01T00:00:00Z").unwrap();
949        assert_eq!(stats.deleted, 3);
950
951        let count: i64 = idx
952            .connection()
953            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
954            .unwrap();
955        assert_eq!(count, 0);
956    }
957
958    #[test]
959    fn prune_returns_accurate_deleted_count() {
960        let mut idx = Indexer::open_in_memory().unwrap();
961        // Ten entries, one per day from the 1st to the 10th.
962        let entries: Vec<_> = (1..=10)
963            .map(|day| {
964                make_entry(
965                    &format!("2026-04-{day:02}T00:00:00Z"),
966                    "info",
967                    &format!("day-{day}"),
968                )
969            })
970            .collect();
971        idx.insert_batch(&entries).unwrap();
972
973        // Cutoff at the 6th deletes days 1-5 (strictly older): 5 rows.
974        let stats = idx.prune("2026-04-06T00:00:00Z").unwrap();
975        assert_eq!(stats.deleted, 5);
976
977        let count: i64 = idx
978            .connection()
979            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
980            .unwrap();
981        assert_eq!(count, 5);
982    }
983
984    #[test]
985    fn prune_then_stats_reflects_deletion() {
986        let mut idx = Indexer::open_in_memory().unwrap();
987        idx.insert_batch(&[
988            make_entry("2026-04-01T00:00:00Z", "info", "gone"),
989            make_entry("2026-04-20T00:00:00Z", "info", "stays"),
990        ])
991        .unwrap();
992
993        idx.prune("2026-04-10T00:00:00Z").unwrap();
994
995        let stats = idx.stats().unwrap();
996        assert_eq!(stats.entries, 1);
997        assert_eq!(stats.min_timestamp.as_deref(), Some("2026-04-20T00:00:00Z"));
998        assert_eq!(stats.max_timestamp.as_deref(), Some("2026-04-20T00:00:00Z"));
999    }
1000
1001    #[test]
1002    fn prune_works_on_disk_backed_index() {
1003        // VACUUM exercises a different code path on-disk than in-memory;
1004        // run the real on-disk path to confirm DELETE + VACUUM both succeed.
1005        let dir = tempfile::tempdir().unwrap();
1006        let db = dir.path().join("prune.db");
1007        let mut idx = Indexer::open(&db).unwrap();
1008        idx.insert_batch(&[
1009            make_entry("2026-04-01T00:00:00Z", "info", "old"),
1010            make_entry("2026-04-20T00:00:00Z", "info", "new"),
1011        ])
1012        .unwrap();
1013
1014        let stats = idx.prune("2026-04-10T00:00:00Z").unwrap();
1015        assert_eq!(stats.deleted, 1);
1016
1017        let count: i64 = idx
1018            .connection()
1019            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
1020            .unwrap();
1021        assert_eq!(count, 1);
1022    }
1023
1024    #[test]
1025    fn prune_one_second_boundary_deletes_only_strictly_older() {
1026        // Two rows 1 second apart; cutoff is the older one's timestamp.
1027        // Only the row strictly before the cutoff must be deleted.
1028        let mut idx = Indexer::open_in_memory().unwrap();
1029        idx.insert_batch(&[
1030            make_entry("2026-04-20T10:00:00Z", "info", "at-cutoff"),
1031            make_entry("2026-04-20T10:00:01Z", "info", "one-second-later"),
1032        ])
1033        .unwrap();
1034
1035        let stats = idx.prune("2026-04-20T10:00:00Z").unwrap();
1036        assert_eq!(
1037            stats.deleted, 0,
1038            "row at cutoff must be retained (strict <)"
1039        );
1040
1041        let stats = idx.prune("2026-04-20T10:00:01Z").unwrap();
1042        assert_eq!(
1043            stats.deleted, 1,
1044            "row strictly before the second cutoff must be deleted"
1045        );
1046    }
1047
1048    #[test]
1049    fn prune_idempotent_second_prune_with_same_cutoff_deletes_nothing() {
1050        // After the first prune removes all eligible rows, a second prune
1051        // with the same cutoff must report 0 deleted — nothing left to remove.
1052        let mut idx = Indexer::open_in_memory().unwrap();
1053        idx.insert_batch(&[
1054            make_entry("2026-04-01T00:00:00Z", "info", "old"),
1055            make_entry("2026-04-20T00:00:00Z", "info", "keep"),
1056        ])
1057        .unwrap();
1058
1059        let first = idx.prune("2026-04-10T00:00:00Z").unwrap();
1060        assert_eq!(first.deleted, 1);
1061
1062        let second = idx.prune("2026-04-10T00:00:00Z").unwrap();
1063        assert_eq!(
1064            second.deleted, 0,
1065            "re-pruning same cutoff must delete nothing"
1066        );
1067    }
1068}