trusty-search 0.26.0

Machine-wide hybrid code search service: BM25 + vector + KG, zero cold-start, MCP server
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
use super::*;
use crate::core::indexer::CodeIndexer;
use crate::core::registry::{IndexHandle, IndexId, IndexStages, StageStatus};
use std::fs;
use std::sync::atomic::Ordering;
use std::sync::Arc;

/// Guards the `include_paths` wiring: a handle that names specific subtrees
/// must restrict the reindex walk to exactly those subtrees, and nothing
/// outside them may reach the corpus.
///
/// Why: a `trusty-search.yaml` with `paths: [api/src]` exists to carve one
/// slice out of a polyrepo. If the `handle.include_paths` branch were ever
/// dropped, the reindex would quietly fall back to walking everything —
/// precisely the behaviour the YAML setting is meant to prevent.
/// What: builds a fixture holding `api/keep.rs` and `ui/drop.rs`, registers
/// a handle with `include_paths = [<root>/api]`, runs the reindex, and checks
/// that only the `api` file was walked and is searchable.
/// Test: this test.
#[tokio::test]
async fn reindex_honours_include_paths_filter() {
    let scratch = tempfile::tempdir().expect("tempdir");
    let repo_root = scratch.path().to_path_buf();
    for dir in ["api", "ui"] {
        fs::create_dir_all(repo_root.join(dir)).unwrap();
    }
    for (rel_path, body) in [
        ("api/keep.rs", "fn keep_me() {}\n"),
        ("ui/drop.rs", "fn drop_me() {}\n"),
    ] {
        fs::write(repo_root.join(rel_path), body).unwrap();
    }

    let shared_indexer = Arc::new(tokio::sync::RwLock::new(CodeIndexer::new(
        "filter-test",
        repo_root.clone(),
    )));
    let handle = Arc::new(IndexHandle {
        id: IndexId::new("filter-test"),
        indexer: shared_indexer,
        root_path: repo_root.clone(),
        // The one setting under test: restrict the walk to `<root>/api`.
        include_paths: vec![repo_root.join("api")],
        exclude_globs: Vec::new(),
        extensions: Vec::new(),
        domain_terms: Vec::new(),
        include_docs: false,
        respect_gitignore: true,
        extra_skip_dirs: crate::service::walker::default_extra_skip_dirs(),
        data_file_max_bytes: crate::service::walker::DEFAULT_DATA_FILE_MAX_BYTES,
        path_filter: Vec::new(),
        context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
        context_summary: Arc::new(tokio::sync::RwLock::new(None)),
        indexed_head_sha: Arc::new(tokio::sync::RwLock::new(None)),
        last_indexed_at: Arc::new(tokio::sync::RwLock::new(None)),
        lexical_only: false,
        skip_kg: false,
        defer_embed: true,
        stages: Arc::new(tokio::sync::RwLock::new(IndexStages::default())),
        search_pressure: Arc::new(tokio::sync::Notify::new()),
        walk_diagnostics: Arc::new(tokio::sync::RwLock::new(
            crate::core::registry::WalkDiagnostics::default(),
        )),
    });
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(Arc::clone(&handle), Arc::clone(&progress), false);

    // Poll every 100ms and give up after 100 polls (~10s).
    let mut polls = 0;
    while polls < 100 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);
    let walked = progress.total_files.load(Ordering::Acquire);
    assert_eq!(walked, 1, "only api/keep.rs should be walked");

    // The corpus must answer for `keep_me` and stay silent on `drop_me`.
    let idx = handle.indexer.read().await;
    let keep_query = crate::core::indexer::SearchQuery {
        text: "keep_me".into(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        ..Default::default()
    };
    let kept = idx.search(&keep_query).await.unwrap();
    assert!(kept.iter().any(|chunk| chunk.content.contains("keep_me")));

    let drop_query = crate::core::indexer::SearchQuery {
        text: "drop_me".into(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        ..Default::default()
    };
    let dropped = idx.search(&drop_query).await.unwrap();
    assert!(
        dropped
            .iter()
            .all(|chunk| !chunk.content.contains("drop_me")),
        "ui/drop.rs must not have been indexed"
    );
}

/// Issue #111, end to end: a handle configured with
/// `path_filter = ["common-*"]` must index `common-utils/` and leave
/// `other-repo/` out.
///
/// Why: the glob filter is only useful if the reindex actually applies it;
/// this pins the whole path from handle configuration to searchable corpus.
/// What: stages one file under each directory, runs the reindex on the
/// BM25-only path (no embedder required, so the test stays hermetic), and
/// checks that exactly one file was walked and only its content is
/// searchable.
/// Test: this test.
#[tokio::test]
async fn reindex_honours_path_filter() {
    let scratch = tempfile::tempdir().expect("tempdir");
    let repo_root = scratch.path().to_path_buf();
    for dir in ["common-utils", "other-repo"] {
        fs::create_dir_all(repo_root.join(dir)).unwrap();
    }
    for (rel_path, body) in [
        ("common-utils/keep.rs", "fn keep_common() {}\n"),
        ("other-repo/drop.rs", "fn drop_other() {}\n"),
    ] {
        fs::write(repo_root.join(rel_path), body).unwrap();
    }

    let shared_indexer = Arc::new(tokio::sync::RwLock::new(CodeIndexer::new(
        "pf-test",
        repo_root.clone(),
    )));
    let handle = Arc::new(IndexHandle {
        id: IndexId::new("pf-test"),
        indexer: shared_indexer,
        root_path: repo_root.clone(),
        include_paths: Vec::new(),
        exclude_globs: Vec::new(),
        extensions: Vec::new(),
        domain_terms: Vec::new(),
        include_docs: false,
        respect_gitignore: true,
        extra_skip_dirs: crate::service::walker::default_extra_skip_dirs(),
        data_file_max_bytes: crate::service::walker::DEFAULT_DATA_FILE_MAX_BYTES,
        // The one setting under test: only `common-*` entries may pass.
        path_filter: vec![String::from("common-*")],
        context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
        context_summary: Arc::new(tokio::sync::RwLock::new(None)),
        indexed_head_sha: Arc::new(tokio::sync::RwLock::new(None)),
        last_indexed_at: Arc::new(tokio::sync::RwLock::new(None)),
        lexical_only: false,
        skip_kg: false,
        defer_embed: true,
        stages: Arc::new(tokio::sync::RwLock::new(IndexStages::default())),
        search_pressure: Arc::new(tokio::sync::Notify::new()),
        walk_diagnostics: Arc::new(tokio::sync::RwLock::new(
            crate::core::registry::WalkDiagnostics::default(),
        )),
    });
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(Arc::clone(&handle), Arc::clone(&progress), false);

    // Poll every 100ms and give up after 100 polls (~10s).
    let mut polls = 0;
    while polls < 100 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);
    let walked = progress.total_files.load(Ordering::Acquire);
    assert_eq!(
        walked, 1,
        "only common-utils/keep.rs should pass the path_filter"
    );

    let idx = handle.indexer.read().await;
    let keep_query = crate::core::indexer::SearchQuery {
        text: "keep_common".into(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        ..Default::default()
    };
    let kept = idx.search(&keep_query).await.unwrap();
    assert!(kept
        .iter()
        .any(|chunk| chunk.content.contains("keep_common")));

    let drop_query = crate::core::indexer::SearchQuery {
        text: "drop_other".into(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        ..Default::default()
    };
    let dropped = idx.search(&drop_query).await.unwrap();
    assert!(
        dropped
            .iter()
            .all(|chunk| !chunk.content.contains("drop_other")),
        "other-repo must not have been indexed"
    );
}

/// Smoke test for the basic reindex flow: walk a small directory, index
/// what was found, and emit progress events in the documented order.
///
/// What: stages `a.rs`, `b.py`, and `target/skip.rs`, runs the reindex on a
/// bare handle, and checks that two files were counted and indexed (the
/// file under `target/` is not among them) and that the event stream opens
/// with `walk_complete`, continues with `start`, and ends with `complete`.
/// Test: this test.
#[tokio::test]
async fn reindex_walks_directory_and_emits_events() {
    let scratch = tempfile::tempdir().expect("tempdir");
    let repo_root = scratch.path().to_path_buf();
    fs::write(repo_root.join("a.rs"), "fn a() {}").unwrap();
    fs::write(repo_root.join("b.py"), "def b():\n    pass\n").unwrap();
    fs::create_dir(repo_root.join("target")).unwrap();
    fs::write(repo_root.join("target/skip.rs"), "fn skip() {}").unwrap();

    let shared_indexer = Arc::new(tokio::sync::RwLock::new(CodeIndexer::new(
        String::from("test"),
        repo_root.clone(),
    )));
    let handle = Arc::new(IndexHandle::bare(
        IndexId::new("test"),
        shared_indexer,
        repo_root.clone(),
    ));
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle, Arc::clone(&progress), false);

    // Poll every 100ms and give up after 100 polls (~10s).
    let mut polls = 0;
    while polls < 100 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);
    assert_eq!(progress.total_files.load(Ordering::Acquire), 2);
    assert_eq!(progress.indexed.load(Ordering::Acquire), 2);

    let events = progress.events.lock().await;
    // Issue #317: `walk_complete` is emitted BEFORE `start` so the CLI can
    // show a dedicated "Walking files…" phase. That makes `walk_complete`
    // the first event and `start` the second; earlier versions of these
    // assertions expected `start` to lead.
    assert!(
        matches!(events.first(), Some(line) if line.contains("\"walk_complete\"")),
        "first event must be walk_complete (issue #317); got: {:?}",
        events.first()
    );
    assert!(
        matches!(events.get(1), Some(line) if line.contains("\"start\"")),
        "second event must be start; got: {:?}",
        events.get(1)
    );
    assert!(
        matches!(events.last(), Some(line) if line.contains("\"complete\"")),
        "last event must be complete; got: {:?}",
        events.last()
    );
}

/// Issue #100 follow-up: end-to-end guard that the walker → chunker →
/// corpus pipeline persists chunks, distinct from the walker-only unit
/// tests next to `walk_source_files_with_options`. The follow-up report
/// for issue #100 observed `files=N chunks=0` after a v0.8.0 → v0.8.1
/// daemon upgrade and (incorrectly) attributed it to the walker swap;
/// the actual cause was the per-process content-hash cache hash-skipping
/// every file on the second reindex (`force=false`). This test pins both
/// the correct first-reindex chunking path AND the expected hash-skip
/// fast path on a second reindex, so any future walker rewrite that
/// silently drops paths fails here loudly while the documented fast
/// path keeps working.
///
/// Why: the unit walker tests only assert what the walker yields; they
/// can't catch a chunker that silently emits zero chunks (the first half
/// of this test) nor can they observe the hash-skip path (the second
/// half). Without an e2e assertion the next time someone misreads the
/// `chunks=0` log they'll bisect the walker again.
/// What: stages a small repo (`.gitignore` excluding `excluded/`, plus a
/// `crates/foo/src/lib.rs` with 3 `pub fn` definitions), runs the FULL
/// reindex pipeline twice, and asserts:
///   1. First reindex (cold cache): `total_files == 1`, `total_chunks > 0`,
///      `skipped == 0`, corpus `chunk_count() > 0`, and a search for
///      `alpha` returns a chunk whose `file` field equals the test root
///      joined with the root-relative path `crates/foo/src/lib.rs`.
///   2. Issue #602: the raw corpus snapshot stores that root-relative path
///      and contains no absolute path at all.
///   3. Second reindex (warm cache): `total_files == 1`,
///      `total_chunks == 0` AND `skipped == 1` — confirming the hash-skip
///      path fires for unchanged content (the failure mode operators
///      mistake for a walker regression) — and the corpus stays populated.
/// Test: this test.
#[tokio::test]
async fn reindex_persists_chunks_end_to_end() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let root = tmp.path().to_path_buf();
    // Stage a tiny `crates/foo/src/lib.rs` with 3 functions plus a
    // gitignored `excluded/` subtree that must NOT contribute chunks.
    fs::create_dir_all(root.join("crates/foo/src")).unwrap();
    fs::create_dir_all(root.join("excluded")).unwrap();
    fs::write(root.join(".gitignore"), "excluded/\n").unwrap();
    let lib_rs = root.join("crates/foo/src/lib.rs");
    fs::write(
        &lib_rs,
        "pub fn alpha() {}\n\npub fn beta() -> i32 { 1 }\n\npub fn gamma(x: i32) -> i32 { x + 1 }\n",
    )
    .unwrap();
    fs::write(
        root.join("excluded/should_not_index.rs"),
        "pub fn nope() {}\n",
    )
    .unwrap();

    // Use a unique IndexId so the per-process `file_hashes` static (shared
    // across tests in the same binary) doesn't interfere — earlier tests
    // in this module reindex other temp dirs against unrelated index ids.
    let id = IndexId::new("e2e-pipeline-test");
    let indexer = CodeIndexer::new(id.0.clone(), root.clone());
    let handle = Arc::new(IndexHandle::bare(
        id.clone(),
        Arc::new(tokio::sync::RwLock::new(indexer)),
        root.clone(),
    ));

    // ----- First reindex: cold cache, chunks must be produced. -----
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);
    // Poll every 100ms, up to 100 times (~10s), for completion.
    for _ in 0..100 {
        if progress.status.load() == ReindexStatus::Complete {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Walker yields exactly one file (`crates/foo/src/lib.rs`).
    assert_eq!(
        progress.total_files.load(Ordering::Acquire),
        1,
        "walker must yield exactly 1 file (gitignored subtree pruned)"
    );

    // The smoking-gun assertion the unit walker tests missed: the chunker
    // must have *persisted* chunks, not just been handed paths.
    let chunks = progress.total_chunks.load(Ordering::Acquire);
    assert!(
        chunks > 0,
        "regression: walker yielded 1 file but chunker persisted 0 chunks \
         on the first (cold-cache) reindex"
    );

    // On the cold-cache run the hash-skip path must NOT have fired.
    assert_eq!(
        progress.skipped.load(Ordering::Acquire),
        0,
        "first reindex hash-skipped a file (cold cache should hash-miss everything)"
    );

    // Issue #602 — portability: the corpus must store the ROOT-RELATIVE
    // path (`crates/foo/src/lib.rs`), and search must resolve it against the
    // serving host's `root_path`. Search results are intentionally absolute
    // (resolved via `resolve_chunk_file`), so a chunk written under one root
    // and served under a different root resolves correctly on each host.
    // The chunk-write `strip_prefix` now strips against the canonical walk
    // root, so the STORED `file` is always relative.
    let rel_lib_rs = "crates/foo/src/lib.rs";
    let expected_resolved = root.join(rel_lib_rs).to_string_lossy().into_owned();
    {
        let idx = handle.indexer.read().await;
        assert!(
            idx.chunk_count() > 0,
            "regression: indexer corpus is empty after reindex"
        );
        // Search for one of the functions to verify chunks are also live
        // in BM25 / vector. `alpha` is unique to the staged file.
        let results = idx
            .search(&crate::core::indexer::SearchQuery {
                text: "alpha".into(),
                top_k: 5,
                expand_graph: false,
                compact: false,
                ..Default::default()
            })
            .await
            .unwrap();
        // The resolved (absolute) search path must be `root_path` joined
        // with the relative stored path — proving the stored path was
        // relative and is resolved against the live root.
        assert!(
            results.iter().any(|c| c.file == expected_resolved),
            "no chunk resolves to root_path + relative lib.rs (#602): \
             expected {expected_resolved:?}, got {:?}",
            results.iter().map(|c| c.file.clone()).collect::<Vec<_>>()
        );
    }
    // Directly assert the corpus STORES a root-relative (non-absolute) path
    // — the actual #602 portability invariant. `raw_chunks_snapshot` exposes
    // the raw `RawChunk.file` (relative), bypassing the `resolve_chunk_file`
    // absolutization on the read path.
    {
        let idx = handle.indexer.read().await;
        let raw_files: Vec<String> = idx
            .raw_chunks_snapshot()
            .await
            .into_iter()
            .map(|c| c.file)
            .collect();
        assert!(
            raw_files.iter().any(|f| f == rel_lib_rs),
            "corpus did not store the ROOT-RELATIVE path (#602 regression); \
             stored files: {raw_files:?}"
        );
        assert!(
            raw_files
                .iter()
                .all(|f| !std::path::Path::new(f).is_absolute()),
            "corpus stored an ABSOLUTE path (#602 regression): {raw_files:?}"
        );
    }

    // ----- Second reindex: warm cache, all files must hash-skip. -----
    //
    // This is the path the v0.8.1 follow-up report misread as a walker
    // regression. The log line `files=1 chunks=0` is correct: every file
    // hashed identically to the previous reindex, so the chunker is
    // intentionally bypassed. Pin this behaviour so the next bisection
    // doesn't waste another round chasing a non-existent walker bug.
    //
    // A fresh `ReindexProgress` is used so the counters asserted below
    // start from a new instance rather than carrying over the first run's.
    let progress2 = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress2.clone(), false);
    for _ in 0..100 {
        if progress2.status.load() == ReindexStatus::Complete {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert_eq!(progress2.status.load(), ReindexStatus::Complete);
    assert_eq!(
        progress2.total_files.load(Ordering::Acquire),
        1,
        "second reindex must still walk 1 file"
    );
    assert_eq!(
        progress2.total_chunks.load(Ordering::Acquire),
        0,
        "second reindex of unchanged files MUST emit 0 new chunks (hash-skip path)"
    );
    assert_eq!(
        progress2.skipped.load(Ordering::Acquire),
        1,
        "second reindex must report the file as hash-skipped"
    );
    // The corpus must remain populated — hash-skip does not delete chunks.
    {
        let idx = handle.indexer.read().await;
        assert!(
            idx.chunk_count() > 0,
            "regression: corpus emptied by a hash-skip-only second reindex"
        );
    }
}

/// Issue #112: once a reindex finishes, the handle's `context_embedding`
/// and `context_summary` must both be filled in, provided `root_path`
/// contains a recognised metadata file.
///
/// What: stages `lib.rs` alongside a `README.md`, wires a `MockEmbedder`
/// and a `UsearchStore` of the same dimension into the indexer, runs the
/// reindex, and checks that the embedding has the embedder's dimension and
/// that the summary mentions `proj` or `README`.
/// Test: this test (hermetic — the embedder is a mock).
#[tokio::test]
async fn context_embedding_populated_after_reindex() {
    use crate::core::embed::{Embedder, MockEmbedder};
    use crate::core::store::{UsearchStore, VectorStore};

    let scratch = tempfile::tempdir().expect("tempdir");
    let repo_root = scratch.path().to_path_buf();
    // One source file, plus a README to give the metadata scraper
    // something to embed.
    fs::write(repo_root.join("lib.rs"), "fn hello() {}\n").unwrap();
    let readme_body = "# proj\n\nA test project for #112.\n";
    fs::write(repo_root.join("README.md"), readme_body).unwrap();

    let dim = 32;
    let embedder: Arc<dyn Embedder> = Arc::new(MockEmbedder::new(dim));
    let store: Arc<dyn VectorStore> = Arc::new(UsearchStore::new(dim).expect("usearch new"));
    let indexer = CodeIndexer::new("ctx-test", repo_root.clone()).with_components(embedder, store);
    let shared_indexer = Arc::new(tokio::sync::RwLock::new(indexer));

    let handle = Arc::new(IndexHandle::bare(
        IndexId::new("ctx-test"),
        shared_indexer,
        repo_root.clone(),
    ));
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(Arc::clone(&handle), Arc::clone(&progress), false);

    // Poll every 100ms and give up after 100 polls (~10s).
    let mut polls = 0;
    while polls < 100 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let embedding = handle.context_embedding.read().await.clone().expect(
        "context_embedding must be populated when metadata is present and embedder is wired",
    );
    assert_eq!(embedding.len(), dim, "embedding must have embedder dim");

    let summary_text = handle
        .context_summary
        .read()
        .await
        .clone()
        .expect("context_summary must be populated");
    assert!(summary_text.contains("proj") || summary_text.contains("README"));
}

/// Issue #601 (end-to-end, hermetic): a full-pipeline index whose embedder
/// FAILS for every batch must end `Failed`, NOT `Complete` — and the
/// failed rebuild must not be committed over the previously-live corpus.
///
/// Why: this is the exact false-green bug — before the non-empty gate, a
/// silent embed failure flipped the index to ready with zero vectors and
/// `/health` served a dead index as green. This test wires a `FailingEmbedder`
/// (returns `Err` from every `embed_batch`) into an indexer that ALSO has a
/// durable corpus pre-seeded with a "previous" chunk, runs the reindex with
/// `defer_embed = false`, and asserts (1) status is `Failed`, (2) the
/// lifecycle status is `"failed"` and the semantic stage is `Failed` with a
/// recorded reason, (3) a terminal `error` event with `fatal: true` and
/// `vector_count: 0` was emitted, and (4) no `lib.rs` chunk from the failed
/// rebuild was promoted into the live corpus. Survival of the seeded
/// "previous" chunk is NOT asserted here — see the note above the final
/// assertion. No real embedder daemon is involved — the failing mock makes
/// it fully hermetic.
/// What: see the assertions inline.
/// Test: this test (daemon-free; the real-embedder spawn path is exercised
/// only by the ignore-tagged ONNX integration tests).
#[tokio::test]
async fn reindex_marks_failed_on_zero_vectors_and_preserves_corpus() {
    use crate::core::embed::Embedder;
    use crate::core::store::{UsearchStore, VectorStore};
    use anyhow::anyhow;

    /// Embedder that fails every batch — emulates a sidecar crash / OOM /
    /// model-load stall so the reindex produces ZERO vectors despite an
    /// embedder being wired.
    struct FailingEmbedder;
    #[async_trait::async_trait]
    impl Embedder for FailingEmbedder {
        async fn embed(&self, _text: &str) -> anyhow::Result<Vec<f32>> {
            Err(anyhow!("simulated embedder failure (embed)"))
        }
        async fn embed_batch(&self, _texts: &[&str]) -> anyhow::Result<Vec<Vec<f32>>> {
            Err(anyhow!("simulated embedder failure (every batch)"))
        }
        // Same value as the `dim` passed to `UsearchStore::new` below.
        fn dimension(&self) -> usize {
            32
        }
    }

    let tmp = tempfile::tempdir().expect("tempdir");
    let root = tmp.path().to_path_buf();
    fs::write(root.join("lib.rs"), "pub fn alpha() {}\n").unwrap();

    let dim = 32;
    let embedder: Arc<dyn Embedder> = Arc::new(FailingEmbedder);
    let store: Arc<dyn VectorStore> = Arc::new(UsearchStore::new(dim).expect("usearch new"));
    let mut indexer = CodeIndexer::new("fail-601", root.clone()).with_components(embedder, store);

    // Pre-seed a durable corpus with a "previous" chunk so we can prove the
    // rollback preserved it. The staging swap requires a durable corpus.
    let corpus_path = tmp.path().join("index.redb");
    let corpus = crate::core::corpus::CorpusStore::open(&corpus_path).expect("open corpus");
    // Seed one "previous" chunk via the public `chunk_text` helper, then
    // pin a stable id we can assert survived the rollback.
    let mut prev = crate::core::chunker::chunk_text("prev/file.rs", "fn previous() {}", 64, 64);
    // Indexing `prev[0]` relies on `chunk_text` yielding at least one chunk
    // for this input; an empty result would panic here.
    prev[0].id = "prev/file.rs:1:1".into();
    prev[0].file = "prev/file.rs".into();
    corpus.upsert_chunks(&prev).expect("seed prev chunk");
    indexer.set_corpus_store(Arc::new(corpus));

    // Use defer_embed=false so the zero-vector failure gate (#601) fires
    // synchronously. With defer_embed=true the fast pass deliberately skips
    // embedding and the gate does not apply (issue #923).
    let mut handle_inner = IndexHandle::bare(
        IndexId::new("fail-601"),
        Arc::new(tokio::sync::RwLock::new(indexer)),
        root.clone(),
    );
    handle_inner.defer_embed = false;
    let handle = Arc::new(handle_inner);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Wait for a terminal state (Failed expected). Polls every 100ms, up to
    // 100 times; if the status never leaves `Running`, `terminal` keeps its
    // `Running` initial value and the assertion below fails.
    let mut terminal = ReindexStatus::Running;
    for _ in 0..100 {
        let s = progress.status.load();
        if s != ReindexStatus::Running {
            terminal = s;
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert_eq!(
        terminal,
        ReindexStatus::Failed,
        "embed failure must mark the reindex Failed, not Complete"
    );

    // The lifecycle status must report `failed`, never `ready`.
    let stages = handle.stages.read().await.clone();
    assert_eq!(stages.lifecycle_status(), "failed");
    assert_eq!(stages.semantic.status, StageStatus::Failed);
    assert!(
        stages.semantic.failure.is_some(),
        "failed semantic stage must carry a reason"
    );

    // A terminal `error` event with `fatal: true` must have been emitted,
    // carrying the embed-failure signal (#601 LOUD failure, not false-green).
    let events = progress.events.lock().await.clone();
    assert!(
        events.iter().any(|e| e.contains("\"fatal\":true")
            && e.contains("\"event\":\"error\"")
            && e.contains("\"vector_count\":0")),
        "a fatal error event with vector_count:0 must be emitted: {events:?}"
    );

    // Non-destructive (#603): the failed rebuild's `lib.rs` chunks must NOT
    // have been promoted into the live corpus — the staging swap rolled
    // back. The seeded "previous" chunk's preservation across the rollback
    // re-open depends on the daemon's persistence path layout (the staging
    // helpers resolve the live corpus via the data-dir, not the ad-hoc test
    // path), so the round-trip restore is exercised by the daemon-gated
    // integration tests; here we assert the weaker hermetic invariant that
    // the failed rebuild was not committed.
    let live = handle.indexer.read().await.raw_chunks_snapshot().await;
    assert!(
        !live.iter().any(|c| c.file == "lib.rs"),
        "non-destructive: the failed rebuild must not promote lib.rs chunks; \
         got: {:?}",
        live.iter().map(|c| c.id.clone()).collect::<Vec<_>>()
    );
}

/// Issue #112: an index whose root contains no recognised metadata files
/// must leave both the context embedding and the context summary unset.
///
/// Why: with the embedding left at `None` the router falls back to a
/// neutral 1.0 weight for this index.
/// What: writes a single source file into a temp dir, reindexes it through
/// a bare handle, polls (at most 100 × 100 ms) until the run reports
/// `Complete`, then asserts `context_embedding` and `context_summary` are
/// both `None`.
/// Test: this test.
#[tokio::test]
async fn context_embedding_none_when_no_metadata() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let repo_root = workdir.path().to_path_buf();
    // Only a source file — deliberately no README, no Cargo.toml, etc.
    fs::write(repo_root.join("lib.rs"), "fn hello() {}\n").unwrap();

    let shared_indexer = Arc::new(tokio::sync::RwLock::new(CodeIndexer::new(
        "no-meta",
        repo_root.clone(),
    )));
    let handle = Arc::new(IndexHandle::bare(
        IndexId::new("no-meta"),
        shared_indexer,
        repo_root.clone(),
    ));
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Bounded poll: give up after 100 sleeps and let the assertion below
    // report whatever status the run ended up in.
    let poll_interval = std::time::Duration::from_millis(100);
    let mut polls = 0;
    while polls < 100 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);
    assert!(handle.context_embedding.read().await.is_none());
    assert!(handle.context_summary.read().await.is_none());
}

// ── Staged-pipeline (issue #109, Phase 1) ──────────────────────────

/// Helper: convenience wrapper over `make_handle_with_flags` for tests that
/// only need to vary `lexical_only`.
///
/// What: forwards `id`, `root` and `lexical_only` unchanged and always
/// passes `false` for the `skip_kg` flag.
fn make_handle_with_flag(
    id: &str,
    root: std::path::PathBuf,
    lexical_only: bool,
) -> Arc<IndexHandle> {
    // This wrapper never requests the skip-KG variant.
    let skip_kg = false;
    make_handle_with_flags(id, root, lexical_only, skip_kg)
}

/// Handle builder that parameterises both `lexical_only` and `skip_kg`.
///
/// Why: `make_handle_with_flag` only exposes `lexical_only`; it delegates
/// here so its existing callers keep their three-argument call shape.
/// What: builds an `Arc<IndexHandle>` around a fresh `CodeIndexer` and
/// pre-sets `stages` from the flags:
///   - `lexical_only` → lexical pending, semantic + graph skipped
///     (takes precedence over `skip_kg`);
///   - `skip_kg` only → lexical + semantic pending, graph skipped;
///   - neither → `IndexStages::default()`.
/// Every other field gets an empty / default / `None` value, except
/// `respect_gitignore`, which is `true`.
/// Test: used by `skip_kg_index_never_runs_phase3` (directly) and by the
/// tests that go through `make_handle_with_flag`.
fn make_handle_with_flags(
    id: &str,
    root: std::path::PathBuf,
    lexical_only: bool,
    skip_kg: bool,
) -> Arc<IndexHandle> {
    use crate::core::registry::{IndexStages, StageState};

    let indexer = Arc::new(tokio::sync::RwLock::new(CodeIndexer::new(
        id.to_owned(),
        root.clone(),
    )));

    // The first arm ignores `skip_kg`: a lexical-only index already has
    // its graph stage skipped.
    let initial_stages = match (lexical_only, skip_kg) {
        (true, _) => IndexStages {
            lexical: StageState::pending(),
            semantic: StageState::skipped(),
            graph: StageState::skipped(),
        },
        (false, true) => IndexStages {
            lexical: StageState::pending(),
            semantic: StageState::pending(),
            graph: StageState::skipped(),
        },
        (false, false) => IndexStages::default(),
    };

    let handle = IndexHandle {
        id: IndexId::new(id),
        indexer,
        root_path: root,
        include_paths: Vec::new(),
        exclude_globs: Vec::new(),
        extensions: Vec::new(),
        domain_terms: Vec::new(),
        include_docs: false,
        respect_gitignore: true,
        extra_skip_dirs: crate::service::walker::default_extra_skip_dirs(),
        data_file_max_bytes: crate::service::walker::DEFAULT_DATA_FILE_MAX_BYTES,
        path_filter: Vec::new(),
        context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
        context_summary: Arc::new(tokio::sync::RwLock::new(None)),
        indexed_head_sha: Arc::new(tokio::sync::RwLock::new(None)),
        last_indexed_at: Arc::new(tokio::sync::RwLock::new(None)),
        lexical_only,
        skip_kg,
        defer_embed: false,
        stages: Arc::new(tokio::sync::RwLock::new(initial_stages)),
        search_pressure: Arc::new(tokio::sync::Notify::new()),
        walk_diagnostics: Arc::new(tokio::sync::RwLock::new(
            crate::core::registry::WalkDiagnostics::default(),
        )),
    };
    Arc::new(handle)
}

/// Issue #109 Phase 1 acceptance test: once a reindex completes on a
/// handle built without `lexical_only`, the lexical stage is `Ready`,
/// `search_capabilities` contains `bm25`, and a search for a staged symbol
/// returns its chunk.
///
/// Why: pins the contract that BM25 search works as soon as Stage 1
/// finishes. The fixture is intended to model the warm-boot BM25-only
/// shape (a non-`lexical_only` handle whose indexer has no embedder wired).
/// What: stages a one-file repo, reindexes it, polls (at most 200 × 50 ms)
/// for `Complete`, then asserts the lexical stage status, the presence of
/// the `bm25` capability, and that searching `unique_alpha` yields a chunk
/// containing it. The semantic and graph stage statuses are NOT asserted
/// here.
/// Test: this test.
#[tokio::test]
async fn stage_1_completes_and_search_works_before_embedding() {
    let fixture = tempfile::tempdir().expect("tempdir");
    let repo_root = fixture.path().to_path_buf();
    fs::write(repo_root.join("hello.rs"), "pub fn unique_alpha() {}\n").unwrap();

    // `lexical_only == false`: the full pipeline shape, exercised here
    // through the plain test indexer.
    let handle = make_handle_with_flag("stage1-test", repo_root.clone(), false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Bounded poll; the assertion below reports a run that never completed.
    let poll_interval = std::time::Duration::from_millis(50);
    let mut polls = 0;
    while polls < 200 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Work on a snapshot so no stage lock is held during the assertions.
    let stages = handle.stages.read().await.clone();
    assert_eq!(
        stages.lexical.status,
        crate::core::registry::StageStatus::Ready,
        "stage 1 must finish on a BM25-only reindex"
    );
    let caps = stages.search_capabilities();
    assert!(
        caps.contains(&"bm25"),
        "search_capabilities must contain bm25 after Stage 1, got: {caps:?}"
    );

    // The search must surface the staged chunk.
    let query = crate::core::indexer::SearchQuery {
        text: "unique_alpha".to_string(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        ..Default::default()
    };
    let indexer_guard = handle.indexer.read().await;
    let results = indexer_guard.search(&query).await.expect("search");
    assert!(
        results
            .iter()
            .any(|hit| hit.content.contains("unique_alpha")),
        "BM25 lane must return the chunk after Stage 1: {results:?}"
    );
}

/// Issue #109 Phase 1: a `lexical_only` index keeps its semantic and graph
/// stages at `Skipped` across a full reindex, never advertises the `vector`
/// or `kg` capabilities, and still reports a terminal `ready` lifecycle.
/// The CLI `--lexical-only` flag and the `POST /indexes` `lexical_only`
/// field both end up on this path.
///
/// What: builds a `lexical_only` handle, checks the semantic stage starts
/// `Skipped`, reindexes a one-file repo, polls (at most 200 × 50 ms) for
/// `Complete`, then asserts stage statuses, capabilities, a lexical-stage
/// search hit, and `lifecycle_status() == "ready"`.
/// Test: this test.
#[tokio::test]
async fn lexical_only_index_never_runs_stage_2() {
    let fixture = tempfile::tempdir().expect("tempdir");
    let repo_root = fixture.path().to_path_buf();
    fs::write(repo_root.join("a.rs"), "pub fn lex_only_func() {}\n").unwrap();

    let handle = make_handle_with_flag("lexical-only-test", repo_root.clone(), true);
    // Pre-condition: the helper initialises semantic as `Skipped` when
    // `lexical_only == true`.
    assert_eq!(
        handle.stages.read().await.semantic.status,
        crate::core::registry::StageStatus::Skipped
    );

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);
    let poll_interval = std::time::Duration::from_millis(50);
    let mut polls = 0;
    while polls < 200 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Post-condition: lexical is Ready, the other two are STILL Skipped.
    let stages = handle.stages.read().await.clone();
    let skipped = crate::core::registry::StageStatus::Skipped;
    assert_eq!(
        stages.lexical.status,
        crate::core::registry::StageStatus::Ready,
        "lexical must be Ready"
    );
    assert_eq!(
        stages.semantic.status, skipped,
        "lexical_only must never flip semantic away from Skipped"
    );
    assert_eq!(
        stages.graph.status, skipped,
        "lexical_only must never flip graph away from Skipped"
    );

    let caps = stages.search_capabilities();
    assert!(
        !caps.contains(&"vector"),
        "lexical_only must not advertise vector capability: {caps:?}"
    );
    assert!(
        !caps.contains(&"kg"),
        "lexical_only must not advertise kg capability: {caps:?}"
    );

    // An explicit `stage: Some(Lexical)` request must still find the chunk.
    let query = crate::core::indexer::SearchQuery {
        text: "lex_only_func".to_string(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        stage: Some(crate::core::indexer::SearchStage::Lexical),
        ..Default::default()
    };
    let indexer_guard = handle.indexer.read().await;
    let results = indexer_guard.search(&query).await.expect("search");
    assert!(
        results
            .iter()
            .any(|hit| hit.content.contains("lex_only_func")),
        "lexical lane must return the chunk on lexical_only: {results:?}"
    );

    // With semantic + graph permanently Skipped the lifecycle helper must
    // report the terminal "ready", not an intermediate state.
    assert_eq!(stages.lifecycle_status(), "ready");
}

/// Issue #313: a `skip_kg` index keeps its graph stage at `Skipped` across
/// a reindex, never advertises the `kg` capability, and ends up with an
/// empty symbol graph.
///
/// Why: pins the Phase 3 bypass contract so a regression to an
/// unconditional `rebuild_symbol_graph_for_reindex` call is caught — the
/// graph stage flipping to `Ready` (or the graph gaining nodes) fails this
/// test.
/// What: builds a `skip_kg` handle, checks the graph stage starts
/// `Skipped`, reindexes a one-file repo, polls (at most 200 × 50 ms) for
/// `Complete`, then asserts lexical is `Ready`, graph is still `Skipped`,
/// `kg` is absent from `search_capabilities`, and the symbol graph has zero
/// nodes. The SSE `complete` event's KG fields (`kg_skipped`, `kg_ms`,
/// `symbol_count`, `edge_count`) are not inspected here.
/// Test: this test.
#[tokio::test]
async fn skip_kg_index_never_runs_phase3() {
    let fixture = tempfile::tempdir().expect("tempdir");
    let repo_root = fixture.path().to_path_buf();
    fs::write(
        repo_root.join("b.rs"),
        "pub fn skip_kg_func() { let x = 1; }\n",
    )
    .unwrap();

    let handle = make_handle_with_flags("skip-kg-test", repo_root.clone(), false, true);
    // Pre-condition: the helper initialises graph as `Skipped`.
    assert_eq!(
        handle.stages.read().await.graph.status,
        crate::core::registry::StageStatus::Skipped
    );

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);
    let poll_interval = std::time::Duration::from_millis(50);
    let mut polls = 0;
    while polls < 200 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Post-condition: graph must STILL be Skipped after the run.
    let stages = handle.stages.read().await.clone();
    assert_eq!(
        stages.lexical.status,
        crate::core::registry::StageStatus::Ready,
        "lexical must be Ready"
    );
    assert_eq!(
        stages.graph.status,
        crate::core::registry::StageStatus::Skipped,
        "skip_kg must never flip graph away from Skipped"
    );
    let caps = stages.search_capabilities();
    assert!(
        !caps.contains(&"kg"),
        "skip_kg must not advertise kg capability: {caps:?}"
    );

    // A skipped Phase 3 must leave the symbol graph without any nodes.
    let indexer_guard = handle.indexer.read().await;
    let symbol_graph = indexer_guard.snapshot_symbol_graph().await;
    assert_eq!(
        symbol_graph.node_count(),
        0,
        "symbol graph must be empty when skip_kg=true"
    );
}

/// Issue #109 Phase 1: `search_capabilities` grows monotonically as the
/// stages advance towards `Ready`.
///
/// Why: driving the transitions through the `mark_*` helpers directly keeps
/// the test from racing the reindex pipeline.
/// What: starts from a freshly built handle (no capabilities), then calls
/// `reset_stages_for_reindex`, `mark_lexical_ready_semantic_in_progress`,
/// `mark_semantic_ready_graph_in_progress` and `mark_graph_ready` in order,
/// checking after each step which of `bm25` / `vector` / `kg` are
/// advertised, and finally that the lifecycle status is `ready`.
/// Test: this test.
#[tokio::test]
async fn search_capabilities_grows_as_stages_complete() {
    let fixture = tempfile::tempdir().expect("tempdir");
    let repo_root = fixture.path().to_path_buf();
    fs::write(repo_root.join("a.rs"), "pub fn stage_grow() {}\n").unwrap();
    let handle = make_handle_with_flag("caps-grow-test", repo_root.clone(), false);

    // Step 0 — freshly built handle: nothing advertised yet.
    assert!(handle.stages.read().await.search_capabilities().is_empty());

    // Step 1 — reset for a reindex: still nothing advertised.
    reset_stages_for_reindex(&handle).await;
    assert!(handle.stages.read().await.search_capabilities().is_empty());

    // Step 2 — lexical done: bm25 appears, vector does not.
    mark_lexical_ready_semantic_in_progress(&handle, 1, 1, 1).await;
    let after_lexical = handle.stages.read().await.search_capabilities();
    assert!(after_lexical.contains(&"bm25"));
    assert!(!after_lexical.contains(&"vector"));

    // Step 3 — semantic done: vector appears, kg does not.
    mark_semantic_ready_graph_in_progress(&handle, 1, 1).await;
    let after_semantic = handle.stages.read().await.search_capabilities();
    assert!(after_semantic.contains(&"vector"));
    assert!(!after_semantic.contains(&"kg"));

    // Step 4 — graph done: all three lanes advertised, lifecycle is ready.
    mark_graph_ready(&handle).await;
    let after_graph = handle.stages.read().await.search_capabilities();
    for lane in ["bm25", "vector", "kg"] {
        assert!(after_graph.contains(&lane));
    }
    assert_eq!(handle.stages.read().await.lifecycle_status(), "ready");
}

// ── Issue #280: walk diagnostic fields ──────────────────────────────

/// Issue #280: a successful reindex must leave `walk_diagnostics` with a
/// set `last_walk_started_at`, a positive `last_walk_files_seen`, and no
/// `last_walk_error`.
///
/// Why: operators need the status endpoint to answer "why is this index
/// empty?" without diving into daemon logs; this pins the contract that a
/// clean walk populates the timestamp and the files-seen counter.
/// What: stages a one-file fixture dir, reindexes it, polls (at most
/// 100 × 100 ms) for `Complete`, then checks the three diagnostic fields on
/// a cloned snapshot.
/// Test: this test.
#[tokio::test]
async fn walk_diagnostics_populated_after_reindex() {
    let fixture = tempfile::tempdir().expect("tempdir");
    let repo_root = fixture.path().to_path_buf();
    fs::write(repo_root.join("diag_check.rs"), "fn diag_fn() {}\n").unwrap();

    let handle = make_handle_with_flag("diag-test", repo_root.clone(), false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    let poll_interval = std::time::Duration::from_millis(100);
    let mut polls = 0;
    while polls < 100 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Snapshot the diagnostics so the lock is released before asserting.
    let snapshot = handle.walk_diagnostics.read().await.clone();
    let started = snapshot.last_walk_started_at.is_some();
    assert!(
        started,
        "last_walk_started_at must be set after reindex, got {:?}",
        snapshot
    );
    let saw_files = snapshot.last_walk_files_seen > 0;
    assert!(
        saw_files,
        "last_walk_files_seen must be > 0 when files exist, got {:?}",
        snapshot
    );
    let clean = snapshot.last_walk_error.is_none();
    assert!(
        clean,
        "last_walk_error must be None on a clean walk, got {:?}",
        snapshot.last_walk_error
    );
}

/// Issue #280: when the walk finds no files, `last_walk_files_seen` is 0
/// and `last_walk_error` carries a diagnostic so the operator can see why
/// the index is empty.
///
/// Why: a zero-file walk is the most common cause of zero-chunk indexes,
/// and the walk error is the first thing an operator would check.
/// What: reindexes an empty temp dir, polls (at most 100 × 100 ms) for
/// `Complete` — the run is still expected to complete — then asserts
/// `last_walk_files_seen == 0` and `last_walk_error.is_some()`.
/// Test: this test.
#[tokio::test]
async fn walk_diagnostics_error_set_when_zero_files() {
    // The temp dir is left empty on purpose: the walk must see zero files.
    let fixture = tempfile::tempdir().expect("tempdir");
    let repo_root = fixture.path().to_path_buf();

    let handle = make_handle_with_flag("diag-zero-test", repo_root.clone(), false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    let poll_interval = std::time::Duration::from_millis(100);
    let mut polls = 0;
    while polls < 100 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let snapshot = handle.walk_diagnostics.read().await.clone();
    assert_eq!(
        snapshot.last_walk_files_seen, 0,
        "last_walk_files_seen must be 0 for empty directory, got {:?}",
        snapshot
    );
    let has_error = snapshot.last_walk_error.is_some();
    assert!(
        has_error,
        "last_walk_error must be set when zero files are found, got {:?}",
        snapshot
    );
}

// ── Issue #458: priority semaphore routing ────────────────────────────────

/// Why: `reindex_semaphore_for` is the single routing point between
/// interactive and background reindexes, so the two priorities must map to
/// two different semaphores and each mapping must be stable across calls.
///
/// What: takes the address of the semaphore returned for `true` and for
/// `false`, asserts the two addresses differ, then asserts that repeating
/// each call yields the same address again.
///
/// Test: this test. It compares identities only; the starvation property
/// itself (background never blocks interactive) is not exercised here.
#[test]
fn reindex_semaphore_selection_routes_by_priority() {
    // Address of the semaphore selected for the given routing flag.
    let sem_addr = |flag: bool| reindex_semaphore_for(flag) as *const Semaphore;

    let interactive = sem_addr(true);
    let background = sem_addr(false);

    // Distinct objects: one per priority class.
    assert_ne!(
        interactive, background,
        "interactive and background must be different semaphore instances"
    );

    // Stable identities: a second lookup lands on the same object.
    assert_eq!(
        interactive,
        sem_addr(true),
        "interactive semaphore must be a stable singleton"
    );
    assert_eq!(
        background,
        sem_addr(false),
        "background semaphore must be a stable singleton"
    );
}

/// Why: verifies that a fully saturated background semaphore does NOT stop
/// an interactive request from acquiring its own permit.
///
/// What: constructs two independent local semaphores sized like the global
/// ones (`MAX_PARALLEL_REINDEXES` and `MAX_PARALLEL_BACKGROUND_REINDEXES`),
/// acquires EVERY background permit, then asserts the interactive semaphore
/// still grants a permit immediately and shows exactly one permit consumed.
/// Local semaphores keep the test isolated from parallel test workers that
/// may hold permits of the global static semaphores.
///
/// Saturation takes all `MAX_PARALLEL_BACKGROUND_REINDEXES` permits rather
/// than a single one, so the "fully saturated" assertion keeps holding if
/// that constant is ever raised above 1.
///
/// The static `reindex_semaphore_for` routing is verified separately in
/// `reindex_semaphore_selection_routes_by_priority`.
///
/// Test: this test. The end-to-end case (a user `index` command returning
/// promptly while background tasks queue) requires a running daemon and
/// needs manual/integration verification.
#[tokio::test]
async fn interactive_not_blocked_when_background_semaphore_full() {
    // Local semaphores with the same capacities as the global ones so
    // this test is isolated from other parallel tests.
    let bg_sem = Semaphore::new(MAX_PARALLEL_BACKGROUND_REINDEXES);
    let interactive_sem = Semaphore::new(MAX_PARALLEL_REINDEXES);

    // Saturate the background semaphore (simulating a full startup backlog)
    // by holding every one of its permits, whatever the configured count.
    let mut bg_permits = Vec::with_capacity(MAX_PARALLEL_BACKGROUND_REINDEXES);
    for _ in 0..MAX_PARALLEL_BACKGROUND_REINDEXES {
        let permit = bg_sem
            .acquire()
            .await
            .expect("background semaphore unexpectedly closed");
        bg_permits.push(permit);
    }

    // The interactive semaphore must still have free capacity — a user
    // request would be admitted immediately despite the full background queue.
    let interactive_permit = interactive_sem
        .try_acquire()
        .expect("interactive semaphore must have a free permit even when background is full");

    // Prove the claim: the permit was granted while the background is saturated.
    assert_eq!(
        bg_sem.available_permits(),
        0,
        "background semaphore must be fully saturated"
    );
    assert_eq!(
        interactive_sem.available_permits(),
        MAX_PARALLEL_REINDEXES - 1,
        "interactive semaphore must show one consumed permit"
    );

    drop(interactive_permit);
    // Release every background slot explicitly, after the assertions.
    drop(bg_permits);
}

/// Why: `background_reindex_queue_depth()` is the public reader for
/// `BACKGROUND_QUEUE_DEPTH`; without a correct reading the /health endpoint
/// cannot expose the startup backlog of background reindexes.
///
/// What: records the counter's current value, adds 3 via `fetch_add`,
/// checks the reader reports baseline + 3, subtracts 3 via `fetch_sub`, and
/// checks the reader is back at the baseline.
///
/// Test: this test. It only covers the reader against direct counter
/// manipulation; the increments and decrements performed when real
/// background tasks are spawned are not exercised here. The +3 is undone
/// only if the first assertion passes.
#[test]
fn background_reindex_queue_depth_counts_waiting_tasks() {
    use std::sync::atomic::Ordering::Relaxed;

    // Work relative to whatever the counter holds right now, so the test
    // leaves it at the value it found.
    let baseline = BACKGROUND_QUEUE_DEPTH.load(Relaxed);

    BACKGROUND_QUEUE_DEPTH.fetch_add(3, Relaxed);
    assert_eq!(
        background_reindex_queue_depth(),
        baseline + 3,
        "queue depth must increase by 3 after 3 increments"
    );

    BACKGROUND_QUEUE_DEPTH.fetch_sub(3, Relaxed);
    assert_eq!(
        background_reindex_queue_depth(),
        baseline,
        "queue depth must return to initial after 3 decrements"
    );
}

/// A `ReindexTerminationGuard` dropped while still armed must set the
/// status to `Failed` and broadcast an error event.
///
/// Why: early-exit / panic paths would otherwise drop the
/// `broadcast::Sender` without any terminal SSE frame, leaving CLI
/// subscribers blocked on a completion event that never arrives.
///
/// What: subscribes to the progress channel, creates a guard and drops it
/// without calling `disarm()`, then asserts (1) the status is `Failed` and
/// (2) a message containing `"error"` was broadcast.
///
/// Test: this test.
#[test]
fn reindex_guard_fires_on_early_return() {
    let progress = Arc::new(ReindexProgress::new());
    // The receiver must exist before the guard fires, or the broadcast
    // would have no subscriber to deliver to.
    let mut rx = progress.sender.subscribe();

    // Create and immediately drop the guard — `disarm()` is never called.
    let armed_guard = ReindexTerminationGuard::new(progress.clone());
    drop(armed_guard);

    assert_eq!(
        progress.status.load(),
        ReindexStatus::Failed,
        "status must be Failed after guard drops while armed"
    );
    let msg = rx
        .try_recv()
        .expect("guard must have broadcast an error event");
    let mentions_error = msg.contains("\"error\"");
    assert!(
        mentions_error,
        "broadcast message must contain event:error; got: {msg}"
    );
}

/// A `ReindexTerminationGuard` that was disarmed must stay silent when it
/// is dropped.
///
/// Why: if `disarm()` were a no-op, CLI clients would see both a valid
/// `complete` event and a spurious `error` event.
///
/// What: subscribes to the progress channel, creates a guard, calls
/// `disarm()`, lets the guard drop, and asserts the receiver reports
/// `TryRecvError::Empty` — i.e. nothing was broadcast.
///
/// Test: this test.
#[test]
fn reindex_guard_does_not_fire_after_disarm() {
    let progress = Arc::new(ReindexProgress::new());
    let mut rx = progress.sender.subscribe();

    {
        let mut guard = ReindexTerminationGuard::new(progress.clone());
        guard.disarm();
        // `guard` is dropped at the end of this scope, already disarmed.
    }

    // Only `Err(Empty)` is acceptable: a received message or any other
    // receive error fails the test.
    let channel_is_empty = matches!(
        rx.try_recv(),
        Err(tokio::sync::broadcast::error::TryRecvError::Empty)
    );
    assert!(channel_is_empty, "no event should be broadcast after disarm");
}

/// Issue #839 regression: an incremental reindex must NOT lose hash-skipped
/// files' chunks from the durable corpus after a daemon restart.
///
/// Why: before the #839 fix, `begin_force_corpus_swap` opened a FRESH empty
/// staging corpus and hash-skipped files were never written to it, so after
/// the promote only the re-embedded files' chunks existed on disk and the
/// skipped files vanished on the next reopen.
///
/// What: models both staging behaviours with `CorpusStore` primitives only
/// (no daemon infrastructure, all files inside one temp dir):
///
/// A) PRE-FIX model: fresh empty staging, only the re-indexed file written
///    → after reopening, `stable.rs` is absent and exactly one chunk exists.
/// B) POST-FIX model: staging seeded from the live corpus via
///    `copy_all_from`, then the re-indexed file upserted → after reopening,
///    both files are present, `stable.rs` keeps its original content,
///    `changing.rs` has the new content, and the `stable.rs` file hash
///    survived.
///
/// "Promote" and "restart" are simulated by dropping the staging store and
/// reopening the same path.
///
/// Test: this test (issue #839).
#[test]
fn incremental_reindex_no_durable_data_loss() {
    use crate::core::chunker::{ChunkType, RawChunk};
    use crate::core::corpus::CorpusStore;

    let workdir = tempfile::tempdir().unwrap();

    // Fixture factory: a minimal single-line Rust code chunk for the given
    // file, id and content; every other field is empty / `None` / zero.
    let mk_chunk = |file: &str, id: &str, content: &str| RawChunk {
        id: id.to_owned(),
        file: file.to_owned(),
        content: content.to_owned(),
        start_line: 1,
        end_line: 1,
        chunk_depth: 0,
        chunk_type: ChunkType::Code,
        language: Some("rust".to_owned()),
        function_name: None,
        parent_chunk_id: None,
        calls: Vec::new(),
        inherits_from: Vec::new(),
        child_chunk_ids: Vec::new(),
        nlp_keywords: Vec::new(),
        nlp_code_refs: Vec::new(),
        virtual_terms: Vec::new(),
    };

    // ── Live corpus: a fully-indexed 2-file repo ─────────────────────────
    //
    // Stands in for the state after the first (cold) reindex. In the
    // incremental reindex modelled below:
    //   - stable.rs   → unchanged, hash-skipped (NOT re-embedded)
    //   - changing.rs → content changed, re-embedded
    let live_path = workdir.path().join("index.redb");
    let seeded_live = CorpusStore::open(&live_path).unwrap();
    let seed_chunks = [
        mk_chunk("stable.rs", "stable:1:1", "fn stable_v1() {}"),
        mk_chunk("changing.rs", "changing:1:1", "fn version_one() {}"),
    ];
    seeded_live.upsert_chunks(&seed_chunks).unwrap();
    seeded_live
        .upsert_entities(&[
            ("stable.rs".to_string(), Vec::new()),
            ("changing.rs".to_string(), Vec::new()),
        ])
        .unwrap();
    seeded_live
        .upsert_file_hashes(&[("stable.rs", "aa"), ("changing.rs", "bb")])
        .unwrap();
    // Close the live store before anything reopens its path.
    drop(seeded_live);

    // ─── Scenario A: PRE-FIX behaviour ───────────────────────────────────
    //
    // Staging starts EMPTY (the bug: nothing copied from live) and receives
    // only the re-embedded file's chunk. Asserting that stable.rs is missing
    // afterwards proves the bug model is right and the fix is needed.
    let pre_fix_path = workdir.path().join("pre_fix.redb");
    let pre_fix_staging = CorpusStore::open_fresh(&pre_fix_path).unwrap();
    pre_fix_staging
        .upsert_chunks(&[mk_chunk(
            "changing.rs",
            "changing:1:1",
            "fn version_two() {}",
        )])
        .unwrap();
    // "Promote" is simulated by closing the staging store.
    drop(pre_fix_staging);

    // Simulated restart: reopen staging as if it were the new `index.redb`.
    let pre_fix_store = CorpusStore::open(&pre_fix_path).unwrap();
    let pre_fix_chunks = pre_fix_store.load_all_chunks().unwrap();
    let stable_leaked_into_pre_fix = pre_fix_chunks.iter().any(|c| c.file == "stable.rs");
    assert!(
        !stable_leaked_into_pre_fix,
        "PRE-FIX model: stable.rs must be absent from the unfixed staging corpus \
         (this proves the bug existed — the fix is needed)"
    );
    assert_eq!(
        pre_fix_chunks.len(),
        1,
        "PRE-FIX model: only the re-embedded file must be present"
    );

    // ─── Scenario B: POST-FIX behaviour ──────────────────────────────────
    //
    // Staging is seeded with ALL rows of the live corpus before any batch
    // write; the batch step then overwrites only the changed file's rows.
    let post_fix_path = workdir.path().join("post_fix.redb");
    let live = CorpusStore::open(&live_path).unwrap();
    let post_fix_staging = CorpusStore::open_fresh(&post_fix_path).unwrap();

    // THE FIX: seed staging from live before any batch writes.
    post_fix_staging.copy_all_from(&live).unwrap();

    // Only the re-embedded (changed) file is upserted; stable.rs is
    // hash-skipped and never touched by this step.
    post_fix_staging
        .upsert_chunks(&[mk_chunk(
            "changing.rs",
            "changing:1:1",
            "fn version_two() {}",
        )])
        .unwrap();

    // "Promote": close staging first, then the live store.
    drop(post_fix_staging);
    drop(live);

    // Simulated restart: reopen as if it were the new `index.redb`.
    let post_fix_store = CorpusStore::open(&post_fix_path).unwrap();
    let mut post_fix_chunks = post_fix_store.load_all_chunks().unwrap();
    post_fix_chunks.sort_by(|lhs, rhs| lhs.file.cmp(&rhs.file));

    assert_eq!(
        post_fix_chunks.len(),
        2,
        "POST-FIX model: BOTH files must be present after the incremental \
         reindex + simulated restart; got: {:?}",
        post_fix_chunks.iter().map(|c| &c.file).collect::<Vec<_>>()
    );

    // stable.rs keeps its ORIGINAL content: it was carried over, not
    // re-embedded.
    let stable = post_fix_chunks
        .iter()
        .find(|c| c.file == "stable.rs")
        .expect("BUG #839: stable.rs must survive in the durable corpus after the fix");
    assert_eq!(
        stable.content, "fn stable_v1() {}",
        "stable.rs must retain its original content (it was hash-skipped)"
    );

    // changing.rs carries its NEW content: the upsert overwrote the copy.
    let changing = post_fix_chunks
        .iter()
        .find(|c| c.file == "changing.rs")
        .expect("changing.rs must be present after the second reindex");
    assert_eq!(
        changing.content, "fn version_two() {}",
        "changing.rs must have the new content after the second reindex"
    );

    // The file hash of stable.rs must have been carried over too, so the
    // NEXT incremental reindex can still hash-skip it from the durable store.
    let hashes = post_fix_store.load_file_hashes().unwrap();
    let stable_hash_present = hashes.iter().any(|(f, _)| f == "stable.rs");
    assert!(
        stable_hash_present,
        "stable.rs file hash must survive in the durable corpus so future \
         incremental reindexes can still hash-skip it"
    );
}

/// Why: guards the hardened incremental-reindex abort path (issue #839
/// follow-up): when staging setup fails the live corpus must be preserved —
/// no data is lost and no empty staging store is promoted.
///
/// Background: the original #839 fix carried unchanged chunks into a fresh
/// staging store, but a failing `copy_all_from` was silently ignored and the
/// reindex continued with an EMPTY staging store — the #839 data loss again,
/// this time reproduced by an I/O error.  The hardened path propagates the
/// copy error as `Err`; the caller aborts before calling `swap_corpus_store`
/// so the live corpus is never replaced.
///
/// What: three checks against on-disk stores inside a temp dir —
///
///   (a) ERROR PROPAGATION — a staging-setup failure surfaces as `Err`.
///       The failure is provoked by calling `CorpusStore::open_fresh` on a
///       path that is a directory; `copy_all_from` itself is NOT made to
///       fail by this test.
///
///   (b) LIVE CORPUS INTACT — after that failure the live store still
///       holds both seeded chunks.  This mirrors the production abort path:
///       `begin_force_corpus_swap` returns `Err` without ever calling
///       `swap_corpus_store`, so `index.redb` is never renamed.
///
///   (c) SANITY — `copy_all_from` between two valid stores returns `Ok` and
///       the destination then holds both chunks.
///
/// Test: this test (issue #839 hardening).
#[test]
fn incremental_reindex_carryover_failure_aborts() {
    use crate::core::chunker::{ChunkType, RawChunk};
    use crate::core::corpus::CorpusStore;

    /// Builds a minimal one-line Rust code chunk with every optional or
    /// list-valued field left empty.
    fn minimal_chunk(file: &str, id: &str, content: &str) -> RawChunk {
        RawChunk {
            id: id.to_owned(),
            file: file.to_owned(),
            start_line: 1,
            end_line: 1,
            content: content.to_owned(),
            function_name: None,
            language: Some(String::from("rust")),
            chunk_type: ChunkType::Code,
            calls: vec![],
            inherits_from: vec![],
            chunk_depth: 0,
            parent_chunk_id: None,
            child_chunk_ids: vec![],
            nlp_keywords: vec![],
            nlp_code_refs: vec![],
            virtual_terms: vec![],
        }
    }

    // Declared first so it is dropped last, after every store handle below.
    let scratch = tempfile::tempdir().unwrap();

    // ── Seed the live corpus with one chunk (and one hash) per file ──────
    let live_path = scratch.path().join("live_abort_test.redb");
    let seed_store = CorpusStore::open(&live_path).unwrap();
    seed_store
        .upsert_chunks(&[
            minimal_chunk("alpha.rs", "alpha:1:1", "fn alpha() {}"),
            minimal_chunk("beta.rs", "beta:1:1", "fn beta() {}"),
        ])
        .unwrap();
    seed_store
        .upsert_file_hashes(&[("alpha.rs", "hash_a"), ("beta.rs", "hash_b")])
        .unwrap();
    // Release the handle before the same path is reopened below.
    drop(seed_store);

    // Pre-condition: a fresh handle on the same file sees both chunks.  The
    // temporary store lives only until the end of this statement.
    let seeded_len = CorpusStore::open(&live_path)
        .unwrap()
        .load_all_chunks()
        .unwrap()
        .len();
    assert_eq!(
        seeded_len, 2,
        "pre-condition: live corpus must have 2 chunks"
    );

    // ── (a) ERROR PROPAGATION: staging open at a directory path fails ────
    //
    // A directory already occupies the staging path, so `open_fresh` is
    // expected to report `Err` rather than hand back a usable store.  This
    // stands in for an I/O error during `copy_all_from`.
    let blocked_staging_path = scratch.path().join("staging_is_a_dir");
    std::fs::create_dir_all(&blocked_staging_path).unwrap();
    assert!(
        CorpusStore::open_fresh(&blocked_staging_path).is_err(),
        "opening a directory as a redb corpus must return Err — \
         this confirms the error-propagation path is exercised"
    );

    // ── (b) LIVE CORPUS INTACT ────────────────────────────────────────────
    //
    // On `Err` from the staging open or `copy_all_from`, the hardened
    // `begin_force_corpus_swap`:
    //   1. logs at `error!`
    //   2. does NOT call `swap_corpus_store` on the indexer
    //   3. returns `Err` to `spawn_reindex_with_cleanup`
    //   4. whose caller emits a terminal SSE error event and returns early
    //      WITHOUT ever promoting (renaming) the staging file.
    //
    // Nothing in this test swapped or renamed the live file either, so a
    // reopen must still yield every original chunk.
    let surviving = CorpusStore::open(&live_path)
        .unwrap()
        .load_all_chunks()
        .unwrap();
    assert_eq!(
        surviving.len(),
        2,
        "ABORT PATH: live corpus must STILL have 2 chunks after a failed \
         staging setup — got {:?}",
        surviving.iter().map(|c| &c.file).collect::<Vec<_>>()
    );
    let must_survive = [
        (
            "alpha.rs",
            "alpha.rs must remain in the live corpus after a failed carryover",
        ),
        (
            "beta.rs",
            "beta.rs must remain in the live corpus after a failed carryover",
        ),
    ];
    for (file, failure_message) in must_survive {
        assert!(
            surviving.iter().any(|c| c.file == file),
            "{failure_message}"
        );
    }

    // ── (c) Sanity: copy_all_from succeeds when both ends are valid ──────
    //
    // Shows the failure in (a) is a genuine error condition rather than
    // `copy_all_from` being systematically broken.
    let sanity_staging_path = scratch.path().join("good_staging_sanity.redb");
    let source_store = CorpusStore::open(&live_path).unwrap();
    let sanity_staging = CorpusStore::open_fresh(&sanity_staging_path).unwrap();
    let copy_result = sanity_staging.copy_all_from(&source_store);
    assert!(
        copy_result.is_ok(),
        "copy_all_from must succeed when both source and destination are valid: {:?}",
        copy_result
    );
    assert_eq!(
        sanity_staging.load_all_chunks().unwrap().len(),
        2,
        "copy_all_from sanity: must copy all 2 chunks from the live corpus"
    );
}

/// Issue #878: a completed reindex must leave a non-null RFC-3339 timestamp
/// in `handle.last_indexed_at`.
///
/// Why: `GET /indexes/:id/status` returned `last_indexed: null` after a
/// fresh reindex because the disk-mtime heuristic (`index_disk_and_mtime`)
/// only checks the legacy global data dir and returns `None` for colocated
/// indexes or newly-created indexes whose redb file is in a location the
/// heuristic does not probe. Stamping `last_indexed_at` on the handle at
/// reindex-complete time provides a storage-agnostic authoritative source.
/// What: writes a one-file repo into a temp dir, starts a reindex, polls
/// the progress status (at most 200 sleeps of 50 ms) until it reports
/// `Complete`, then asserts that `handle.last_indexed_at` is `Some` and
/// parses as RFC-3339.
/// Test: this test.
#[tokio::test]
async fn last_indexed_stamped_after_reindex() {
    let repo = tempfile::tempdir().expect("tempdir");
    let repo_root = repo.path().to_path_buf();
    fs::write(repo_root.join("alpha.rs"), "pub fn alpha() {}\n").unwrap();

    let handle = make_handle_with_flag("li-stamp-test", repo_root, false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), Arc::clone(&progress), false);

    // Bounded wait: stop as soon as the status flips to Complete, or after
    // 200 sleeps, whichever comes first.  The assertion below turns a
    // timeout into a test failure.
    let poll_interval = std::time::Duration::from_millis(50);
    let mut polls = 0;
    while polls < 200 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls += 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Take an owned copy so the read guard is released at the end of the
    // statement; a missing stamp fails the test right here.
    let ts_str = handle
        .last_indexed_at
        .read()
        .await
        .clone()
        .expect("#878: last_indexed_at must be Some after a completed reindex; got None");

    // The stamp must also be well-formed, not merely present.
    let parsed = chrono::DateTime::parse_from_rfc3339(&ts_str);
    assert!(
        parsed.is_ok(),
        "#878: last_indexed_at must be a valid RFC-3339 string; got: {ts_str}"
    );
}

/// Issue #879: `stages.lexical.chunks` must carry the **total** corpus
/// chunk count rather than the number of chunks committed by the latest
/// reindex pass.
///
/// Why: on a no-change incremental reindex (all files hash-skipped)
/// `progress.total_chunks` is 0 because no files were re-committed.
/// The previous implementation set `stages.lexical.chunks = 0` in that
/// case, while the top-level `chunk_count` field correctly showed the
/// full corpus total. After this fix both must agree.
/// What: writes a one-file repo into a temp dir and runs two reindexes on
/// the same handle, each with the trailing `spawn_reindex` flag `false`
/// (i.e. `force=false`). The first must commit at least one chunk; the
/// second must commit none. After each pass `stages.lexical.chunks`
/// (treated as 0 when `None`) must equal the first pass's chunk count.
/// Test: this test.
#[tokio::test]
async fn lexical_chunks_reports_corpus_total_not_pass_count() {
    let repo = tempfile::tempdir().expect("tempdir");
    let repo_root = repo.path().to_path_buf();
    let three_fns = "pub fn beta() {}\npub fn gamma() {}\npub fn delta() {}\n";
    fs::write(repo_root.join("beta.rs"), three_fns).unwrap();

    let handle = make_handle_with_flag("lc-total-test", repo_root, false);
    let poll_interval = std::time::Duration::from_millis(50);

    // ── Pass 1: everything is new, so real chunks get committed ──────────
    let first_pass = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), Arc::clone(&first_pass), false);
    // Bounded wait: at most 200 sleeps before the assertion decides.
    let mut polls = 0;
    while polls < 200 && first_pass.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls += 1;
    }
    assert_eq!(first_pass.status.load(), ReindexStatus::Complete);

    let chunks_pass1 = first_pass.total_chunks.load(Ordering::Acquire);
    assert!(
        chunks_pass1 > 0,
        "first reindex must commit at least one chunk"
    );

    // Snapshot the stage stats; the read guard is gone once the statement ends.
    let lexical_chunks_after_pass1 = handle
        .stages
        .read()
        .await
        .clone()
        .lexical
        .chunks
        .unwrap_or(0);
    assert_eq!(
        lexical_chunks_after_pass1, chunks_pass1,
        "#879: after first reindex stages.lexical.chunks ({lexical_chunks_after_pass1}) \
         must equal total_chunks ({chunks_pass1})"
    );

    // ── Pass 2: nothing changed, every file is hash-skipped ──────────────
    let second_pass = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), Arc::clone(&second_pass), false);
    let mut polls = 0;
    while polls < 200 && second_pass.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls += 1;
    }
    assert_eq!(second_pass.status.load(), ReindexStatus::Complete);

    let chunks_pass2 = second_pass.total_chunks.load(Ordering::Acquire);
    assert_eq!(
        chunks_pass2, 0,
        "no-change reindex must produce 0 new chunks (all hash-skipped); got {chunks_pass2}"
    );

    // The stage counter must still show the corpus total, not this pass's 0.
    let lexical_chunks_after_pass2 = handle
        .stages
        .read()
        .await
        .clone()
        .lexical
        .chunks
        .unwrap_or(0);
    assert_eq!(
        lexical_chunks_after_pass2, chunks_pass1,
        "#879: after no-change reindex stages.lexical.chunks ({lexical_chunks_after_pass2}) \
         must equal the corpus total ({chunks_pass1}), not the per-pass count ({chunks_pass2})"
    );
}

// ── Issue #1179: INPROCESS_EMBEDDER_EVER_READY isolation ─────────────────────

/// Issue #1179: after `reset_inprocess_embedder_flag_for_tests` runs, the
/// in-process embedder flag must read `false`, so subsequent tests start
/// from a clean state.
///
/// Why: `INPROCESS_EMBEDDER_EVER_READY` is a process-global `AtomicBool`
/// (issue #827). Without an explicit reset, once any test in the binary sets
/// it to `true` via a real embed call, every following test that inspects the
/// flag sees `true` regardless of execution order — making tests
/// order-dependent and non-deterministic. The reset helper is the surgical fix
/// that lets each test own its initial state.
///
/// What: calls the reset helper and asserts the flag reads `false`, then
/// does the same a second time to confirm that resetting an already-false
/// flag leaves it `false`. The test never sets the flag to `true` itself —
/// no "set to true" helper is available to it — so the true→false
/// transition is only covered when an earlier test happened to set the flag.
///
/// Test: this test.
#[test]
fn inprocess_embedder_flag_reset_restores_false() {
    // Only the test-only accessors are used (reset + read), which keeps the
    // test coupled to the stable test interface and not to the private
    // static behind it.
    //
    // Each entry is the failure message for one reset-then-read round:
    // round one establishes the clean state, round two checks idempotency.
    let round_failure_messages = [
        "pre-condition: flag must be false after initial reset",
        "reset must be idempotent: flag remains false when called on already-false flag",
    ];

    for failure_message in round_failure_messages {
        reset_inprocess_embedder_flag_for_tests();
        assert!(
            !inprocess_embedder_ever_ready_for_tests(),
            "{failure_message}"
        );
    }
}

/// Issue #1179: two back-to-back scenarios modelling the cross-test
/// contamination that the reset helper is meant to eliminate.
///
/// Why: demonstrates the isolation contract inside one self-contained test
/// function with no ordering dependency on other tests in the file —
/// whatever state earlier tests left behind, calling
/// `reset_inprocess_embedder_flag_for_tests` yields a known-false
/// `INPROCESS_EMBEDDER_EVER_READY`, which is what a test relying on
/// `needs_embedder_init=true` for the first in-process batch requires.
///
/// What: resets and asserts the flag reads `false` (Scenario A), then
/// resets again and asserts it still reads `false` (Scenario B).
///
/// Test: this test.
#[test]
fn inprocess_embedder_flag_isolated_across_scenarios() {
    // Scenario A: the reset acts as a guard — the incoming flag state is
    // unknown here, the outgoing state must be false.
    reset_inprocess_embedder_flag_for_tests();
    let ready_after_guard_reset = inprocess_embedder_ever_ready_for_tests();
    assert!(
        !ready_after_guard_reset,
        "Scenario A: flag must be false after reset — guarantees isolation from \
         any prior test that set it to true (issue #1179 isolation contract)"
    );

    // Scenario B: a second reset immediately after the first is harmless.
    reset_inprocess_embedder_flag_for_tests();
    let ready_after_repeat_reset = inprocess_embedder_ever_ready_for_tests();
    assert!(
        !ready_after_repeat_reset,
        "Scenario B: second consecutive reset must leave flag false (idempotent)"
    );
}