trusty-search 0.3.65

Machine-wide hybrid code search service: BM25 + vector + KG, zero cold-start, MCP server
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
//! redb-backed durable chunk corpus (issue #28).
//!
//! Why: prior to this module the chunk corpus was persisted as a single
//! `chunks.json` file rewritten in full after every committed batch. On a
//! 200k-chunk corpus that JSON blob is ~400 MB; serializing it on every batch
//! commit (a reindex emits one commit per 128 files) caused the
//! memory-explosion documented in `PersistState` and forced a full re-read of
//! the entire file into a `HashMap` on every daemon restart. redb gives us:
//!   * crash-safe, atomic per-batch commits (no half-written file window),
//!   * O(batch) incremental writes instead of O(corpus) full rewrites,
//!   * the option to stream chunks back at startup without holding two copies
//!     (the JSON `Vec<RawChunk>` plus the live `HashMap`) in RAM at once.
//!
//! What: [`CorpusStore`] wraps a `redb::Database` with two tables — one keyed
//! by `chunk_id` holding the serialized [`RawChunk`], one keyed by file path
//! holding the serialized per-file [`RawEntity`] list. Values are serialized
//! with `serde_json` (already a workspace dependency; no new crate, and the
//! human-readable form keeps `redb` dumps debuggable).
//!
//! Test: see the `tests` submodule — `roundtrip` writes chunks + entities and
//! reads them back into a fresh store; `missing_db_is_empty` covers the
//! first-run / post-upgrade fallback; `delete_removes_chunk` covers eviction.

use std::path::Path;

use anyhow::{Context, Result};
use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};

use crate::core::chunker::RawChunk;
use crate::core::entity::RawEntity;

/// Ceiling, in bytes, on the application-level page cache redb may use for a
/// corpus database.
///
/// Why (issue #29): redb's default application cache is 1 GiB, while the
/// on-disk `index.redb` of a large monorepo reaches ~14 GB. With the default,
/// well under 10% of the file fits in cache, so any search query that
/// point-reads chunk text outside that window pays a disk read. The `start`
/// command of trusty-search already hard-requires ≥16 GB RAM and the
/// reference deployment host has 128 GB, so 16 GiB keeps the hot working set
/// resident without risking memory pressure. redb treats the figure as a
/// ceiling rather than a reservation: pages are cached only as they are
/// touched, so a small corpus never pays the full 16 GiB.
/// What: 16 GiB in bytes (`16 << 30`), handed to
/// `Database::builder().set_cache_size`. The value does not fit in a 32-bit
/// `usize`, so this constant only compiles on 64-bit targets.
/// Test: pure cache tuning with no effect on correctness; the round-trip
/// tests in the `tests` submodule run with this setting applied.
const REDB_CACHE_SIZE_BYTES: usize = 16 << 30;

/// redb table that stores the serialized chunk corpus, one row per chunk,
/// keyed by `chunk_id`.
///
/// Why: `chunk_id` (`"{path}:{start}:{end}"`) is the corpus's natural primary
/// key. It is collision-safe and is exactly the key of the in-memory
/// `HashMap`, so each redb row corresponds 1:1 to one `HashMap` entry.
/// What: `&str → &[u8]`; the value bytes are a `serde_json`-encoded
/// [`RawChunk`]. The on-disk table name is `"chunks"`.
const CHUNKS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("chunks");

/// redb table that stores the per-file entity lists, one row per file, keyed
/// by file path.
///
/// Why: `entities` are needed to rebuild the symbol graph on warm-boot and are
/// derived per file, which makes the file path the natural key.
/// What: `&str → &[u8]`; the value bytes are a `serde_json`-encoded
/// `Vec<RawEntity>`. The on-disk table name is `"entities"`.
const ENTITIES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("entities");

/// Durable, redb-backed store for one index's chunk corpus and entity lists.
///
/// Why: see the module docs — this replaces the full-rewrite `chunks.json`
/// snapshot with an embedded transactional KV store, so per-batch commits are
/// O(batch) and crash-safe.
/// What: owns a `redb::Database` and remembers the path it was opened at.
/// Exposes batched upserts (per table, or both tables in one transaction),
/// full enumeration, batch point-reads by chunk id, per-id / per-file
/// deletion, and a chunk count. Every mutating call runs in its own redb
/// write transaction, so a crash between calls never leaves a torn corpus.
/// Test: covered by the `tests` submodule.
pub struct CorpusStore {
    /// Handle to the open redb database backing both tables.
    db: Database,
    /// Filesystem path the `db` was opened at. Retained so the atomic
    /// `--force` reindex swap (issue #28, Phase 4) knows which file to rename
    /// without the caller having to pass the path back in.
    path: std::path::PathBuf,
}

impl CorpusStore {
    /// Open the redb corpus database at `path`, creating the file (and its
    /// parent directories) when it does not exist yet.
    ///
    /// Why: the daemon resolves one `index.redb` per index under its data dir.
    /// Funnelling every open through this function keeps the
    /// create-if-missing behaviour, the cache sizing, and the table creation
    /// in a single place.
    /// What: ensures the parent directory exists, builds the database with a
    /// [`REDB_CACHE_SIZE_BYTES`] application cache (issue #29), then commits a
    /// write transaction whose only job is to `open_table` both tables. redb
    /// requires a table to have been created in a committed write txn before
    /// a read-only transaction can open it, so this makes a brand-new
    /// database immediately readable. `open_fresh` delegates here, so the
    /// live `index.redb` and the `--force` staging `index.redb.tmp` get the
    /// same cache size.
    /// Test: `roundtrip` and `missing_db_is_empty` both exercise `open`.
    pub fn open(path: &Path) -> Result<Self> {
        if let Some(dir) = path.parent() {
            std::fs::create_dir_all(dir)
                .with_context(|| format!("create parent of {}", path.display()))?;
        }

        let mut builder = Database::builder();
        builder.set_cache_size(REDB_CACHE_SIZE_BYTES);
        let db = builder
            .create(path)
            .with_context(|| format!("open redb corpus at {}", path.display()))?;

        // Create both tables up front. Each table handle is a temporary that
        // is dropped at the end of its statement, so nothing is still
        // borrowing the transaction when it commits.
        let init = db.begin_write().context("begin corpus init txn")?;
        init.open_table(CHUNKS_TABLE).context("init chunks table")?;
        init.open_table(ENTITIES_TABLE)
            .context("init entities table")?;
        init.commit().context("commit corpus init txn")?;

        Ok(Self {
            path: path.to_path_buf(),
            db,
        })
    }

    /// Open an empty redb corpus at `path`, deleting any file that is already
    /// there first.
    ///
    /// Why: the `--force` reindex (issue #28, Phase 4) stages the rebuilt
    /// corpus in `index.redb.tmp`. A stale `.tmp` left over from an aborted
    /// reindex must not leak its rows into the new staged corpus: the staged
    /// file has to contain *only* this reindex's output so the atomic rename
    /// afterwards yields the same corpus as a clean rebuild.
    /// What: removes the file at `path`, then delegates to [`Self::open`]. A
    /// `NotFound` error from the removal is ignored (there was nothing to
    /// clear); every other removal error is returned with context.
    /// Test: `open_fresh_truncates_stale_staging_file`.
    pub fn open_fresh(path: &Path) -> Result<Self> {
        if let Err(e) = std::fs::remove_file(path) {
            // A missing file is the expected first-run case; anything else
            // (permissions, path is a directory, ...) must reach the caller.
            if e.kind() != std::io::ErrorKind::NotFound {
                return Err(e)
                    .with_context(|| format!("clear stale staging corpus at {}", path.display()));
            }
        }
        Self::open(path)
    }

    /// Path of the database file this store was opened at.
    ///
    /// Why: the atomic `--force` reindex swap renames the staging file over
    /// the live `index.redb` and needs the staging path to do so; without
    /// this accessor the caller would have to carry the path next to every
    /// `Arc<CorpusStore>`.
    /// What: borrows the `PathBuf` recorded by `open`.
    /// Test: `path_accessor_returns_open_path`; also asserted in
    /// `open_fresh_truncates_stale_staging_file`.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }

    /// Upsert a batch of chunks in a single redb write transaction.
    ///
    /// Why: a batch commit (`commit_parsed_batch`) lands up to a few hundred
    /// chunks at once. One transaction per batch keeps write amplification
    /// proportional to the batch rather than the whole corpus, and makes the
    /// batch atomic — a crash mid-commit rolls the whole batch back.
    /// What: serializes each [`RawChunk`] with `serde_json` and inserts it
    /// under its `id`; an existing row with the same id is overwritten
    /// (upsert semantics). An empty slice returns `Ok(())` without opening a
    /// transaction. Every fallible step — beginning the transaction, opening
    /// the table, serializing, inserting, committing — returns an error
    /// carrying context that names the step.
    /// Test: `roundtrip` writes then reads; `empty_batches_are_noops` covers
    /// the empty-slice path.
    pub fn upsert_chunks(&self, chunks: &[RawChunk]) -> Result<()> {
        if chunks.is_empty() {
            return Ok(());
        }
        let txn = self.db.begin_write().context("begin chunk upsert txn")?;
        {
            // The table handle must be dropped before `commit`, hence the
            // inner scope.
            let mut table = txn
                .open_table(CHUNKS_TABLE)
                .context("open chunks table for chunk upsert")?;
            for chunk in chunks {
                let bytes = serde_json::to_vec(chunk)
                    .with_context(|| format!("serialize chunk {}", chunk.id))?;
                table
                    .insert(chunk.id.as_str(), bytes.as_slice())
                    .with_context(|| format!("insert chunk {}", chunk.id))?;
            }
        }
        txn.commit().context("commit chunk upsert txn")?;
        Ok(())
    }

    /// Upsert a batch of per-file entity lists in a single write transaction.
    ///
    /// Why: entity lists are committed alongside chunks; using the same
    /// one-txn-per-batch discipline means a crash rolls back the whole
    /// entity batch instead of leaving it half-written.
    /// What: serializes each `Vec<RawEntity>` with `serde_json` and inserts
    /// it under its file-path key, overwriting any existing row for that
    /// file. An empty slice returns `Ok(())` without opening a transaction.
    /// Every fallible step, including opening the table, returns an error
    /// carrying context that names the step.
    /// Test: `roundtrip` exercises this alongside `upsert_chunks`;
    /// `delete_entities_removes_file_row` writes two files through it.
    pub fn upsert_entities(&self, entities: &[(String, Vec<RawEntity>)]) -> Result<()> {
        if entities.is_empty() {
            return Ok(());
        }
        let txn = self.db.begin_write().context("begin entity upsert txn")?;
        {
            // The table handle must be dropped before `commit`, hence the
            // inner scope.
            let mut table = txn
                .open_table(ENTITIES_TABLE)
                .context("open entities table for entity upsert")?;
            for (file, ents) in entities {
                let bytes = serde_json::to_vec(ents)
                    .with_context(|| format!("serialize entities for {file}"))?;
                table
                    .insert(file.as_str(), bytes.as_slice())
                    .with_context(|| format!("insert entities for {file}"))?;
            }
        }
        txn.commit().context("commit entity upsert txn")?;
        Ok(())
    }

    /// Write a batch of chunks **together with** their per-file entity lists
    /// in one redb write transaction (issue #29).
    ///
    /// Why: `upsert_chunks` and `upsert_entities` each run their own
    /// `begin_write()` transaction. A crash (or SIGTERM) between the two
    /// commits leaves the chunk corpus and the symbol-graph entity table out
    /// of step — a warm-boot would rehydrate chunks the entity table does not
    /// describe, or the reverse. Putting both tables under one transaction
    /// makes the whole batch atomic: it is either committed in full or rolled
    /// back in full.
    /// What: returns `Ok(())` immediately, without opening a transaction,
    /// only when **both** inputs are empty. Otherwise it opens one write
    /// transaction, upserts every [`RawChunk`] into `CHUNKS_TABLE`, then every
    /// per-file `Vec<RawEntity>` into `ENTITIES_TABLE`, and commits once. Both
    /// tables are opened even when one side is empty, so callers always get a
    /// single commit point. The table handles live in an inner scope because
    /// redb requires every table opened in a write txn to be dropped before
    /// the txn can commit.
    /// Test: `batch_upsert_is_atomic_roundtrip` writes chunks + entities via
    /// this method and reads them back from a reopened store.
    pub fn upsert_batch(
        &self,
        chunks: &[RawChunk],
        entities: &[(String, Vec<RawEntity>)],
    ) -> Result<()> {
        let nothing_to_write = chunks.is_empty() && entities.is_empty();
        if nothing_to_write {
            return Ok(());
        }

        let write_txn = self.db.begin_write().context("begin batch upsert txn")?;
        {
            // Chunk rows first, keyed by chunk id.
            let mut chunk_rows = write_txn
                .open_table(CHUNKS_TABLE)
                .context("open chunks table for batch upsert")?;
            for item in chunks {
                let encoded = serde_json::to_vec(item)
                    .with_context(|| format!("serialize chunk {}", item.id))?;
                chunk_rows
                    .insert(item.id.as_str(), encoded.as_slice())
                    .with_context(|| format!("insert chunk {}", item.id))?;
            }

            // Then the entity rows, keyed by file path, under the same txn.
            let mut entity_rows = write_txn
                .open_table(ENTITIES_TABLE)
                .context("open entities table for batch upsert")?;
            for (file, list) in entities {
                let encoded = serde_json::to_vec(list)
                    .with_context(|| format!("serialize entities for {file}"))?;
                entity_rows
                    .insert(file.as_str(), encoded.as_slice())
                    .with_context(|| format!("insert entities for {file}"))?;
            }
        }
        write_txn.commit().context("commit batch upsert txn")
    }

    /// Delete a set of chunk ids in one write transaction.
    ///
    /// Why: `remove_file` / `remove_chunk` must evict from the durable store
    /// too, or a restart would resurrect deleted chunks.
    /// What: removes each id from `CHUNKS_TABLE`. An id with no row is not an
    /// error (idempotent delete), matching the in-memory `HashMap::remove`.
    /// An empty slice returns `Ok(())` without opening a transaction. Every
    /// fallible step, including opening the table, returns an error carrying
    /// context that names the step.
    /// Test: `delete_removes_chunk`; `empty_batches_are_noops` covers the
    /// empty-slice path.
    pub fn delete_chunks(&self, ids: &[String]) -> Result<()> {
        if ids.is_empty() {
            return Ok(());
        }
        let txn = self.db.begin_write().context("begin chunk delete txn")?;
        {
            // The table handle must be dropped before `commit`, hence the
            // inner scope.
            let mut table = txn
                .open_table(CHUNKS_TABLE)
                .context("open chunks table for chunk delete")?;
            for id in ids {
                table
                    .remove(id.as_str())
                    .with_context(|| format!("delete chunk {id}"))?;
            }
        }
        txn.commit().context("commit chunk delete txn")?;
        Ok(())
    }

    /// Delete one file's entity list. Idempotent.
    ///
    /// Why: `remove_file` drops the file's entities; the durable store must
    /// follow or the symbol graph would rebuild stale symbols on restart.
    /// What: removes the `file` key from `ENTITIES_TABLE` in its own write
    /// transaction. A file with no row is not an error. Every fallible step,
    /// including opening the table, returns an error carrying context that
    /// names the step.
    /// Test: `delete_entities_removes_file_row`.
    pub fn delete_entities(&self, file: &str) -> Result<()> {
        let txn = self.db.begin_write().context("begin entity delete txn")?;
        {
            // The table handle must be dropped before `commit`, hence the
            // inner scope.
            let mut table = txn
                .open_table(ENTITIES_TABLE)
                .context("open entities table for entity delete")?;
            table
                .remove(file)
                .with_context(|| format!("delete entities for {file}"))?;
        }
        txn.commit().context("commit entity delete txn")?;
        Ok(())
    }

    /// Load every chunk in the corpus into a `Vec`.
    ///
    /// Why: the warm-boot path rehydrates the in-memory `HashMap` (and rebuilds
    /// BM25 + the symbol graph) from this. A streaming iterator would avoid the
    /// transient `Vec`, but the caller needs an owned `RawChunk` per entry to
    /// insert into the map anyway.
    /// What: opens a read transaction, walks `CHUNKS_TABLE`, and deserializes
    /// each value with `serde_json`. A row that fails to deserialize is
    /// skipped with a `warn` rather than failing the whole load — one bad
    /// chunk must not brick the daemon. Failing to begin the transaction,
    /// open the table, start the iteration, or read a row is returned as an
    /// error carrying context that names the step.
    /// Test: `roundtrip`; `missing_db_is_empty` covers the empty table.
    pub fn load_all_chunks(&self) -> Result<Vec<RawChunk>> {
        let txn = self.db.begin_read().context("begin chunk read txn")?;
        let table = txn
            .open_table(CHUNKS_TABLE)
            .context("open chunks table for full load")?;
        let mut out = Vec::new();
        for entry in table.iter().context("iterate chunks table")? {
            let (key, value) = entry.context("read chunk row")?;
            match serde_json::from_slice::<RawChunk>(value.value()) {
                Ok(chunk) => out.push(chunk),
                Err(e) => {
                    tracing::warn!("corpus: skipping corrupt chunk row '{}' ({e})", key.value())
                }
            }
        }
        Ok(out)
    }

    /// Batch point-read a set of chunks by `chunk_id`.
    ///
    /// Why: issue #28 deferred item — the search hot path used to materialize
    /// top-k results by joining fused `(id, score)` pairs against an in-memory
    /// `HashMap<String, RawChunk>` that kept *every* chunk's text resident in
    /// the heap permanently (~45 GB RSS on a large monorepo). Reading only the
    /// top-k chunk text out of redb at materialization time lets the daemon
    /// drop that HashMap from the query path. A typical `top_k=20` query does
    /// 20 point reads inside one read transaction, which is expected to fit
    /// the sub-10 ms query budget.
    /// NOTE(review): earlier wording described redb values as mmap-backed and
    /// served from the OS page cache; this store is opened with an explicit
    /// application-level cache ([`REDB_CACHE_SIZE_BYTES`]), so confirm which
    /// cache actually serves these reads before quoting RSS figures.
    /// What: opens a single redb read transaction and looks up each requested
    /// id. An id with no row is skipped with a `warn`, not an error — a fused
    /// id without a redb row is almost always a benign race against a
    /// concurrent removal, and one missing chunk must not fail the whole
    /// query. A row that fails to deserialize is likewise skipped with a
    /// `warn`. The returned `Vec` keeps the order of the input `ids` for the
    /// ids that were found. Empty input returns an empty `Vec` without opening
    /// a transaction. Failing to begin the transaction, open the table, or
    /// perform a lookup is returned as an error carrying context that names
    /// the step.
    /// Test: `get_chunks_batch_reads_subset` round-trips a corpus and asserts
    /// only the requested ids come back, in order, with missing ids skipped.
    pub fn get_chunks(&self, ids: &[&str]) -> Result<Vec<RawChunk>> {
        if ids.is_empty() {
            return Ok(Vec::new());
        }
        let txn = self.db.begin_read().context("begin chunk point-read txn")?;
        let table = txn
            .open_table(CHUNKS_TABLE)
            .context("open chunks table for point-read")?;
        let mut out = Vec::with_capacity(ids.len());
        for id in ids {
            let Some(value) = table
                .get(*id)
                .with_context(|| format!("point-read chunk {id}"))?
            else {
                tracing::warn!("corpus: chunk '{id}' not found in redb — skipping");
                continue;
            };
            match serde_json::from_slice::<RawChunk>(value.value()) {
                Ok(chunk) => out.push(chunk),
                Err(e) => {
                    tracing::warn!("corpus: skipping corrupt chunk row '{id}' ({e})")
                }
            }
        }
        Ok(out)
    }

    /// Load every per-file entity list.
    ///
    /// Why: counterpart of [`Self::load_all_chunks`] for the entities table;
    /// the warm-boot path needs both to rebuild the symbol graph.
    /// What: opens a read transaction, walks `ENTITIES_TABLE`, and
    /// deserializes each value into a `Vec<RawEntity>` paired with its
    /// file-path key. A row that fails to deserialize is skipped with a
    /// `warn`. Failing to begin the transaction, open the table, start the
    /// iteration, or read a row is returned as an error carrying context that
    /// names the step.
    /// Test: `roundtrip`; `missing_db_is_empty` covers the empty table.
    pub fn load_all_entities(&self) -> Result<Vec<(String, Vec<RawEntity>)>> {
        let txn = self.db.begin_read().context("begin entity read txn")?;
        let table = txn
            .open_table(ENTITIES_TABLE)
            .context("open entities table for full load")?;
        let mut out = Vec::new();
        for entry in table.iter().context("iterate entities table")? {
            let (key, value) = entry.context("read entity row")?;
            let file = key.value().to_string();
            match serde_json::from_slice::<Vec<RawEntity>>(value.value()) {
                Ok(ents) => out.push((file, ents)),
                Err(e) => {
                    tracing::warn!("corpus: skipping corrupt entity row '{file}' ({e})")
                }
            }
        }
        Ok(out)
    }

    /// Number of chunks currently stored.
    ///
    /// Why: lets the warm-boot path log a count and lets callers cheaply check
    /// "is the durable corpus empty?" (first-run / post-upgrade case) without
    /// materializing every row.
    /// What: opens a read transaction and returns the row count of
    /// `CHUNKS_TABLE`. redb reports the length as `u64`; it is converted with
    /// a checked `usize::try_from`, so a count that does not fit in `usize`
    /// (only possible where `usize` is narrower than 64 bits) is an error
    /// instead of a silently truncated number.
    /// Test: `roundtrip` asserts the count after upsert; `missing_db_is_empty`
    /// asserts zero on a new database.
    pub fn chunk_count(&self) -> Result<usize> {
        let txn = self.db.begin_read().context("begin count txn")?;
        let table = txn
            .open_table(CHUNKS_TABLE)
            .context("open chunks table for count")?;
        let rows = table.len().context("count chunks")?;
        usize::try_from(rows).context("chunk count does not fit in usize")
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::chunker::{ChunkType, RawChunk};

    /// Test fixture: a `RawChunk` with the given `id` and `content`.
    ///
    /// Every other field gets a fixed filler value (file `src/lib.rs`, a
    /// single-line span, language `rust`, `ChunkType::Code`, no relations),
    /// so tests only need to vary the two fields they assert on.
    fn raw(id: &str, content: &str) -> RawChunk {
        RawChunk {
            // The two fields the tests care about.
            id: String::from(id),
            content: String::from(content),
            // Fixed location / classification filler.
            file: String::from("src/lib.rs"),
            start_line: 1,
            end_line: 1,
            language: Some(String::from("rust")),
            chunk_type: ChunkType::Code,
            function_name: None,
            // No hierarchy.
            chunk_depth: 0,
            parent_chunk_id: None,
            child_chunk_ids: Vec::new(),
            // No call-graph, inheritance, or NLP metadata.
            calls: Vec::new(),
            inherits_from: Vec::new(),
            nlp_keywords: Vec::new(),
            nlp_code_refs: Vec::new(),
            virtual_terms: Vec::new(),
        }
    }

    /// Chunks and entities written through the per-table upserts survive a
    /// close + reopen of the database.
    #[test]
    fn roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("index.redb");

        // First session: write two chunks and one (empty) entity list.
        {
            let writer = CorpusStore::open(&db_path).unwrap();
            writer
                .upsert_chunks(&[raw("a:1:1", "fn a() {}"), raw("b:1:1", "fn b() {}")])
                .unwrap();
            writer
                .upsert_entities(&[("src/lib.rs".to_string(), Vec::new())])
                .unwrap();
            assert_eq!(writer.chunk_count().unwrap(), 2);
            // `writer` is dropped here, closing the database like a daemon
            // shutdown would.
        }

        // Second session: everything must still be there.
        let reader = CorpusStore::open(&db_path).unwrap();
        let mut chunks = reader.load_all_chunks().unwrap();
        chunks.sort_by(|lhs, rhs| lhs.id.cmp(&rhs.id));
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].id, "a:1:1");
        assert_eq!(chunks[0].content, "fn a() {}");

        let per_file = reader.load_all_entities().unwrap();
        assert_eq!(per_file.len(), 1);
        assert_eq!(per_file[0].0, "src/lib.rs");
    }

    /// Issue #29: `upsert_batch` writes chunks + entities in one redb
    /// transaction. A reopened store must see both tables populated, the same
    /// outcome `roundtrip` asserts for the separate `upsert_chunks` /
    /// `upsert_entities` calls. One-sided and fully-empty batches are covered
    /// as well.
    #[test]
    fn batch_upsert_is_atomic_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("index.redb");

        // First session: one combined batch, then close the database.
        let first = CorpusStore::open(&db_path).unwrap();
        let batch_chunks = [raw("a:1:1", "fn a() {}"), raw("b:1:1", "fn b() {}")];
        let batch_entities = [("src/lib.rs".to_string(), Vec::new())];
        first.upsert_batch(&batch_chunks, &batch_entities).unwrap();
        assert_eq!(first.chunk_count().unwrap(), 2);
        drop(first);

        // Reopen to simulate a daemon restart — both tables must be intact.
        let store = CorpusStore::open(&db_path).unwrap();
        let mut chunks = store.load_all_chunks().unwrap();
        chunks.sort_by(|lhs, rhs| lhs.id.cmp(&rhs.id));
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].id, "a:1:1");
        let per_file = store.load_all_entities().unwrap();
        assert_eq!(per_file.len(), 1);
        assert_eq!(per_file[0].0, "src/lib.rs");

        // Chunks only: the chunks table still grows.
        let only_chunks = [raw("c:1:1", "fn c() {}")];
        store.upsert_batch(&only_chunks, &[]).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 3);

        // Entities only: the entities table still grows.
        let only_entities = [("src/other.rs".to_string(), Vec::new())];
        store.upsert_batch(&[], &only_entities).unwrap();
        assert_eq!(store.load_all_entities().unwrap().len(), 2);

        // Nothing on either side: succeeds and changes nothing.
        store.upsert_batch(&[], &[]).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 3);
    }

    /// Issue #28 deferred item: the query hot path materializes top-k results
    /// via `get_chunks`. It must return only the requested ids, in input
    /// order, and skip ids absent from the corpus without erroring.
    #[test]
    fn get_chunks_batch_reads_subset() {
        let tmp = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&tmp.path().join("index.redb")).unwrap();
        let corpus = [
            raw("a:1:1", "fn a() {}"),
            raw("b:1:1", "fn b() {}"),
            raw("c:1:1", "fn c() {}"),
        ];
        store.upsert_chunks(&corpus).unwrap();

        // Ask for a subset, out of corpus order, with an unknown id in the
        // middle.
        let wanted = ["c:1:1", "missing:0:0", "a:1:1"];
        let got = store.get_chunks(&wanted).unwrap();
        assert_eq!(got.len(), 2, "unknown id must be skipped, not error");
        assert_eq!(got[0].id, "c:1:1", "input order must be preserved");
        assert_eq!(got[0].content, "fn c() {}");
        assert_eq!(got[1].id, "a:1:1");

        // No ids requested: empty result.
        let none_requested = store.get_chunks(&[]).unwrap();
        assert!(none_requested.is_empty());

        // Only unknown ids requested: empty result, never an error.
        let all_unknown = store.get_chunks(&["nope:0:0"]).unwrap();
        assert!(all_unknown.is_empty());
    }

    /// A brand-new database (first run / post-upgrade) must open cleanly and
    /// report an empty corpus through every read API instead of erroring.
    #[test]
    fn missing_db_is_empty() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("fresh.redb");
        let store = CorpusStore::open(&db_path).unwrap();

        assert_eq!(store.chunk_count().unwrap(), 0);
        let chunks = store.load_all_chunks().unwrap();
        assert!(chunks.is_empty());
        let per_file = store.load_all_entities().unwrap();
        assert!(per_file.is_empty());
    }

    /// `delete_chunks` evicts exactly the requested row and treats an unknown
    /// id as a successful no-op.
    #[test]
    fn delete_removes_chunk() {
        let tmp = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&tmp.path().join("index.redb")).unwrap();
        let seeded = [raw("a:1:1", "x"), raw("b:1:1", "y")];
        store.upsert_chunks(&seeded).unwrap();

        // Remove one of the two chunks; only the other must remain.
        let doomed = ["a:1:1".to_string()];
        store.delete_chunks(&doomed).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 1);
        let survivors = store.load_all_chunks().unwrap();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].id, "b:1:1");

        // An id that was never stored: succeeds and changes nothing.
        let unknown = ["nope:0:0".to_string()];
        store.delete_chunks(&unknown).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 1);
    }

    /// Empty inputs to the batch mutators succeed and leave the corpus empty.
    #[test]
    fn empty_batches_are_noops() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("index.redb");
        let store = CorpusStore::open(&db_path).unwrap();

        store.upsert_chunks(&[]).unwrap();
        store.upsert_entities(&[]).unwrap();
        store.delete_chunks(&[]).unwrap();

        let stored = store.chunk_count().unwrap();
        assert_eq!(stored, 0);
    }

    /// `delete_entities` drops exactly the requested file's row and treats an
    /// unknown file as a successful no-op.
    #[test]
    fn delete_entities_removes_file_row() {
        let tmp = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&tmp.path().join("index.redb")).unwrap();
        let seeded = [
            ("src/a.rs".to_string(), Vec::new()),
            ("src/b.rs".to_string(), Vec::new()),
        ];
        store.upsert_entities(&seeded).unwrap();
        assert_eq!(store.load_all_entities().unwrap().len(), 2);

        // Remove one file; only the other row must remain.
        store.delete_entities("src/a.rs").unwrap();
        let left = store.load_all_entities().unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].0, "src/b.rs");

        // A file that was never stored: succeeds and changes nothing.
        store.delete_entities("src/never.rs").unwrap();
        assert_eq!(store.load_all_entities().unwrap().len(), 1);
    }

    /// Issue #28 Phase 4: the atomic-swap path reads `path()` to learn which
    /// file to rename, so the accessor must echo back exactly the path that
    /// `open` was given.
    #[test]
    fn path_accessor_returns_open_path() {
        let tmp = tempfile::tempdir().unwrap();
        let opened_at = tmp.path().join("index.redb");
        let store = CorpusStore::open(&opened_at).unwrap();
        assert_eq!(store.path(), opened_at.as_path());
    }

    /// Issue #28 Phase 4: a stale `index.redb.tmp` left by an aborted reindex
    /// must not contribute pre-existing rows to the next staged corpus —
    /// `open_fresh` discards the old file first.
    #[test]
    fn open_fresh_truncates_stale_staging_file() {
        let tmp = tempfile::tempdir().unwrap();
        let staging = tmp.path().join("index.redb.tmp");

        // Leave a populated staging file behind; dropping the store closes it
        // so the file is persisted on disk.
        let stale = CorpusStore::open(&staging).unwrap();
        stale.upsert_chunks(&[raw("stale:1:1", "old")]).unwrap();
        assert_eq!(stale.chunk_count().unwrap(), 1);
        drop(stale);
        assert!(staging.exists());

        // Despite the existing file, `open_fresh` must hand back an empty
        // corpus at the same path.
        let rebuilt = CorpusStore::open_fresh(&staging).unwrap();
        assert_eq!(rebuilt.chunk_count().unwrap(), 0);
        assert_eq!(rebuilt.path(), staging.as_path());

        // A path with no file at all is equally fine.
        let never_existed = tmp.path().join("never.redb.tmp");
        let brand_new = CorpusStore::open_fresh(&never_existed).unwrap();
        assert_eq!(brand_new.chunk_count().unwrap(), 0);
    }
}