trusty-search 0.22.0

Machine-wide hybrid code search service: BM25 + vector + KG, zero cold-start, MCP server
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
//! redb-backed durable chunk corpus (issue #28).
//!
//! Why: prior to this module the chunk corpus was persisted as a single
//! `chunks.json` file rewritten in full after every committed batch. On a
//! 200k-chunk corpus that JSON blob is ~400 MB; serializing it on every batch
//! commit (a reindex emits one commit per 128 files) caused the
//! memory-explosion documented in `PersistState` and forced a full re-read of
//! the entire file into a `HashMap` on every daemon restart. redb gives us:
//!   * crash-safe, atomic per-batch commits (no half-written file window),
//!   * O(batch) incremental writes instead of O(corpus) full rewrites,
//!   * the option to stream chunks back at startup without holding two copies
//!     (the JSON `Vec<RawChunk>` plus the live `HashMap`) in RAM at once.
//!
//! What: [`CorpusStore`] wraps a `redb::Database` with two tables — one keyed
//! by `chunk_id` holding the serialized [`RawChunk`], one keyed by file path
//! holding the serialized per-file [`RawEntity`] list. Values are serialized
//! with `serde_json` (already a workspace dependency; no new crate, and the
//! human-readable form keeps `redb` dumps debuggable).
//!
//! Test: see the `tests` submodule — `roundtrip` writes chunks + entities and
//! reads them back into a fresh store; `missing_db_is_empty` covers the
//! first-run / post-upgrade fallback; `delete_removes_chunk` covers eviction.

use std::path::Path;

use anyhow::{Context, Result};
use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};

use crate::core::chunker::RawChunk;
use crate::core::entity::RawEntity;

/// Default application-level page cache size for the redb corpus database, in
/// megabytes (64 MB).
///
/// Why (B.2 quick-win, issue #329): redb treats `set_cache_size` as a *ceiling*
/// that fills lazily as pages are touched. Empirical profiling of the
/// trusty-tools corpus (23,513 chunks) showed the actual redb working set is
/// ~87 MB: a clean 512 MB cap run peaked at 557 MB RSS while an 8 MB cap run
/// peaked at 470 MB — a difference of exactly 87 MB. The 512 MB ceiling was
/// massively over-provisioned; 64 MB captures the full working set with ~27 MB
/// of headroom for B-tree internal nodes and future corpus growth without the
/// 33% indexing speed penalty observed at 8 MB (where I/O pressure becomes the
/// bottleneck). Lowering from 512 → 64 MB saves ~87 MB of peak RSS during a
/// force reindex. The previous value of 512 MB was the "smaller-than-16-GiB"
/// step from the original hardcoded 16 GiB; this is the next measured step.
/// The trade-off is explicit: a *larger* cache means fewer disk reads for warm
/// queries against a big corpus; a *smaller* cache means lower idle RSS at the
/// cost of more page faults on cold reads. Operators on large-corpus hosts can
/// raise it via `TRUSTY_REDB_CACHE_MB`.
/// What: 64, multiplied by 1 MiB in [`redb_cache_size_bytes`].
/// Test: `redb_cache_size_default_and_env_override` covers default + override.
// NOTE(review): the figures above do not reconcile as written. A 64 MB cap is
// smaller than the stated ~87 MB working set, so it cannot "capture the full
// working set with ~27 MB of headroom", and the ~87 MB saving quoted for
// 512 → 64 MB is the delta that was measured between the 512 MB and 8 MB runs.
// Confirm the measurements against issue #329 and restate the rationale.
const DEFAULT_REDB_CACHE_MB: usize = 64;

/// Resolve the redb application page-cache size (in bytes) from the
/// environment, falling back to [`DEFAULT_REDB_CACHE_MB`].
///
/// Why: the cache size is the single biggest lever on the daemon's idle RSS
/// (see [`DEFAULT_REDB_CACHE_MB`]). Making it configurable lets operators tune
/// the warm-query-latency vs. idle-memory trade-off per host without a
/// recompile — large-corpus hosts raise it, memory-constrained dev machines
/// keep the small default.
/// What: reads `TRUSTY_REDB_CACHE_MB`, trims surrounding whitespace, parses it
/// as `usize` megabytes, and returns the value in bytes. Falls back to
/// [`DEFAULT_REDB_CACHE_MB`] when the var is unset, empty, unparseable, or
/// zero, logging a `warn` on a non-empty unparseable value so typos surface.
/// The MB → byte conversion saturates at `usize::MAX` instead of overflowing,
/// so an absurdly large setting can neither panic (debug builds) nor wrap
/// around to an arbitrary small cache (release builds).
/// Test: `redb_cache_size_default_and_env_override`.
fn redb_cache_size_bytes() -> usize {
    const BYTES_PER_MB: usize = 1024 * 1024;

    let mb = match std::env::var("TRUSTY_REDB_CACHE_MB") {
        // Values coming from shell scripts / unit files often carry stray
        // whitespace; `usize::from_str` rejects it, so trim before parsing.
        Ok(v) if !v.is_empty() => match v.trim().parse::<usize>() {
            // Zero would disable the cache entirely; treat it as "unset".
            Ok(0) => DEFAULT_REDB_CACHE_MB,
            Ok(n) => n,
            Err(_) => {
                tracing::warn!(
                    "corpus: TRUSTY_REDB_CACHE_MB={v:?} is not a valid usize; \
                     using default ({DEFAULT_REDB_CACHE_MB} MB)"
                );
                DEFAULT_REDB_CACHE_MB
            }
        },
        // Unset, empty, or not valid Unicode: silently use the default.
        _ => DEFAULT_REDB_CACHE_MB,
    };
    // The cache size is only a ceiling, so clamping is the safe response to an
    // out-of-range request.
    mb.saturating_mul(BYTES_PER_MB)
}

/// redb table holding the serialized chunk corpus, keyed by `chunk_id`.
///
/// Why: `chunk_id` (`"{path}:{start}:{end}"`) is the corpus's natural primary
/// key — it is collision-safe and is exactly what the in-memory `HashMap` is
/// keyed by, so a redb row maps 1:1 onto a `HashMap` entry.
/// What: `&str → &[u8]` where the value is `serde_json`-encoded [`RawChunk`].
/// Written by `CorpusStore::upsert_chunks` / `upsert_batch`, pruned by
/// `delete_chunks`, and read by `load_all_chunks`, `get_chunks`, and
/// `chunk_count`.
const CHUNKS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("chunks");

/// redb table holding the per-file entity lists, keyed by file path.
///
/// Why: `entities` are needed to rebuild the symbol graph on warm-boot and are
/// derived per file, so the file path is the natural key.
/// What: `&str → &[u8]` where the value is `serde_json`-encoded
/// `Vec<RawEntity>`. Written by `CorpusStore::upsert_entities` /
/// `upsert_batch`, pruned by `delete_entities`, and read by
/// `load_all_entities`.
const ENTITIES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("entities");

/// redb table holding the persisted `SymbolGraph` nodes (issue #41 phase 2).
///
/// Why: cold-start graph rebuild from the chunk corpus is O(N chunks) and
/// loses Phase B/C edges. Persisting the graph adjacency lists alongside the
/// chunk corpus lets warm-boot rehydrate the KG in O(nodes + edges) without
/// re-running `build_from_chunks`.
/// What: `symbol → &[u8]` where the value is `serde_json`-encoded
/// [`PersistedKgNode`] (carries `chunk_id` + `file` for round-trip equality).
/// Materialized by `CorpusStore::open`; cleared and rewritten by
/// `CorpusStore::save_kg_graph`.
pub(crate) const KG_NODES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("kg_nodes");

/// redb table holding the forward (source → targets) KG adjacency list.
///
/// Why: BFS expansion walks outgoing edges by symbol; storing the full edge
/// list under the source key gives O(1) load of all outgoing edges per node.
/// What: `source_symbol → &[u8]` where the value is `serde_json`-encoded
/// `Vec<(EdgeKind, target_symbol)>`. One row per source symbol; empty
/// adjacency lists are omitted. Materialized by `CorpusStore::open`.
// NOTE(review): `CorpusStore::save_kg_graph` receives the adjacency lists as
// `Vec<(String, String)>`, so the `EdgeKind` half is presumably carried in
// string form — confirm the on-disk encoding against that function's body.
pub(crate) const KG_EDGES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("kg_edges");

/// redb table holding the reverse (target → sources) KG adjacency list.
///
/// Why: `callers_of` expansions walk *incoming* edges by symbol; a separate
/// reverse adjacency keeps that lookup O(1) instead of forcing a full
/// forward-edge scan.
/// What: `target_symbol → &[u8]` where the value is `serde_json`-encoded
/// `Vec<(EdgeKind, source_symbol)>`. Materialized by `CorpusStore::open`.
pub(crate) const KG_EDGES_REV_TABLE: TableDefinition<&str, &[u8]> =
    TableDefinition::new("kg_edges_rev");

/// redb table holding persisted Louvain community records (migration tolerance).
///
/// Why: kept for backward-compat with on-disk indexes created before v0.10.0
/// (issues #41 / #152). The Louvain community detection and `community_cohesion`
/// ranking were removed in v0.10.0 (PROVENANCE-ONLY decision, issue #145).
/// This table definition is retained so the redb schema initialisation does not
/// fail when opening old databases that already have the table.
/// What: `community_id (u64) → &[u8]` (was serde_json-encoded CommunityRecord).
/// The table is no longer written or read by the active search path, but
/// `CorpusStore::open` still materializes it (empty) on every database.
pub(crate) const KG_COMMUNITIES_TABLE: TableDefinition<u64, &[u8]> =
    TableDefinition::new("kg_communities");

/// redb table mapping symbol → community id (migration tolerance).
///
/// Why: same as `KG_COMMUNITIES_TABLE` — retained to avoid schema errors on
/// old indexes. Not written or read by the active search path as of v0.10.0.
/// What: `symbol (str) → community_id (u64)`. Still materialized (empty) by
/// `CorpusStore::open`.
pub(crate) const KG_SYMBOL_COMMUNITY_TABLE: TableDefinition<&str, u64> =
    TableDefinition::new("kg_symbol_community");

/// Durable, redb-backed store for an index's chunk corpus + entity lists.
///
/// Why: see module docs — replaces the full-rewrite `chunks.json` snapshot
/// with an embedded transactional KV store so per-batch commits are O(batch)
/// and crash-safe.
/// What: owns a `redb::Database`; exposes batched upsert, full enumeration,
/// per-id/per-file deletion, and a count. Every mutating call is its own redb
/// write transaction, so a crash between calls never leaves a torn corpus.
/// Test: covered by the `tests` submodule.
pub struct CorpusStore {
    /// Open redb handle. Every read and write transaction issued by this
    /// type's methods is started from it.
    db: Database,
    /// Filesystem path the `db` was opened at. Retained so the atomic
    /// `--force` reindex swap (issue #28, Phase 4) knows which file to rename
    /// without the caller having to pass the path back in.
    path: std::path::PathBuf,
}

impl CorpusStore {
    /// Open (creating if absent) the redb database at `path`.
    ///
    /// Why: the daemon resolves one `index.redb` per index under its data dir;
    /// opening here is the single entry point so table-creation and the
    /// create-if-missing semantics live in one place.
    /// What: creates the parent directory if it is missing, then opens the
    /// database via `Database::builder()` with an application page cache sized
    /// by [`redb_cache_size_bytes`] (default [`DEFAULT_REDB_CACHE_MB`] MB,
    /// overridable via `TRUSTY_REDB_CACHE_MB`), then runs a write transaction
    /// that `open_table`s every table this module knows about — chunks,
    /// entities, the three KG tables, the two legacy community tables, and the
    /// migration `_meta` table — so they exist before any reader runs (redb
    /// requires a table to have been created in a committed write txn before
    /// it can be opened read-only).
    /// This single builder call is the only place a corpus `redb::Database` is
    /// opened, so the cache size applies to the live `index.redb` and the
    /// `--force` staging `index.redb.tmp` alike (`open_fresh` delegates here).
    /// The effective cache size is logged at `info` so operators can confirm
    /// the resolved value at daemon startup.
    /// Errors: returns an error (with context naming the failing step) if the
    /// parent directory cannot be created, the database cannot be opened, or
    /// the table-initialisation transaction fails to begin, open a table, or
    /// commit.
    /// Test: `roundtrip` and `missing_db_is_empty` both exercise `open`.
    pub fn open(path: &Path) -> Result<Self> {
        // A bare file name has an empty parent; `create_dir_all` is only
        // attempted when `parent()` yields something.
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .with_context(|| format!("create parent of {}", path.display()))?;
        }
        let cache_bytes = redb_cache_size_bytes();
        tracing::info!(
            "corpus: opening {} with redb page cache = {} MB \
             (set TRUSTY_REDB_CACHE_MB to override)",
            path.display(),
            cache_bytes / (1024 * 1024),
        );
        let db = Database::builder()
            .set_cache_size(cache_bytes)
            .create(path)
            .with_context(|| format!("open redb corpus at {}", path.display()))?;
        // Materialize every table in a committed write txn so later read-only
        // transactions can `open_table` them even on a brand-new database.
        {
            let txn = db.begin_write().context("begin corpus init txn")?;
            {
                // Each `open_table` result is a temporary: opening is enough
                // to create the table, and the handle is dropped immediately
                // so nothing is still borrowed when the txn commits.
                txn.open_table(CHUNKS_TABLE).context("init chunks table")?;
                txn.open_table(ENTITIES_TABLE)
                    .context("init entities table")?;
                // Issue #41 phase 2: materialize the KG persistence tables
                // alongside the chunk/entity tables so warm-boot reads never
                // race a missing-table error on a fresh database.
                txn.open_table(KG_NODES_TABLE)
                    .context("init kg_nodes table")?;
                txn.open_table(KG_EDGES_TABLE)
                    .context("init kg_edges table")?;
                txn.open_table(KG_EDGES_REV_TABLE)
                    .context("init kg_edges_rev table")?;
                // Issue #41 phase 3: the community tables. Per their
                // definitions they are kept only for migration tolerance with
                // pre-v0.10.0 indexes and are no longer used by the active
                // search path; they are still opened here so old and new
                // databases share one schema.
                txn.open_table(KG_COMMUNITIES_TABLE)
                    .context("init kg_communities table")?;
                txn.open_table(KG_SYMBOL_COMMUNITY_TABLE)
                    .context("init kg_symbol_community table")?;
                // Migration framework: materialize `_meta` so the schema-version
                // read never races a missing-table error on fresh databases.
                txn.open_table(crate::core::migration::META_TABLE)
                    .context("init _meta table")?;
            }
            txn.commit().context("commit corpus init txn")?;
        }
        Ok(Self {
            db,
            path: path.to_path_buf(),
        })
    }

    /// Open a fresh (truncated) redb corpus at `path`, discarding any existing
    /// file first.
    ///
    /// Why: the `--force` reindex (issue #28, Phase 4) stages the rebuilt
    /// corpus in `index.redb.tmp`. A stale `.tmp` left behind by a previously
    /// aborted reindex must not contribute pre-existing rows to the new staged
    /// corpus — the staged file must reflect *only* this reindex's output so
    /// the post-reindex atomic rename produces a corpus identical to a clean
    /// rebuild.
    /// What: best-effort removes any file already at `path`, then delegates to
    /// [`Self::open`]. A `NotFound` removal error is ignored (nothing to
    /// clear); any other removal error is surfaced.
    /// Test: `tests::test_force_reindex_atomic_corpus_swap`.
    pub fn open_fresh(path: &Path) -> Result<Self> {
        // Remove whatever a previous (possibly aborted) run left behind. A
        // missing file is the normal case and is not an error; every other
        // failure aborts before a database is opened.
        if let Err(e) = std::fs::remove_file(path) {
            let already_absent = e.kind() == std::io::ErrorKind::NotFound;
            if !already_absent {
                return Err(e)
                    .with_context(|| format!("clear stale staging corpus at {}", path.display()));
            }
        }
        // With the slate clean, the regular open path creates the new file.
        Self::open(path)
    }

    /// Filesystem path this store's database was opened at.
    ///
    /// Why: the atomic `--force` reindex swap needs to know the staging file's
    /// path to rename it over the live `index.redb`, and the caller would
    /// otherwise have to thread the path alongside every `Arc<CorpusStore>`.
    /// What: returns the stored `PathBuf`.
    /// Test: `tests::test_force_reindex_atomic_corpus_swap` asserts the path.
    pub fn path(&self) -> &Path {
        // Borrowed view of the stored `PathBuf`; no allocation or copy.
        self.path.as_path()
    }

    /// Upsert a batch of chunks in a single redb write transaction.
    ///
    /// Why: a batch commit (`commit_parsed_batch`) lands up to a few hundred
    /// chunks at once. One transaction per batch keeps the write amplification
    /// proportional to the batch size, not the whole corpus, and makes the
    /// batch atomic — a crash mid-commit rolls the whole batch back.
    /// What: serializes each [`RawChunk`] with `serde_json` and inserts it
    /// under its `id`. Existing ids are overwritten (upsert semantics).
    /// Test: `roundtrip` writes then reads; `delete_removes_chunk` re-upserts.
    pub fn upsert_chunks(&self, chunks: &[RawChunk]) -> Result<()> {
        // An empty batch never opens a write transaction.
        if chunks.is_empty() {
            return Ok(());
        }
        let write_txn = self.db.begin_write().context("begin chunk upsert txn")?;
        {
            // The table handle borrows the transaction, so it is confined to
            // this scope and released before `commit` consumes the txn.
            let mut chunk_rows = write_txn.open_table(CHUNKS_TABLE)?;
            chunks.iter().try_for_each(|chunk| -> Result<()> {
                let encoded = serde_json::to_vec(chunk)
                    .with_context(|| format!("serialize chunk {}", chunk.id))?;
                // `insert` overwrites an existing row with the same id.
                chunk_rows
                    .insert(chunk.id.as_str(), encoded.as_slice())
                    .with_context(|| format!("insert chunk {}", chunk.id))?;
                Ok(())
            })?;
        }
        // Any early return above drops the txn uncommitted, discarding the
        // partial batch.
        write_txn.commit().context("commit chunk upsert txn")
    }

    /// Upsert a batch of per-file entity lists in a single write transaction.
    ///
    /// Why: entity lists are committed alongside chunks; sharing the same
    /// one-txn-per-batch discipline keeps both tables consistent on a crash.
    /// What: serializes each `Vec<RawEntity>` and inserts it under its file
    /// path key.
    /// Test: `roundtrip` exercises this alongside `upsert_chunks`.
    pub fn upsert_entities(&self, entities: &[(String, Vec<RawEntity>)]) -> Result<()> {
        // An empty batch never opens a write transaction.
        if entities.is_empty() {
            return Ok(());
        }
        let write_txn = self.db.begin_write().context("begin entity upsert txn")?;
        {
            // Scope the table handle so it is gone before the commit below.
            let mut entity_rows = write_txn.open_table(ENTITIES_TABLE)?;
            entities.iter().try_for_each(|(file, ents)| -> Result<()> {
                let encoded = serde_json::to_vec(ents)
                    .with_context(|| format!("serialize entities for {file}"))?;
                // One row per file path; a repeated path replaces its row.
                entity_rows
                    .insert(file.as_str(), encoded.as_slice())
                    .with_context(|| format!("insert entities for {file}"))?;
                Ok(())
            })?;
        }
        write_txn.commit().context("commit entity upsert txn")
    }

    /// Upsert a batch of chunks **and** their per-file entity lists in a
    /// single redb write transaction (issue #29).
    ///
    /// Why: `upsert_chunks` and `upsert_entities` each opened their own
    /// `begin_write()` transaction. A crash (or SIGTERM) landing between the
    /// two commits left the chunk corpus and the symbol-graph entity table
    /// inconsistent — a warm-boot would rehydrate chunks that the entity table
    /// no longer described, or vice versa. Folding both tables into one
    /// transaction makes the whole batch (chunks + entities) atomic: a crash
    /// either rolls back the entire batch or commits all of it.
    /// What: opens one write transaction, inserts every [`RawChunk`] into
    /// `CHUNKS_TABLE` and every per-file `Vec<RawEntity>` into `ENTITIES_TABLE`
    /// under that transaction, then commits once. Both table handles are
    /// dropped (inner scope closed) before `commit()` — redb requires every
    /// table opened in a write txn to be dropped before the txn can commit.
    /// Empty inputs on **both** sides are a no-op (no transaction opened); a
    /// non-empty input on either side still writes the other table even when
    /// it is empty, so callers get one consistent commit point.
    /// Test: `batch_upsert_is_atomic_roundtrip` writes chunks + entities via
    /// this method and reads them back from a reopened store.
    pub fn upsert_batch(
        &self,
        chunks: &[RawChunk],
        entities: &[(String, Vec<RawEntity>)],
    ) -> Result<()> {
        // Only a batch that is empty on *both* sides skips the transaction.
        let nothing_to_write = chunks.is_empty() && entities.is_empty();
        if nothing_to_write {
            return Ok(());
        }
        let write_txn = self.db.begin_write().context("begin batch upsert txn")?;
        {
            // Both tables are written under the same transaction; their
            // handles are confined to this scope so they are released before
            // the single commit below.
            let mut chunk_rows = write_txn
                .open_table(CHUNKS_TABLE)
                .context("open chunks table for batch upsert")?;
            chunks.iter().try_for_each(|chunk| -> Result<()> {
                let encoded = serde_json::to_vec(chunk)
                    .with_context(|| format!("serialize chunk {}", chunk.id))?;
                chunk_rows
                    .insert(chunk.id.as_str(), encoded.as_slice())
                    .with_context(|| format!("insert chunk {}", chunk.id))?;
                Ok(())
            })?;

            let mut entity_rows = write_txn
                .open_table(ENTITIES_TABLE)
                .context("open entities table for batch upsert")?;
            entities.iter().try_for_each(|(file, ents)| -> Result<()> {
                let encoded = serde_json::to_vec(ents)
                    .with_context(|| format!("serialize entities for {file}"))?;
                entity_rows
                    .insert(file.as_str(), encoded.as_slice())
                    .with_context(|| format!("insert entities for {file}"))?;
                Ok(())
            })?;
        }
        // One commit point for chunks + entities: either both land or neither.
        write_txn.commit().context("commit batch upsert txn")
    }

    /// Delete a set of chunk ids in one write transaction.
    ///
    /// Why: `remove_file` / `remove_chunk` must evict from the durable store
    /// too, or a restart would resurrect deleted chunks.
    /// What: removes each id from `CHUNKS_TABLE`; unknown ids are a silent
    /// no-op (idempotent delete), matching the in-memory `HashMap::remove`.
    /// Test: `delete_removes_chunk`.
    pub fn delete_chunks(&self, ids: &[String]) -> Result<()> {
        // No ids, no transaction.
        if ids.is_empty() {
            return Ok(());
        }
        let write_txn = self.db.begin_write().context("begin chunk delete txn")?;
        {
            let mut chunk_rows = write_txn.open_table(CHUNKS_TABLE)?;
            ids.iter().try_for_each(|id| -> Result<()> {
                // The previous value (if any) is discarded: removing an id
                // that has no row is not treated as an error.
                chunk_rows
                    .remove(id.as_str())
                    .with_context(|| format!("delete chunk {id}"))?;
                Ok(())
            })?;
        }
        write_txn.commit().context("commit chunk delete txn")
    }

    /// Delete a per-file entity list. Idempotent.
    ///
    /// Why: `remove_file` drops the file's entities; the durable store must
    /// follow or the symbol graph would rebuild stale symbols on restart.
    /// What: removes the file key from `ENTITIES_TABLE`.
    /// Test: covered indirectly by `delete_removes_chunk` (same txn shape).
    pub fn delete_entities(&self, file: &str) -> Result<()> {
        let write_txn = self.db.begin_write().context("begin entity delete txn")?;
        {
            // Handle scoped so it is released before the txn is committed.
            let mut entity_rows = write_txn.open_table(ENTITIES_TABLE)?;
            // Whether a row existed is irrelevant to the caller, so the
            // returned previous value is dropped right away.
            entity_rows
                .remove(file)
                .with_context(|| format!("delete entities for {file}"))?;
        }
        write_txn.commit().context("commit entity delete txn")
    }

    /// Load every chunk in the corpus into a `Vec`.
    ///
    /// Why: the warm-boot path rehydrates the in-memory `HashMap` (and rebuilds
    /// BM25 + the symbol graph) from this. A streaming iterator would avoid the
    /// transient `Vec`, but the caller already needs an owned `RawChunk` per
    /// entry to insert into the map, so the `Vec` is not extra peak RAM beyond
    /// the map itself.
    /// What: opens a read transaction, walks `CHUNKS_TABLE`, and deserializes
    /// each value. A single corrupt row is skipped with a `warn` rather than
    /// failing the whole load — one bad chunk must not brick the daemon.
    /// Test: `roundtrip`.
    pub fn load_all_chunks(&self) -> Result<Vec<RawChunk>> {
        let txn = self.db.begin_read().context("begin chunk read txn")?;
        let table = txn
            .open_table(CHUNKS_TABLE)
            .context("open chunks table for load")?;
        let mut out: Vec<RawChunk> = Vec::new();
        // Reserve once from the table's row count so a large corpus does not
        // pay for repeated grow-and-copy reallocations while loading. This is
        // strictly best-effort: if the count is unavailable, does not fit in
        // `usize`, or the reservation fails, the `Vec` simply grows on demand.
        if let Some(rows) = table.len().ok().and_then(|n| usize::try_from(n).ok()) {
            let _ = out.try_reserve(rows);
        }
        for entry in table.iter().context("iterate chunks table")? {
            // A storage-level read failure aborts the load...
            let (key, value) = entry.context("read chunk row")?;
            // ...but a row that merely fails to deserialize is logged and
            // skipped so one bad chunk cannot block startup.
            match serde_json::from_slice::<RawChunk>(value.value()) {
                Ok(chunk) => out.push(chunk),
                Err(e) => {
                    tracing::warn!("corpus: skipping corrupt chunk row '{}' ({e})", key.value())
                }
            }
        }
        Ok(out)
    }

    /// Batch point-read a set of chunks by `chunk_id`.
    ///
    /// Why: issue #28 deferred item — the search hot path used to materialize
    /// top-k results by joining fused `(id, score)` pairs against an in-memory
    /// `HashMap<String, RawChunk>` that held *every* chunk's text resident in
    /// the heap permanently (~45 GB RSS on a large monorepo). Reading the
    /// top-k chunk text straight out of redb at materialization time lets the
    /// daemon drop that HashMap from the query path entirely: redb's values are
    /// mmap-backed, so a point lookup is served from the OS page cache rather
    /// than process heap, cutting steady-state RSS to <10 GB. A typical
    /// `top_k=20` query does 20 point reads inside one read transaction —
    /// each is an O(log n) B-tree descent over an mmap'd file, well within the
    /// sub-10 ms query budget.
    /// What: opens a single redb read transaction and fetches each requested
    /// id. Missing ids are skipped (not an error) — a fused id with no redb row
    /// is almost always a benign race against a concurrent removal, and one
    /// missing chunk must not fail the whole query. A corrupt row is likewise
    /// skipped with a `warn`. The returned `Vec` preserves the input `ids`
    /// order for the ids that were found.
    /// Test: `get_chunks_batch_reads_subset` round-trips a corpus and asserts
    /// only the requested ids come back, in order, with missing ids skipped.
    pub fn get_chunks(&self, ids: &[&str]) -> Result<Vec<RawChunk>> {
        // Nothing requested: answer without touching the database.
        if ids.is_empty() {
            return Ok(Vec::new());
        }
        let read_txn = self.db.begin_read().context("begin chunk point-read txn")?;
        let chunk_rows = read_txn.open_table(CHUNKS_TABLE)?;
        // At most one result per requested id.
        let mut found = Vec::with_capacity(ids.len());
        for id in ids {
            // A storage error on the lookup itself fails the whole call.
            let lookup = chunk_rows
                .get(*id)
                .with_context(|| format!("point-read chunk {id}"))?;
            match lookup {
                // Absent row: log and move on, keeping the remaining ids.
                None => {
                    tracing::warn!("corpus: chunk '{id}' not found in redb — skipping");
                }
                Some(row) => match serde_json::from_slice::<RawChunk>(row.value()) {
                    // Pushing in iteration order preserves the caller's order.
                    Ok(chunk) => found.push(chunk),
                    // Undecodable row: log and move on.
                    Err(e) => {
                        tracing::warn!("corpus: skipping corrupt chunk row '{id}' ({e})");
                    }
                },
            }
        }
        Ok(found)
    }

    /// Load every per-file entity list.
    ///
    /// Why: counterpart of [`Self::load_all_chunks`] for the entities table;
    /// the warm-boot path needs both to rebuild the symbol graph.
    /// What: walks `ENTITIES_TABLE`, deserializing each `Vec<RawEntity>`. A
    /// corrupt row is skipped with a `warn`.
    /// Test: `roundtrip`.
    pub fn load_all_entities(&self) -> Result<Vec<(String, Vec<RawEntity>)>> {
        let txn = self.db.begin_read().context("begin entity read txn")?;
        let table = txn
            .open_table(ENTITIES_TABLE)
            .context("open entities table for load")?;
        let mut out: Vec<(String, Vec<RawEntity>)> = Vec::new();
        // Best-effort reservation from the row count (one row per file) to
        // avoid repeated reallocation; any failure here just means the `Vec`
        // grows incrementally instead.
        if let Some(rows) = table.len().ok().and_then(|n| usize::try_from(n).ok()) {
            let _ = out.try_reserve(rows);
        }
        for entry in table.iter().context("iterate entities table")? {
            // A storage-level read failure aborts the load...
            let (key, value) = entry.context("read entity row")?;
            let file = key.value().to_string();
            // ...while an undecodable row is logged and skipped.
            match serde_json::from_slice::<Vec<RawEntity>>(value.value()) {
                Ok(ents) => out.push((file, ents)),
                Err(e) => {
                    tracing::warn!("corpus: skipping corrupt entity row '{file}' ({e})")
                }
            }
        }
        Ok(out)
    }

    /// Number of chunks currently stored.
    ///
    /// Why: lets the warm-boot path log a count and lets callers cheaply check
    /// "is the durable corpus empty?" (first-run / post-upgrade case) without
    /// materializing every row.
    /// What: returns `CHUNKS_TABLE.len()`.
    /// Test: `roundtrip` asserts the count after upsert.
    pub fn chunk_count(&self) -> Result<usize> {
        let txn = self.db.begin_read().context("begin count txn")?;
        let table = txn
            .open_table(CHUNKS_TABLE)
            .context("open chunks table for count")?;
        let rows = table.len().context("count chunks")?;
        // redb reports the length as `u64`; convert with a checked cast so a
        // 32-bit target reports an error instead of a silently truncated count.
        usize::try_from(rows)
            .with_context(|| format!("chunk count {rows} does not fit in usize"))
    }

    /// Borrow the underlying `redb::Database` (issue #41 phase 2).
    ///
    /// Why: the `SymbolGraph` persistence helpers (`save_to_corpus`,
    /// `load_from_corpus`, …) need direct access to the KG tables that live
    /// alongside the chunk corpus in the same redb file. Exposing the
    /// `Database` here means we don't duplicate the file-open dance on every
    /// graph save and avoids opening a second .redb file per index.
    /// What: returns a shared borrow of `self.db`; no transaction is opened
    /// here. Callers can begin read/write transactions against the KG tables
    /// exported as `pub(crate) const KG_*_TABLE` in this module.
    /// Test: covered indirectly by every `SymbolGraph::*_corpus` test.
    // NOTE(review): the `dead_code` allow suggests some build configurations
    // have no caller of this accessor — confirm it is still required.
    #[allow(dead_code)]
    pub(crate) fn db(&self) -> &Database {
        &self.db
    }

    /// Atomically overwrite the persisted symbol graph: the node set plus both
    /// adjacency directions (issue #41 phase 2).
    ///
    /// Why: a graph stored next to the chunk corpus lets warm-boot skip the
    /// full `build_from_chunks` rebuild, and doing the rewrite inside a single
    /// write transaction means readers never observe a half-rewritten graph.
    /// What: for each of the three KG tables, removes every existing row and
    /// then inserts the supplied entries as `serde_json` bytes. Adjacency
    /// entries whose vector is empty are not written, keeping the stored
    /// graph minimal.
    /// Test: `save_load_kg_roundtrip` round-trips a synthetic graph through
    /// `save_kg_graph` + `load_kg_graph` and asserts equality.
    pub fn save_kg_graph(
        &self,
        nodes: &[(String, PersistedKgNode)],
        adj_fwd: &[(String, Vec<(String, String)>)],
        adj_rev: &[(String, Vec<(String, String)>)],
    ) -> Result<()> {
        let write_txn = self.db.begin_write().context("begin kg graph upsert txn")?;
        {
            // Nodes: wipe first so a graph that shrank leaves no orphan rows.
            let mut node_table = write_txn.open_table(KG_NODES_TABLE)?;
            node_table.retain(|_, _| false).context("clear kg_nodes")?;
            for (symbol, node) in nodes.iter() {
                let encoded = serde_json::to_vec(node)
                    .with_context(|| format!("serialize kg node {symbol}"))?;
                node_table
                    .insert(symbol.as_str(), encoded.as_slice())
                    .with_context(|| format!("insert kg node {symbol}"))?;
            }

            // Forward adjacency: source symbol -> outgoing edges.
            let mut forward_table = write_txn.open_table(KG_EDGES_TABLE)?;
            forward_table
                .retain(|_, _| false)
                .context("clear kg_edges")?;
            for (src, targets) in adj_fwd.iter().filter(|(_, edges)| !edges.is_empty()) {
                let encoded = serde_json::to_vec(targets)
                    .with_context(|| format!("serialize kg fwd adjacency for {src}"))?;
                forward_table
                    .insert(src.as_str(), encoded.as_slice())
                    .with_context(|| format!("insert kg fwd adjacency for {src}"))?;
            }

            // Reverse adjacency: target symbol -> incoming edges.
            let mut reverse_table = write_txn.open_table(KG_EDGES_REV_TABLE)?;
            reverse_table
                .retain(|_, _| false)
                .context("clear kg_edges_rev")?;
            for (tgt, sources) in adj_rev.iter().filter(|(_, edges)| !edges.is_empty()) {
                let encoded = serde_json::to_vec(sources)
                    .with_context(|| format!("serialize kg rev adjacency for {tgt}"))?;
                reverse_table
                    .insert(tgt.as_str(), encoded.as_slice())
                    .with_context(|| format!("insert kg rev adjacency for {tgt}"))?;
            }
        }
        // All table handles are dropped by now, so the transaction can commit.
        write_txn.commit().context("commit kg graph upsert txn")
    }

    /// Read back the symbol graph written by `save_kg_graph` (issue #41
    /// phase 2).
    ///
    /// Why: warm-boot wants the KG online without paying the
    /// `build_from_chunks` cost. Handing back raw node + adjacency lists lets
    /// the caller (`SymbolGraph::load_from_corpus`) rebuild the in-memory
    /// `petgraph` without re-touching the chunk corpus.
    /// What: returns `(nodes, adj_fwd, adj_rev)`, each being the deserialized
    /// contents of one KG table, all read under one read transaction. An
    /// empty (or fresh) database yields three empty vectors. A row that fails
    /// to decode is logged at `warn` and skipped instead of failing the load.
    /// Test: `save_load_kg_roundtrip`.
    #[allow(clippy::type_complexity)]
    pub fn load_kg_graph(
        &self,
    ) -> Result<(
        Vec<(String, PersistedKgNode)>,
        Vec<(String, Vec<(String, String)>)>,
        Vec<(String, Vec<(String, String)>)>,
    )> {
        let read_txn = self.db.begin_read().context("begin kg graph read txn")?;

        let nodes = {
            let node_table = read_txn.open_table(KG_NODES_TABLE)?;
            let mut decoded: Vec<(String, PersistedKgNode)> = Vec::new();
            for row in node_table.iter().context("iterate kg_nodes table")? {
                let (key, value) = row.context("read kg_nodes row")?;
                let symbol = key.value().to_string();
                let node = match serde_json::from_slice::<PersistedKgNode>(value.value()) {
                    Ok(node) => node,
                    Err(e) => {
                        tracing::warn!("kg: skipping corrupt kg_nodes row '{symbol}' ({e})");
                        continue;
                    }
                };
                decoded.push((symbol, node));
            }
            decoded
        };

        // Both edge tables share one row shape, so one helper reads them.
        let adj_fwd = load_adjacency(&read_txn, KG_EDGES_TABLE, "kg_edges")?;
        let adj_rev = load_adjacency(&read_txn, KG_EDGES_REV_TABLE, "kg_edges_rev")?;
        Ok((nodes, adj_fwd, adj_rev))
    }

    /// Count the rows in the persisted KG node table.
    ///
    /// Why: warm-boot uses this as a cheap "is the persisted graph populated?"
    /// probe before deciding whether to fall back to `build_from_chunks`.
    /// What: opens a read transaction and returns the row count of
    /// `KG_NODES_TABLE`.
    /// Test: covered by `save_load_kg_roundtrip` (asserts count after save).
    pub fn kg_node_count(&self) -> Result<usize> {
        let read_txn = self.db.begin_read().context("begin kg count txn")?;
        let row_count = read_txn
            .open_table(KG_NODES_TABLE)?
            .len()
            .context("count kg_nodes")?;
        // The table reports its length in a wider integer type than `usize`.
        Ok(row_count as usize)
    }

    /// Overwrite the persisted community records and the symbol→community map
    /// (migration tolerance, not called by the active search path as of
    /// v0.10.0).
    ///
    /// Why: kept so old tooling that still calls this (e.g. test helpers,
    /// migration utilities) compiles. The Louvain pipeline was removed in
    /// v0.10.0 (issue #152); the daemon no longer calls this method.
    /// What: inside one write transaction, empties both migration-tolerance
    /// community tables and re-inserts the supplied records and per-symbol
    /// mappings. Record bytes are stored verbatim.
    /// Test: `save_load_communities_roundtrip` round-trips a synthetic partition.
    pub fn save_communities(
        &self,
        records: &[(u64, Vec<u8>)],
        symbol_to_community: &[(String, u64)],
    ) -> Result<()> {
        let write_txn = self
            .db
            .begin_write()
            .context("begin communities upsert txn")?;
        {
            // Community id -> opaque serialized record.
            let mut communities = write_txn.open_table(KG_COMMUNITIES_TABLE)?;
            communities
                .retain(|_, _| false)
                .context("clear kg_communities")?;
            for (id, bytes) in records.iter() {
                communities
                    .insert(id, bytes.as_slice())
                    .with_context(|| format!("insert community {id}"))?;
            }

            // Symbol -> community id.
            let mut memberships = write_txn.open_table(KG_SYMBOL_COMMUNITY_TABLE)?;
            memberships
                .retain(|_, _| false)
                .context("clear kg_symbol_community")?;
            for (sym, id) in symbol_to_community.iter() {
                memberships
                    .insert(sym.as_str(), id)
                    .with_context(|| format!("insert symbol→community for {sym}"))?;
            }
        }
        // Table handles are out of scope; commit publishes both tables at once.
        write_txn.commit().context("commit communities upsert txn")
    }

    /// Read every persisted community record (migration tolerance, not called
    /// by the active search path as of v0.10.0).
    ///
    /// Why: kept for parity with `save_communities` so old code that calls
    /// both still compiles. The `/communities` HTTP endpoint was removed in
    /// v0.10.0 (issue #152).
    /// What: returns `Vec<(community_id, serialized_record_bytes)>` copied out
    /// of the migration-tolerance `kg_communities` redb table. The first row
    /// that cannot be read aborts the load with an error.
    /// Test: `save_load_communities_roundtrip`.
    pub fn load_communities(&self) -> Result<Vec<(u64, Vec<u8>)>> {
        let read_txn = self.db.begin_read().context("begin communities read txn")?;
        let communities = read_txn.open_table(KG_COMMUNITIES_TABLE)?;
        // Collecting into `Result` stops at the first unreadable row.
        let records = communities
            .iter()
            .context("iterate kg_communities table")?
            .map(|row| {
                row.context("read kg_communities row")
                    .map(|(id, record)| (id.value(), record.value().to_vec()))
            })
            .collect::<Result<Vec<(u64, Vec<u8>)>>>()?;
        Ok(records)
    }

    /// Point-read the community id of one symbol (migration tolerance, not
    /// called by the active search path as of v0.10.0).
    ///
    /// Why: kept for parity with `save_communities` / `load_communities` so
    /// any surviving callers compile. Community id lookups were removed from
    /// the search materialisation path in v0.10.0 (issue #152).
    /// What: returns `Ok(Some(id))` when `symbol` has a row in the legacy
    /// `kg_symbol_community` table and `Ok(None)` when it does not.
    /// Test: `save_load_communities_roundtrip` asserts point reads.
    pub fn symbol_community(&self, symbol: &str) -> Result<Option<u64>> {
        let read_txn = self
            .db
            .begin_read()
            .context("begin symbol_community read txn")?;
        let memberships = read_txn.open_table(KG_SYMBOL_COMMUNITY_TABLE)?;
        let hit = memberships
            .get(symbol)
            .context("get symbol_community row")?;
        // Copy the id out of the access guard before the table goes away.
        Ok(hit.map(|guard| guard.value()))
    }

    /// Read the `schema_version` entry from the `_meta` table (migration
    /// framework, issue #migration).
    ///
    /// Why: the migration runner needs to know the index's current schema
    /// version before deciding which migrations to apply. Keeping the read
    /// synchronous (like all other `CorpusStore` methods) lets callers manage
    /// the async boundary via `spawn_blocking`.
    /// What: opens a read transaction on `_meta`, looks up
    /// `META_KEY_SCHEMA_VERSION`, and decodes the 4-byte little-endian value.
    /// Returns `0` when the table or key is absent (legacy indexes created
    /// before the migration framework was introduced).
    /// Test: `test_meta_schema_version_roundtrip` in `corpus::tests`.
    pub(crate) fn read_schema_version_sync(&self) -> Result<u32> {
        use crate::core::migration::{META_KEY_SCHEMA_VERSION, META_TABLE};
        let txn = self.db.begin_read().context("begin _meta read txn")?;
        let table = match txn.open_table(META_TABLE) {
            Ok(t) => t,
            Err(redb::TableError::TableDoesNotExist(_)) => return Ok(0),
            Err(e) => return Err(anyhow::anyhow!("open _meta table: {e}")),
        };
        match table
            .get(META_KEY_SCHEMA_VERSION)
            .context("read schema_version")?
        {
            Some(v) => {
                let bytes = v.value();
                if bytes.len() == 4 {
                    Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
                } else {
                    Ok(0)
                }
            }
            None => Ok(0),
        }
    }

    /// Persist `version` as the `schema_version` entry of the `_meta` table
    /// (migration framework, issue #migration).
    ///
    /// Why: the migration runner records the new version after a successful
    /// `apply` so the version advances durably. A crash between `apply` and
    /// this write means a retry on next startup, which an idempotent `apply`
    /// makes safe.
    /// What: opens a write transaction, opens `_meta` (creating it if
    /// absent), upserts `schema_version` as a 4-byte little-endian value, and
    /// commits.
    /// Test: `test_meta_schema_version_roundtrip` in `corpus::tests`.
    pub(crate) fn write_schema_version_sync(&self, version: u32) -> Result<()> {
        use crate::core::migration::{META_KEY_SCHEMA_VERSION, META_TABLE};
        let encoded = version.to_le_bytes();
        let write_txn = self.db.begin_write().context("begin _meta write txn")?;
        {
            let mut meta = write_txn
                .open_table(META_TABLE)
                .context("open _meta table")?;
            meta.insert(META_KEY_SCHEMA_VERSION, encoded.as_slice())
                .context("insert schema_version")?;
        }
        // The table handle is dropped above, so the transaction can commit.
        write_txn.commit().context("commit _meta write txn")
    }
}

/// Read one KG adjacency table in full and decode every row.
///
/// Why: `KG_EDGES_TABLE` and `KG_EDGES_REV_TABLE` have identical shapes
/// (`symbol → Vec<(edge_kind, peer_symbol)>`), so one shared reader avoids
/// duplicating the corrupt-row tolerance and the `serde_json` decode code.
/// What: opens `table_def` on the supplied read transaction, walks it, and
/// returns a `Vec<(key, adjacency)>`. `label` only appears in error context
/// and log messages. Rows that fail to decode are logged at `warn` and
/// skipped; a storage-level read failure aborts with an error.
/// Test: covered transitively by `save_load_kg_roundtrip`.
#[allow(clippy::type_complexity)]
fn load_adjacency(
    txn: &redb::ReadTransaction,
    table_def: TableDefinition<'_, &str, &[u8]>,
    label: &str,
) -> Result<Vec<(String, Vec<(String, String)>)>> {
    let table = txn.open_table(table_def)?;
    let rows = table
        .iter()
        .with_context(|| format!("iterate {label} table"))?;
    let mut adjacency = Vec::new();
    for row in rows {
        let (key, value) = row.with_context(|| format!("read {label} row"))?;
        let sym = key.value().to_string();
        let adj = match serde_json::from_slice::<Vec<(String, String)>>(value.value()) {
            Ok(adj) => adj,
            Err(e) => {
                tracing::warn!("kg: skipping corrupt {label} row '{sym}' ({e})");
                continue;
            }
        };
        adjacency.push((sym, adj));
    }
    Ok(adjacency)
}

/// Compact on-disk representation of a [`crate::core::symbol_graph::SymbolNode`]
/// (issue #41 phase 2).
///
/// Why: the runtime `SymbolNode` carries the symbol name three times (as the
/// `petgraph` node weight, the `by_symbol` map key, and inside the node
/// itself). Storing only `chunk_id + file` (with the symbol implied by the
/// row key) keeps the on-disk size lean and avoids a String redundancy.
/// What: serde-derived JSON payload stored under `KG_NODES_TABLE[symbol]`.
/// The symbol name itself is not a field; it is the table key of the row.
/// Test: covered by `save_load_kg_roundtrip` in this module and by the
/// `SymbolGraph` round-trip test in `core::symbol_graph::tests`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct PersistedKgNode {
    /// Chunk id stored for the node (the module's test fixtures use ids of
    /// the form `"a:1:1"`).
    pub chunk_id: String,
    /// File stored for the node — presumably the source file the symbol was
    /// extracted from; confirm against `SymbolNode`.
    pub file: String,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::chunker::{ChunkType, RawChunk};

    /// Fixture builder: a `RawChunk` where only `id` and `content` vary.
    ///
    /// Every other field is fixed: file `src/lib.rs`, lines 1..=1, language
    /// `rust`, chunk type `Code`, and all optional / list fields empty.
    fn raw(id: &str, content: &str) -> RawChunk {
        RawChunk {
            id: id.to_owned(),
            file: String::from("src/lib.rs"),
            start_line: 1,
            end_line: 1,
            content: content.to_owned(),
            function_name: None,
            language: Some(String::from("rust")),
            chunk_type: ChunkType::Code,
            calls: Vec::new(),
            inherits_from: Vec::new(),
            chunk_depth: 0,
            parent_chunk_id: None,
            child_chunk_ids: Vec::new(),
            nlp_keywords: Vec::new(),
            nlp_code_refs: Vec::new(),
            virtual_terms: Vec::new(),
        }
    }

    #[test]
    fn redb_cache_size_default_and_env_override() {
        // Idle-memory audit: the redb page cache defaults to 64 MB (issue #329
        // B.2 quick-win; was 512 MB before empirical profiling confirmed actual
        // fill of ~87 MB) and is overridable via TRUSTY_REDB_CACHE_MB. This test
        // mutates a process-global env var, so it is intentionally self-contained
        // (save/restore the prior value) — no other test in this module reads
        // TRUSTY_REDB_CACHE_MB.
        const VAR: &str = "TRUSTY_REDB_CACHE_MB";

        /// Puts the variable back to its pre-test state when dropped, so the
        /// restore also runs when one of the assertions below panics.
        struct RestoreEnv(Option<String>);

        impl Drop for RestoreEnv {
            fn drop(&mut self) {
                // SAFETY: corpus tests do not mutate this env var concurrently.
                unsafe {
                    match self.0.take() {
                        Some(v) => std::env::set_var(VAR, v),
                        None => std::env::remove_var(VAR),
                    }
                }
            }
        }

        let _restore = RestoreEnv(std::env::var(VAR).ok());

        // Default: unset → 64 MB.
        // SAFETY: corpus tests do not mutate this env var concurrently.
        unsafe { std::env::remove_var(VAR) };
        assert_eq!(redb_cache_size_bytes(), DEFAULT_REDB_CACHE_MB * 1024 * 1024);

        // Valid override wins.
        // SAFETY: see above.
        unsafe { std::env::set_var(VAR, "1024") };
        assert_eq!(redb_cache_size_bytes(), 1024 * 1024 * 1024);

        // Zero falls back to the default.
        // SAFETY: see above.
        unsafe { std::env::set_var(VAR, "0") };
        assert_eq!(redb_cache_size_bytes(), DEFAULT_REDB_CACHE_MB * 1024 * 1024);

        // Garbage falls back to the default (with a warn).
        // SAFETY: see above.
        unsafe { std::env::set_var(VAR, "not-a-number") };
        assert_eq!(redb_cache_size_bytes(), DEFAULT_REDB_CACHE_MB * 1024 * 1024);

        // Restore happens in `RestoreEnv::drop` when `_restore` goes out of
        // scope here (or during unwinding after a failed assertion).
    }

    #[test]
    fn roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("index.redb");

        // First session: write two chunks and one (empty) entity row.
        let writer = CorpusStore::open(&db_path).unwrap();
        writer
            .upsert_chunks(&[raw("a:1:1", "fn a() {}"), raw("b:1:1", "fn b() {}")])
            .unwrap();
        writer
            .upsert_entities(&[("src/lib.rs".to_string(), Vec::new())])
            .unwrap();
        assert_eq!(writer.chunk_count().unwrap(), 2);

        // Close the store before reopening, simulating a daemon restart.
        drop(writer);
        let reader = CorpusStore::open(&db_path).unwrap();

        let mut chunks = reader.load_all_chunks().unwrap();
        chunks.sort_by(|lhs, rhs| lhs.id.cmp(&rhs.id));
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].id, "a:1:1");
        assert_eq!(chunks[0].content, "fn a() {}");

        let entity_rows = reader.load_all_entities().unwrap();
        assert_eq!(entity_rows.len(), 1);
        assert_eq!(entity_rows[0].0, "src/lib.rs");
    }

    #[test]
    fn batch_upsert_is_atomic_roundtrip() {
        // Issue #29: `upsert_batch` writes chunks + entities in one redb
        // transaction. After a reopen both must be visible, mirroring what
        // `roundtrip` checks for the separate `upsert_chunks` /
        // `upsert_entities` calls.
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("index.redb");

        let first_session = CorpusStore::open(&db_path).unwrap();
        first_session
            .upsert_batch(
                &[raw("a:1:1", "fn a() {}"), raw("b:1:1", "fn b() {}")],
                &[("src/lib.rs".to_string(), Vec::new())],
            )
            .unwrap();
        assert_eq!(first_session.chunk_count().unwrap(), 2);
        // Close the store so the reopen below simulates a daemon restart.
        drop(first_session);

        // Both tables must be intact after the restart.
        let store = CorpusStore::open(&db_path).unwrap();
        let mut chunks = store.load_all_chunks().unwrap();
        chunks.sort_by(|lhs, rhs| lhs.id.cmp(&rhs.id));
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].id, "a:1:1");
        let entity_rows = store.load_all_entities().unwrap();
        assert_eq!(entity_rows.len(), 1);
        assert_eq!(entity_rows[0].0, "src/lib.rs");

        // Chunks-only batch: the chunks table still grows.
        store
            .upsert_batch(&[raw("c:1:1", "fn c() {}")], &[])
            .unwrap();
        assert_eq!(store.chunk_count().unwrap(), 3);

        // Entities-only batch: the entities table still grows.
        store
            .upsert_batch(&[], &[("src/other.rs".to_string(), Vec::new())])
            .unwrap();
        assert_eq!(store.load_all_entities().unwrap().len(), 2);

        // Fully empty batch: succeeds and changes nothing.
        store.upsert_batch(&[], &[]).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 3);
    }

    #[test]
    fn get_chunks_batch_reads_subset() {
        // Issue #28 deferred item: the query hot path materializes top-k
        // results through `get_chunks`. Contract under test: only requested
        // ids come back, in input order, and ids absent from the corpus are
        // skipped silently.
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("index.redb");
        let store = CorpusStore::open(&db_path).unwrap();
        store
            .upsert_chunks(&[
                raw("a:1:1", "fn a() {}"),
                raw("b:1:1", "fn b() {}"),
                raw("c:1:1", "fn c() {}"),
            ])
            .unwrap();

        // Ask for a subset out of corpus order, with one unknown id in the middle.
        let wanted = ["c:1:1", "missing:0:0", "a:1:1"];
        let found = store.get_chunks(&wanted).unwrap();
        assert_eq!(found.len(), 2, "unknown id must be skipped, not error");
        assert_eq!(found[0].id, "c:1:1", "input order must be preserved");
        assert_eq!(found[0].content, "fn c() {}");
        assert_eq!(found[1].id, "a:1:1");

        // No ids requested: nothing returned.
        let none_requested = store.get_chunks(&[]).unwrap();
        assert!(none_requested.is_empty());

        // Only unknown ids requested: empty vec, never an error.
        let all_missing = store.get_chunks(&["nope:0:0"]).unwrap();
        assert!(all_missing.is_empty());
    }

    #[test]
    fn missing_db_is_empty() {
        // First-run / post-upgrade case: opening a path with no database yet
        // must succeed and present an empty corpus instead of an error.
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("fresh.redb");
        let fresh = CorpusStore::open(&db_path).unwrap();
        assert_eq!(fresh.chunk_count().unwrap(), 0);
        assert!(fresh.load_all_chunks().unwrap().is_empty());
        assert!(fresh.load_all_entities().unwrap().is_empty());
    }

    #[test]
    fn delete_removes_chunk() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("index.redb");
        let store = CorpusStore::open(&db_path).unwrap();
        store
            .upsert_chunks(&[raw("a:1:1", "x"), raw("b:1:1", "y")])
            .unwrap();

        // Remove one of the two chunks; only the other may remain.
        let doomed = ["a:1:1".to_string()];
        store.delete_chunks(&doomed).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 1);
        let survivors = store.load_all_chunks().unwrap();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].id, "b:1:1");

        // An id that was never stored: succeeds and changes nothing.
        let unknown = ["nope:0:0".to_string()];
        store.delete_chunks(&unknown).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 1);
    }

    #[test]
    fn empty_batches_are_noops() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("index.redb");
        let store = CorpusStore::open(&db_path).unwrap();
        // Every mutator is handed an empty slice; each must succeed ...
        store.upsert_chunks(&[]).unwrap();
        store.upsert_entities(&[]).unwrap();
        store.delete_chunks(&[]).unwrap();
        // ... and the corpus must still be empty afterwards.
        assert_eq!(store.chunk_count().unwrap(), 0);
    }

    #[test]
    fn delete_entities_removes_file_row() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("index.redb");
        let store = CorpusStore::open(&db_path).unwrap();
        store
            .upsert_entities(&[
                ("src/a.rs".to_string(), Vec::new()),
                ("src/b.rs".to_string(), Vec::new()),
            ])
            .unwrap();
        assert_eq!(store.load_all_entities().unwrap().len(), 2);

        // Drop one file's row; the other file must be the only one left.
        store.delete_entities("src/a.rs").unwrap();
        let left = store.load_all_entities().unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].0, "src/b.rs");

        // A file that was never stored: succeeds and changes nothing.
        store.delete_entities("src/never.rs").unwrap();
        assert_eq!(store.load_all_entities().unwrap().len(), 1);
    }

    #[test]
    fn path_accessor_returns_open_path() {
        // Issue #28 Phase 4: the atomic-swap path asks `path()` which file to
        // rename, so the accessor must return exactly the path given to `open`.
        let tmp = tempfile::tempdir().unwrap();
        let expected = tmp.path().join("index.redb");
        let store = CorpusStore::open(&expected).unwrap();
        assert_eq!(store.path(), expected.as_path());
    }

    #[test]
    fn open_fresh_truncates_stale_staging_file() {
        // Issue #28 Phase 4: a stale `index.redb.tmp` left behind by an
        // aborted reindex must not leak its rows into the next staged corpus;
        // `open_fresh` is expected to discard the old file first.
        let tmp = tempfile::tempdir().unwrap();
        let staging = tmp.path().join("index.redb.tmp");

        // Write one row, then close the store so the file is persisted on disk.
        let stale = CorpusStore::open(&staging).unwrap();
        stale.upsert_chunks(&[raw("stale:1:1", "old")]).unwrap();
        assert_eq!(stale.chunk_count().unwrap(), 1);
        drop(stale);
        assert!(staging.exists());

        // Despite the existing file, `open_fresh` must hand back an empty corpus.
        let reopened = CorpusStore::open_fresh(&staging).unwrap();
        assert_eq!(reopened.chunk_count().unwrap(), 0);
        assert_eq!(reopened.path(), staging.as_path());

        // A path with no file behind it is equally acceptable.
        let never_existed = tmp.path().join("never.redb.tmp");
        let brand_new = CorpusStore::open_fresh(&never_existed).unwrap();
        assert_eq!(brand_new.chunk_count().unwrap(), 0);
    }

    /// Issue #41 phase 2: push a two-node, one-edge graph through
    /// `save_kg_graph` and read it back with `load_kg_graph`. The store is
    /// closed and reopened in between, so the assertions prove the graph is
    /// durable rather than merely cached in process memory.
    #[test]
    fn save_load_kg_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("index.redb");

        // Fixture: alpha --CallsFunction--> beta, plus the mirrored reverse edge.
        let kg_node = |chunk_id: &str, file: &str| PersistedKgNode {
            chunk_id: chunk_id.into(),
            file: file.into(),
        };
        let nodes = vec![
            ("alpha".to_string(), kg_node("a:1:1", "a.rs")),
            ("beta".to_string(), kg_node("b:1:1", "b.rs")),
        ];
        let adj_fwd = vec![(
            "alpha".to_string(),
            vec![("CallsFunction".to_string(), "beta".to_string())],
        )];
        let adj_rev = vec![(
            "beta".to_string(),
            vec![("CallsFunction".to_string(), "alpha".to_string())],
        )];

        // Save, check the count, then close the store.
        let writer = CorpusStore::open(&db_path).unwrap();
        writer
            .save_kg_graph(&nodes, &adj_fwd, &adj_rev)
            .expect("save kg");
        assert_eq!(writer.kg_node_count().unwrap(), 2);
        drop(writer);

        // After a reopen every row must still be there.
        let store = CorpusStore::open(&db_path).unwrap();
        let (loaded_nodes, loaded_fwd, loaded_rev) = store.load_kg_graph().unwrap();
        assert_eq!(loaded_nodes.len(), 2);
        assert_eq!(loaded_fwd, adj_fwd);
        assert_eq!(loaded_rev, adj_rev);

        // Saving an empty graph must leave all three tables empty.
        store.save_kg_graph(&[], &[], &[]).unwrap();
        assert_eq!(store.kg_node_count().unwrap(), 0);
        let (n, f, r) = store.load_kg_graph().unwrap();
        assert!(n.is_empty() && f.is_empty() && r.is_empty());
    }
}