//! SQLite-backed index for ingested log entries.
//!
//! This module owns the persistent storage side of logdive: schema creation,
//! row-level deduplication via `blake3`, and batched inserts of 1000 rows per
//! transaction (per the decisions log entry dated 2026-04-19). The schema is
//! reproduced verbatim from the project doc's "SQLite schema" section with
//! `IF NOT EXISTS` added so opening an existing database is idempotent.
//!
//! `Indexer` is an owning handle over a `rusqlite::Connection`. It can be
//! constructed against a filesystem path via [`Indexer::open`] or against an
//! in-memory database via [`Indexer::open_in_memory`] — the latter is used
//! by the unit tests below and also suits ad-hoc one-shot use. For read-only
//! consumers (the HTTP API in milestone 8), [`Indexer::open_read_only`]
//! opens an existing database without the schema init or directory-creation
//! side effects of [`Indexer::open`], and enforces read-only semantics at
//! the SQLite level via `SQLITE_OPEN_READ_ONLY`.
//!
//! # Timestamp NOT NULL policy
//!
//! The schema declares `timestamp TEXT NOT NULL`, but the parser produces
//! `LogEntry::timestamp = None` for lines that omit the key. Rather than
//! fabricating a fallback (which would falsely anchor those rows to
//! ingestion time and confuse `last Nh` queries), the indexer *skips* such
//! rows and reports them in [`InsertStats::skipped_no_timestamp`]. This
//! mirrors the parser's "graceful skip" philosophy — bad data is counted
//! and dropped, never manufactured.
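//!
//! # Example
//!
//! A minimal end-to-end sketch. The crate and module paths
//! (`logdive_core::indexer`, `logdive_core::entry`) are assumed from this
//! repository's layout:
//!
//! ```no_run
//! use logdive_core::entry::LogEntry;
//! use logdive_core::indexer::Indexer;
//!
//! let mut idx = Indexer::open_in_memory().unwrap();
//!
//! // One parsed entry; `raw` drives the blake3 dedup hash.
//! let raw = r#"{"timestamp":"2026-04-20T10:00:00Z","level":"info"}"#;
//! let mut entry = LogEntry::new(raw);
//! entry.timestamp = Some("2026-04-20T10:00:00Z".to_string());
//!
//! let first = idx.insert_batch(std::slice::from_ref(&entry)).unwrap();
//! assert_eq!(first.inserted, 1);
//!
//! // Same raw bytes, same hash: the second insert deduplicates.
//! let second = idx.insert_batch(std::slice::from_ref(&entry)).unwrap();
//! assert_eq!(second.deduplicated, 1);
//! assert_eq!(idx.stats().unwrap().entries, 1);
//! ```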

use std::path::{Path, PathBuf};

use rusqlite::{Connection, OpenFlags, params};

use crate::entry::LogEntry;
use crate::error::{LogdiveError, Result};

/// Size of a single insert transaction, per the decisions log
/// (2026-04-19: "batch insert per 1000 lines").
pub const BATCH_SIZE: usize = 1000;

const DEFAULT_DB_FILENAME: &str = "index.db";
const LOGDIVE_HOME_DIRNAME: &str = ".logdive";

/// Resolve the path to the index database.
///
/// When `override_path` is `Some`, it is used verbatim — this is what the
/// CLI's `--db` flag wires into. Otherwise the default `~/.logdive/index.db`
/// is returned per the "Default index location" decision in the project doc.
///
/// Purely functional: does not touch the filesystem.
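///
/// # Example
///
/// A small sketch; the module path `logdive_core::indexer` is assumed:
///
/// ```
/// use std::path::{Path, PathBuf};
/// use logdive_core::indexer::db_path;
///
/// // The --db override is returned verbatim.
/// let explicit = db_path(Some(Path::new("/tmp/custom.db")));
/// assert_eq!(explicit, PathBuf::from("/tmp/custom.db"));
///
/// // Without an override, the default location applies.
/// assert!(db_path(None).ends_with(".logdive/index.db"));
/// ```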
pub fn db_path(override_path: Option<&Path>) -> PathBuf {
    if let Some(p) = override_path {
        return p.to_path_buf();
    }
    // POSIX-centric: logdive's Phase 4 release targets are Linux and macOS,
    // both of which expose HOME. Fall back to CWD if it is unset (containers,
    // stripped CI environments) rather than panicking.
    let home = std::env::var("HOME").unwrap_or_else(|_| ".".to_string());
    PathBuf::from(home)
        .join(LOGDIVE_HOME_DIRNAME)
        .join(DEFAULT_DB_FILENAME)
}

/// Outcome of an insert batch, surfaced to the CLI for progress output
/// ("lines ingested / lines skipped per second", per milestone 6).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct InsertStats {
    /// Rows newly added to the index.
    pub inserted: usize,
    /// Rows rejected by `INSERT OR IGNORE` because their `raw_hash` already
    /// existed — the dedup path per the decisions log.
    pub deduplicated: usize,
    /// Rows rejected because they had no `timestamp`. See module docs.
    pub skipped_no_timestamp: usize,
}

impl InsertStats {
    fn extend(&mut self, other: InsertStats) {
        self.inserted += other.inserted;
        self.deduplicated += other.deduplicated;
        self.skipped_no_timestamp += other.skipped_no_timestamp;
    }
}

/// Aggregate metadata about the contents of an index.
///
/// Produced by [`Indexer::stats`] and consumed by the CLI `stats` subcommand
/// (milestone 7) and the `GET /stats` HTTP endpoint (milestone 8). The shape
/// is intentionally minimal and structural; the CLI and HTTP layers format
/// it for human or machine consumption.
///
/// `tags` ordering: `None` (untagged rows) first, then non-null tag strings
/// in ascending alphabetical order. This ordering is produced directly by
/// SQLite (`ORDER BY tag` places NULL first in ascending order) and is not
/// re-sorted in Rust. The CLI renders the `None` slot as "(untagged)".
///
/// Marked `#[non_exhaustive]` so additional summary fields (e.g. distinct
/// level counts) can be added in later milestones without breaking the
/// public API.
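///
/// For illustration, an index holding untagged rows plus rows tagged
/// `"payments"` and `"api"` yields:
///
/// ```text
/// tags == [None, Some("api"), Some("payments")]
/// ```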
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct Stats {
    /// Total number of rows currently in the `log_entries` table.
    pub entries: u64,
    /// Lexically smallest `timestamp` value in the index, or `None` on an
    /// empty database. Lexical ordering is correct for ISO-8601 timestamps;
    /// see the "live design decisions" section of the project handoff.
    pub min_timestamp: Option<String>,
    /// Lexically largest `timestamp` value in the index, or `None` on an
    /// empty database.
    pub max_timestamp: Option<String>,
    /// Distinct tag values observed across all rows. `None` represents rows
    /// with no tag (SQL NULL) and — when present — is always the first
    /// element; non-null tags follow in ascending alphabetical order.
    pub tags: Vec<Option<String>>,
}

/// Owning handle over a SQLite connection to a logdive index.
#[derive(Debug)]
pub struct Indexer {
    conn: Connection,
}

impl Indexer {
    /// Open (or create) a logdive index at `path`.
    ///
    /// Creates the parent directory if it does not already exist, opens the
    /// SQLite database, and runs idempotent schema migrations.
    pub fn open(path: &Path) -> Result<Self> {
        ensure_parent_dir(path)?;
        let conn = Connection::open(path)?;
        init_schema(&conn)?;
        Ok(Self { conn })
    }

    /// Open an in-memory index. Used by tests; also usable for one-shot
    /// scenarios that don't need persistence.
    pub fn open_in_memory() -> Result<Self> {
        let conn = Connection::open_in_memory()?;
        init_schema(&conn)?;
        Ok(Self { conn })
    }

    /// Open an existing logdive index at `path` in read-only mode.
    ///
    /// Unlike [`Indexer::open`], this method:
    ///   1. Does **not** create the database file if it is missing (the
    ///      `SQLITE_OPEN_READ_ONLY` flag fails rather than creates),
    ///   2. Does **not** create the parent directory,
    ///   3. Does **not** run schema migrations — the caller is promising
    ///      that `path` already points at a valid logdive index.
    ///
    /// Read-only semantics are enforced at the SQLite level: any attempted
    /// write through the returned connection raises a runtime error. This
    /// is defense-in-depth for the HTTP API (milestone 8), whose surface is
    /// read-only.
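    ///
    /// # Example
    ///
    /// A sketch; the path below is illustrative and the module path
    /// `logdive_core::indexer` is assumed:
    ///
    /// ```no_run
    /// use std::path::Path;
    /// use logdive_core::indexer::Indexer;
    ///
    /// let ro = Indexer::open_read_only(Path::new("/var/lib/logdive/index.db")).unwrap();
    /// // Reads succeed; any write errors at the SQLite level.
    /// let _entries = ro.stats().unwrap().entries;
    /// assert!(ro.connection().execute("DELETE FROM log_entries", []).is_err());
    /// ```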
    pub fn open_read_only(path: &Path) -> Result<Self> {
        // `SQLITE_OPEN_URI` is included because it's the safe default
        // documented by rusqlite; it only affects parsing of `file:...`
        // URIs, which we never pass in.
        let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
        let conn = Connection::open_with_flags(path, flags)?;
        Ok(Self { conn })
    }

    /// Borrow the underlying connection.
    ///
    /// Exposed so the query executor can run reads without an extra
    /// abstraction layer. Queries only need this shared borrow, so they
    /// never contend with ingestion's `&mut self`.
    pub fn connection(&self) -> &Connection {
        &self.conn
    }

    /// Insert a slice of entries into the index, chunking internally into
    /// transactions of [`BATCH_SIZE`] rows each.
    ///
    /// Returns aggregate stats across all chunks. Entry ordering within
    /// the index is not guaranteed.
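    ///
    /// # Example
    ///
    /// A sketch showing that one call spanning multiple chunks still reports
    /// aggregate stats (module paths assumed as elsewhere in these docs):
    ///
    /// ```no_run
    /// use logdive_core::entry::LogEntry;
    /// use logdive_core::indexer::{Indexer, BATCH_SIZE};
    ///
    /// let mut idx = Indexer::open_in_memory().unwrap();
    /// let entries: Vec<LogEntry> = (0..BATCH_SIZE + 1)
    ///     .map(|i| {
    ///         // Distinct `raw` per entry keeps the blake3 hashes unique.
    ///         let mut e = LogEntry::new(format!(r#"{{"n":{i}}}"#));
    ///         e.timestamp = Some("2026-04-20T10:00:00Z".to_string());
    ///         e
    ///     })
    ///     .collect();
    /// // Two transactions internally (1000 rows, then 1), one set of stats out.
    /// assert_eq!(idx.insert_batch(&entries).unwrap().inserted, BATCH_SIZE + 1);
    /// ```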
    pub fn insert_batch(&mut self, entries: &[LogEntry]) -> Result<InsertStats> {
        let mut total = InsertStats::default();
        for chunk in entries.chunks(BATCH_SIZE) {
            let stats = insert_one_chunk(&mut self.conn, chunk)?;
            total.extend(stats);
        }
        Ok(total)
    }

    /// Read aggregate metadata about the index.
    ///
    /// Runs three read-only queries:
    /// 1. `COUNT(*)` for the row count,
    /// 2. `MIN(timestamp), MAX(timestamp)` for the time range,
    /// 3. `SELECT DISTINCT tag ... ORDER BY tag` for the tag list.
    ///
    /// On an empty database, returns `entries = 0`, both timestamp bounds
    /// as `None`, and an empty `tags` vector — not an error.
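    ///
    /// # Example
    ///
    /// A sketch of the empty-database behavior (module path assumed):
    ///
    /// ```no_run
    /// use logdive_core::indexer::Indexer;
    ///
    /// let idx = Indexer::open_in_memory().unwrap();
    /// let stats = idx.stats().unwrap();
    /// assert_eq!(stats.entries, 0);
    /// assert_eq!(stats.min_timestamp, None);
    /// assert_eq!(stats.max_timestamp, None);
    /// assert!(stats.tags.is_empty());
    /// ```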
    pub fn stats(&self) -> Result<Stats> {
        // COUNT(*) is always non-negative; cast i64 → u64 is well-defined.
        let entries_i64: i64 =
            self.conn
                .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))?;
        let entries = entries_i64 as u64;

        // Aggregates without GROUP BY always yield exactly one row; MIN/MAX
        // on an empty table return (NULL, NULL), which maps cleanly to
        // (None, None) via rusqlite's Option<T> FromSql impl.
        let (min_timestamp, max_timestamp): (Option<String>, Option<String>) =
            self.conn.query_row(
                "SELECT MIN(timestamp), MAX(timestamp) FROM log_entries",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )?;

        // SQLite's `ORDER BY tag` (default ascending) places NULLs first,
        // which is exactly the ordering contract advertised on `Stats.tags`.
        let mut stmt = self
            .conn
            .prepare("SELECT DISTINCT tag FROM log_entries ORDER BY tag")?;
        let rows = stmt.query_map([], |row| row.get::<_, Option<String>>(0))?;
        let mut tags: Vec<Option<String>> = Vec::new();
        for row in rows {
            tags.push(row?);
        }

        Ok(Stats {
            entries,
            min_timestamp,
            max_timestamp,
            tags,
        })
    }
}

// ---------------------------------------------------------------------------
// Internals
// ---------------------------------------------------------------------------

fn ensure_parent_dir(path: &Path) -> Result<()> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    if parent.as_os_str().is_empty() {
        // Relative filename with no directory component ("index.db").
        return Ok(());
    }
    std::fs::create_dir_all(parent).map_err(|io_err| LogdiveError::io_at(parent, io_err))
}

fn init_schema(conn: &Connection) -> Result<()> {
    // Schema taken verbatim from the project doc's "SQLite schema" section,
    // with `IF NOT EXISTS` added on every statement so open() is idempotent.
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS log_entries (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp   TEXT NOT NULL,
            level       TEXT,
            message     TEXT,
            tag         TEXT,
            fields      TEXT,
            raw         TEXT NOT NULL,
            raw_hash    TEXT NOT NULL UNIQUE,
            ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        CREATE INDEX IF NOT EXISTS idx_level     ON log_entries(level);
        CREATE INDEX IF NOT EXISTS idx_tag       ON log_entries(tag);
        CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);",
    )?;
    Ok(())
}

fn insert_one_chunk(conn: &mut Connection, entries: &[LogEntry]) -> Result<InsertStats> {
    let tx = conn.transaction()?;
    let mut stats = InsertStats::default();

    {
        let mut stmt = tx.prepare(
            "INSERT OR IGNORE INTO log_entries
             (timestamp, level, message, tag, fields, raw, raw_hash)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
        )?;

        for entry in entries {
            // NOT NULL enforcement — see module-level docs.
            let Some(ref ts) = entry.timestamp else {
                stats.skipped_no_timestamp += 1;
                continue;
            };

            // Serializing a `Map<String, Value>` via serde_json is infallible:
            // every `Value` variant has a defined JSON representation.
            let fields_json = serde_json::to_string(&entry.fields)
                .expect("serializing serde_json::Map<String, Value> is infallible");
            let raw_hash = blake3::hash(entry.raw.as_bytes()).to_hex().to_string();

            let changes = stmt.execute(params![
                ts,
                entry.level,
                entry.message,
                entry.tag,
                fields_json,
                entry.raw,
                raw_hash,
            ])?;

            if changes == 0 {
                stats.deduplicated += 1;
            } else {
                stats.inserted += 1;
            }
        }
    }

    tx.commit()?;
    Ok(stats)
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a LogEntry whose `raw` is unique per input tuple, guaranteeing
    /// a distinct `raw_hash` across calls (critical for the chunking test
    /// where we insert thousands of entries).
    fn make_entry(ts: &str, level: &str, message: &str) -> LogEntry {
        let raw = format!(r#"{{"timestamp":"{ts}","level":"{level}","message":"{message}"}}"#);
        let mut e = LogEntry::new(raw);
        e.timestamp = Some(ts.to_string());
        e.level = Some(level.to_string());
        e.message = Some(message.to_string());
        e
    }

    #[test]
    fn open_in_memory_creates_table_and_three_indexes() {
        let idx = Indexer::open_in_memory().expect("open in-memory");
        let table_count: i64 = idx
            .connection()
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master \
                 WHERE type='table' AND name='log_entries'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(table_count, 1);

        let index_count: i64 = idx
            .connection()
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master \
                 WHERE type='index' AND name IN ('idx_level','idx_tag','idx_timestamp')",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(index_count, 3);
    }

    #[test]
    fn insert_batch_adds_rows_and_reports_stats() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let entries = vec![
            make_entry("2026-04-20T10:00:00Z", "info", "one"),
            make_entry("2026-04-20T10:00:01Z", "error", "two"),
        ];
        let stats = idx.insert_batch(&entries).unwrap();

        assert_eq!(stats.inserted, 2);
        assert_eq!(stats.deduplicated, 0);
        assert_eq!(stats.skipped_no_timestamp, 0);

        let count: i64 = idx
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 2);
    }

    #[test]
    fn reinsert_is_deduplicated_by_raw_hash() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let entries = vec![make_entry("2026-04-20T10:00:00Z", "info", "hello")];

        let first = idx.insert_batch(&entries).unwrap();
        assert_eq!(first.inserted, 1);
        assert_eq!(first.deduplicated, 0);

        let second = idx.insert_batch(&entries).unwrap();
        assert_eq!(second.inserted, 0);
        assert_eq!(second.deduplicated, 1);

        let count: i64 = idx
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn entries_without_timestamp_are_skipped_not_fabricated() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let mut no_ts = LogEntry::new(r#"{"level":"info"}"#);
        no_ts.level = Some("info".to_string());

        let stats = idx.insert_batch(&[no_ts]).unwrap();
        assert_eq!(stats.inserted, 0);
        assert_eq!(stats.skipped_no_timestamp, 1);

        let count: i64 = idx
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    #[test]
    fn mixed_batch_counts_each_outcome_category() {
        let mut idx = Indexer::open_in_memory().unwrap();
        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "first")])
            .unwrap();

        let mut no_ts = LogEntry::new(r#"{"level":"warn"}"#);
        no_ts.level = Some("warn".to_string());

        let mixed = vec![
            make_entry("2026-04-20T10:00:00Z", "info", "first"),
            make_entry("2026-04-20T10:00:05Z", "error", "second"),
            no_ts,
        ];
        let stats = idx.insert_batch(&mixed).unwrap();
        assert_eq!(stats.inserted, 1);
        assert_eq!(stats.deduplicated, 1);
        assert_eq!(stats.skipped_no_timestamp, 1);
    }

    #[test]
    fn fields_are_stored_as_json_queryable_via_json_extract() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let mut e = make_entry("2026-04-20T10:00:00Z", "info", "hi");
        e.fields.insert("service".to_string(), json!("payments"));
        e.fields.insert("req_id".to_string(), json!(42));
        idx.insert_batch(&[e]).unwrap();

        let service: String = idx
            .connection()
            .query_row(
                "SELECT json_extract(fields, '$.service') FROM log_entries",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(service, "payments");

        let req_id: i64 = idx
            .connection()
            .query_row(
                "SELECT json_extract(fields, '$.req_id') FROM log_entries",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(req_id, 42);
    }

    #[test]
    fn empty_fields_round_trip_as_empty_json_object_not_null() {
        let mut idx = Indexer::open_in_memory().unwrap();
        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "x")])
            .unwrap();

        let stored: String = idx
            .connection()
            .query_row("SELECT fields FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(stored, "{}");
    }

    #[test]
    fn raw_hash_is_a_64_char_hex_blake3_digest() {
        let mut idx = Indexer::open_in_memory().unwrap();
        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "hash me")])
            .unwrap();

        let stored_hash: String = idx
            .connection()
            .query_row("SELECT raw_hash FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(stored_hash.len(), 64);
        assert!(stored_hash.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn chunking_handles_batches_larger_than_batch_size() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let total = BATCH_SIZE + 337;
        let entries: Vec<_> = (0..total)
            .map(|i| make_entry("2026-04-20T10:00:00Z", "info", &format!("message-{i}")))
            .collect();

        let stats = idx.insert_batch(&entries).unwrap();
        assert_eq!(stats.inserted, total);
        assert_eq!(stats.deduplicated, 0);

        let count: i64 = idx
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, total as i64);
    }

    #[test]
    fn db_path_returns_override_verbatim() {
        let p = Path::new("/tmp/logdive-test/override.db");
        assert_eq!(
            db_path(Some(p)),
            PathBuf::from("/tmp/logdive-test/override.db")
        );
    }

    #[test]
    fn db_path_default_ends_with_standard_location() {
        let default = db_path(None);
        assert!(default.ends_with(".logdive/index.db"));
    }

    #[test]
    fn open_creates_parent_directory_and_is_idempotent_across_opens() {
        let dir = tempfile::tempdir().unwrap();
        let db = dir.path().join("sub").join("dir").join("index.db");

        {
            let mut idx = Indexer::open(&db).expect("first open");
            idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "persist me")])
                .unwrap();
        }

        {
            let idx = Indexer::open(&db).expect("second open");
            let count: i64 = idx
                .connection()
                .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
                .unwrap();
            assert_eq!(count, 1);
        }
    }

    #[test]
    fn io_error_variant_attaches_parent_path() {
        // If the parent directory cannot be created (e.g. because it lives
        // under a regular file), we should get LogdiveError::Io with the
        // offending path, not a SqliteFailure.
        let dir = tempfile::tempdir().unwrap();
        let blocker = dir.path().join("blocker");
        std::fs::write(&blocker, b"not a directory").unwrap();
        let bad_db = blocker.join("child").join("index.db");

        let err = Indexer::open(&bad_db).unwrap_err();
        match err {
            LogdiveError::Io { path, .. } => {
                assert!(path.starts_with(dir.path()));
            }
            other => panic!("expected Io variant, got {other:?}"),
        }
    }

    // -----------------------------------------------------------------
    // stats()
    // -----------------------------------------------------------------

    #[test]
    fn stats_empty_database_returns_zeroed_values() {
        let idx = Indexer::open_in_memory().unwrap();
        let stats = idx.stats().unwrap();

        assert_eq!(stats.entries, 0);
        assert_eq!(stats.min_timestamp, None);
        assert_eq!(stats.max_timestamp, None);
        assert!(stats.tags.is_empty());
    }

    #[test]
    fn stats_counts_entries() {
        let mut idx = Indexer::open_in_memory().unwrap();
        let entries: Vec<_> = (0..5)
            .map(|i| make_entry("2026-04-20T10:00:00Z", "info", &format!("msg-{i}")))
            .collect();
        idx.insert_batch(&entries).unwrap();

        let stats = idx.stats().unwrap();
        assert_eq!(stats.entries, 5);
    }

    #[test]
    fn stats_timestamp_range_uses_lexical_min_and_max() {
        let mut idx = Indexer::open_in_memory().unwrap();
        // Insert intentionally out-of-order to confirm MIN/MAX, not insertion
        // order, drives the bounds.
        idx.insert_batch(&[
            make_entry("2026-04-22T15:30:00Z", "error", "second"),
            make_entry("2026-04-20T10:00:00Z", "info", "first"),
            make_entry("2026-04-21T12:00:00Z", "warn", "third"),
        ])
        .unwrap();

        let stats = idx.stats().unwrap();
        assert_eq!(stats.min_timestamp.as_deref(), Some("2026-04-20T10:00:00Z"));
        assert_eq!(stats.max_timestamp.as_deref(), Some("2026-04-22T15:30:00Z"));
    }

    #[test]
    fn stats_distinct_tags_place_untagged_first_then_alphabetical() {
        let mut idx = Indexer::open_in_memory().unwrap();

        // One untagged row.
        let untagged = make_entry("2026-04-20T10:00:00Z", "info", "untagged-msg");

        // Two distinct rows sharing tag "api" — must collapse via DISTINCT.
        let mut api1 = make_entry("2026-04-20T10:00:01Z", "info", "api-msg-1");
        api1.tag = Some("api".to_string());
        let mut api2 = make_entry("2026-04-20T10:00:02Z", "info", "api-msg-2");
        api2.tag = Some("api".to_string());

        // One row with tag "payments".
        let mut payments = make_entry("2026-04-20T10:00:03Z", "info", "payments-msg");
        payments.tag = Some("payments".to_string());

        idx.insert_batch(&[untagged, api1, api2, payments]).unwrap();

        let stats = idx.stats().unwrap();
        assert_eq!(stats.tags.len(), 3);
        // NULL comes first in SQLite's ascending sort.
        assert_eq!(stats.tags[0], None);
        assert_eq!(stats.tags[1], Some("api".to_string()));
        assert_eq!(stats.tags[2], Some("payments".to_string()));
    }

    #[test]
    fn stats_entries_count_respects_dedup() {
        let mut idx = Indexer::open_in_memory().unwrap();
        // Two batches of the same entry — second is deduplicated away.
        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "dup")])
            .unwrap();
        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "dup")])
            .unwrap();

        let stats = idx.stats().unwrap();
        assert_eq!(stats.entries, 1);
    }

    #[test]
    fn stats_entries_count_excludes_timestamp_less_entries() {
        let mut idx = Indexer::open_in_memory().unwrap();

        let mut no_ts = LogEntry::new(r#"{"level":"info"}"#);
        no_ts.level = Some("info".to_string());

        idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "present"), no_ts])
            .unwrap();

        let stats = idx.stats().unwrap();
        assert_eq!(stats.entries, 1);
    }

    // -----------------------------------------------------------------
    // open_read_only()
    // -----------------------------------------------------------------

    #[test]
    fn open_read_only_errors_when_file_is_missing() {
        let dir = tempfile::tempdir().unwrap();
        let missing = dir.path().join("does-not-exist.db");
        let err = Indexer::open_read_only(&missing).unwrap_err();
        // SQLite returns "unable to open database file" for missing paths in
        // read-only mode; surfaced through `LogdiveError::Sqlite`.
        assert!(matches!(err, LogdiveError::Sqlite(_)));
    }

    #[test]
    fn open_read_only_can_read_existing_rows() {
        let dir = tempfile::tempdir().unwrap();
        let db = dir.path().join("ro.db");

        // Populate via the writable opener.
        {
            let mut idx = Indexer::open(&db).unwrap();
            idx.insert_batch(&[make_entry("2026-04-20T10:00:00Z", "info", "visible")])
                .unwrap();
        }

        // Re-open read-only and read back.
        let ro = Indexer::open_read_only(&db).unwrap();
        let count: i64 = ro
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 1);

        let stats = ro.stats().unwrap();
        assert_eq!(stats.entries, 1);
    }

    #[test]
    fn open_read_only_rejects_writes_at_sqlite_level() {
        let dir = tempfile::tempdir().unwrap();
        let db = dir.path().join("ro-reject.db");

        // Create and close.
        {
            let _ = Indexer::open(&db).unwrap();
        }

        // Re-open RO and attempt a write via raw SQL — SQLite should block it.
        let ro = Indexer::open_read_only(&db).unwrap();
        let result = ro.connection().execute(
            "INSERT INTO log_entries (timestamp, raw, raw_hash) VALUES ('x', 'y', 'z')",
            [],
        );
        assert!(result.is_err(), "read-only connection must reject writes");
    }

    #[test]
    fn open_read_only_does_not_run_schema_migrations() {
        // If `open_read_only` tried to CREATE IF NOT EXISTS anything, it
        // would error against a read-only connection. Opening an empty DB
        // that has NOT been initialized demonstrates that open_read_only
        // attempts no writes of any kind.
        let dir = tempfile::tempdir().unwrap();
        let db = dir.path().join("bare.db");

        // Create a totally empty SQLite file (no schema).
        {
            let c = Connection::open(&db).unwrap();
            // Ensure the file exists without creating the log_entries table.
            c.execute_batch("PRAGMA user_version = 0;").unwrap();
        }

        // open_read_only must succeed (no migration attempt).
        let ro = Indexer::open_read_only(&db).expect("open ro on bare db");

        // Table is absent, so a SELECT errors — proving we didn't create it.
        let err = ro
            .connection()
            .query_row("SELECT COUNT(*) FROM log_entries", [], |row| {
                row.get::<_, i64>(0)
            });
        assert!(err.is_err());
    }
}
752}