obj-db 1.1.2

Embedded document database. Stable file format, full ACID, single-file portability.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
//! `Db` — public entry point.
//!
//! Wraps `obj_core::TxnEnv` + the catalog into the user-facing API
//! described in [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
//! § API and pinned in
//! [`docs/format.md`](https://github.com/uname-n/obj/blob/master/docs/format.md)
//! § Db public API contract.

use std::collections::{HashMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::ops::Bound;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use obj_core::btree::BTree;
use obj_core::pager::page::PageId;
use obj_core::pager::{lock_path_for, Pager};
use obj_core::platform::FileHandle;
use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result, TxnEnv};

use crate::config::Config;
use crate::txn::{AttachedDb, ReadTxn, WriteTxn};

/// How many entries [`IterAll`] pulls from the primary B-tree per
/// refill. Callers still receive exactly one document per `next`
/// call; batching only spreads the per-step pager-lock acquisition
/// over many steps. Power-of-ten Rule 3: the refill buffer has a
/// fixed capacity of 256 entries (~128 KiB at ~512 bytes/doc) and
/// never grows with the collection's total size.
const ITER_ALL_BATCH: usize = 1 << 8;

/// The embedded document database.
///
/// `Db` is `Send + Sync`; share across threads via `Arc<Db>` for
/// concurrent reader / single-writer access.  See
/// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
/// § "Concurrent readers, one writer".
///
/// At M6 the public `Db` is hard-typed against
/// `obj_core::platform::FileHandle`.  A future refactor may make it
/// generic over `F: FileBackend` so fault-injection harnesses can
/// build on the same API; today the test helpers reach for the
/// lower-level `obj-core` building blocks instead.
pub struct Db {
    /// Shared transaction environment: the pager plus the optional
    /// cross-process lock sidecar handed to `TxnEnv::new` at open
    /// time. Write and read transactions both begin against it.
    pub(crate) env: Arc<TxnEnv<FileHandle>>,
    /// In-memory catalog handle, shared with every `WriteTxn`.
    /// [`Db::transaction`] replaces it with a freshly re-opened copy
    /// after rolling back a closure that returned `Err`.
    pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
    /// Copied from `Config::readonly` at open time; when `true`,
    /// [`Db::transaction`] returns [`Error::ReadOnly`] before
    /// beginning a write.
    pub(crate) readonly: bool,
    /// Copied from `Config::busy_timeout` at open time; bounds lock
    /// acquisition for both write- and read-transaction begin.
    pub(crate) busy_timeout: Duration,
    /// Per-process cache of `(collection, version)` keys whose
    /// `Document::indexes()` reconciliation has already run. M7
    /// #57: reconciliation is idempotent but a catalog walk +
    /// declaration cycle is non-trivial — caching ensures only the
    /// first `WriteTxn::collection::<T>()` call per process per
    /// `(collection, version)` pays the cost. #130: the version is part
    /// of the key so a later schema version that ADDS an index still
    /// reconciles (the name-only key skipped it, leaving the new index
    /// never `Active`).
    pub(crate) reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
    /// M11 #93: registry of attached read-only databases keyed by
    /// namespace. Populated by [`Db::attach`]; consulted by
    /// [`Db::collection_namespace`] and by
    /// [`Db::read_transaction`] when it pins per-attached
    /// snapshots. `Arc<Mutex<_>>` because `attach` / `detach` take
    /// `&mut self` while live read transactions hold borrows into
    /// the registry; the mutex coordinates the two.
    pub(crate) attached: Arc<Mutex<HashMap<String, AttachedDb>>>,
    /// #83 (c): published length of `attached`, mutated only while the
    /// `attached` mutex is held. A relaxed load lets the hot read path
    /// ([`Self::pin_attached_snapshots`]) skip the mutex acquire when
    /// no databases are attached (the overwhelmingly common case).
    pub(crate) attached_len: Arc<std::sync::atomic::AtomicUsize>,
}

impl std::fmt::Debug for Db {
    /// Shows only the two scalar settings (`readonly`, `busy_timeout`).
    /// The shared pager / catalog / registry handles are omitted, which
    /// `finish_non_exhaustive` marks with a trailing `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut view = f.debug_struct("Db");
        view.field("readonly", &self.readonly);
        view.field("busy_timeout", &self.busy_timeout);
        view.finish_non_exhaustive()
    }
}

// ---------- FFI plumbing ------------------------------------------
//
// The accessors below expose the shareable internals of `Db` so the
// `libobj` C-ABI crate can wire its own Arc-shaped transaction and
// iterator handles without re-implementing the lifecycle. They are
// marked `#[doc(hidden)]` because user-Rust code should reach for
// the typed `Db::transaction` / `Db::read_transaction` API; the
// raw accessors exist so a sibling FFI / native-host crate can
// build its own self-contained handle types.
impl Db {
    /// Shared environment Arc — pager + cross-process lock file.
    ///
    /// Used by [`libobj`](../../libobj/index.html) to build owned
    /// transaction handles whose lifetime extends past a single
    /// `Db::transaction` closure call. The returned `Arc` points at
    /// the same environment this `Db` uses, not a copy.
    #[doc(hidden)]
    #[must_use]
    pub fn env_arc(&self) -> Arc<TxnEnv<FileHandle>> {
        Arc::clone(&self.env)
    }

    /// Shared catalog Arc.
    ///
    /// Used by [`libobj`](../../libobj/index.html) for the same
    /// reason as [`Self::env_arc`]. This is the same handle
    /// `Db::transaction` passes to each `WriteTxn` and swaps out on
    /// rollback.
    #[doc(hidden)]
    #[must_use]
    pub fn catalog_arc(&self) -> Arc<Mutex<Catalog<FileHandle>>> {
        Arc::clone(&self.catalog)
    }

    /// Shared per-process reconciliation cache Arc.
    ///
    /// Used by [`libobj`](../../libobj/index.html) for the same
    /// reason as [`Self::env_arc`]. Keys are `(collection, version)`
    /// pairs whose index reconciliation has already run.
    #[doc(hidden)]
    #[must_use]
    pub fn reconciled_arc(&self) -> Arc<Mutex<HashSet<(String, u32)>>> {
        Arc::clone(&self.reconciled)
    }

    /// Busy-lock timeout configured at open time.
    ///
    /// The same value bounds lock acquisition in `Db::transaction`
    /// and `Db::read_transaction`.
    #[doc(hidden)]
    #[must_use]
    pub fn busy_timeout(&self) -> Duration {
        self.busy_timeout
    }
}

impl Db {
    /// Open (or create) a file-backed database at `path` using the
    /// default [`Config`].
    ///
    /// An absent file is created and a present one is reopened. The
    /// resulting `Db` is `Send + Sync`. Wrap it in `Arc<Db>` to share
    /// it between threads for the concurrent-reader / single-writer
    /// workload described in
    /// [`docs/concurrency.md`](https://github.com/uname-n/obj/blob/master/docs/concurrency.md).
    ///
    /// Related constructors:
    /// - [`Db::memory`] for an ephemeral database.
    /// - [`Db::open_readonly`] for a handle that coexists with a writer
    ///   in another process.
    /// - [`Db::open_with`] + [`Config`] to tune durability, cache or
    ///   locking.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    ///
    /// let dir = tempfile::tempdir()?;
    ///
    /// // File-backed. Creates the file if absent; reopens otherwise.
    /// let _db = Db::open(dir.path().join("app.obj"))?;
    ///
    /// // In-memory. No persistence, no file locks. Useful for tests.
    /// let _mem = Db::memory()?;
    ///
    /// // Read-only. Coexists safely with a writer in another process.
    /// // Every mutating call returns `Err(Error::ReadOnly { ... })`.
    /// let _ro = Db::open_readonly(dir.path().join("app.obj"))?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns the underlying [`Error`] when any of these fails:
    /// - [`obj_core::pager::Pager::open`] (syscall or format failure).
    /// - Creating or sizing the `<db>.obj-lock` sidecar, when
    ///   cross-process locking is enabled.
    /// - Opening or initialising the catalog, and committing that
    ///   initialisation.
    /// - The open-time quick integrity check (unless it is skipped via
    ///   [`Config`]).
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(
            name = "db.open",
            level = "info",
            skip_all,
            fields(path = %path.as_ref().display()),
        )
    )]
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let defaults = Config::default();
        Self::open_with(path, defaults)
    }

    /// Open or create a file-backed database with `config`.
    ///
    /// # Errors
    ///
    /// As [`Db::open`].
    // Takes `Config` by value: this is the frozen builder-handoff
    // signature (the caller's `Config::default().sync_mode(..)...`
    // chain ends here). `Config` is no longer `Copy` (issue #17), so
    // clippy now sees the by-value argument as only borrowed in the
    // body — but the by-value convention is part of the 1.0 surface
    // and must not change. Power-of-ten Rule 10: documented allow.
    #[allow(clippy::needless_pass_by_value)]
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(
            name = "db.open",
            level = "info",
            skip_all,
            fields(path = %path.as_ref().display()),
        )
    )]
    pub fn open_with<P: AsRef<Path>>(path: P, mut config: Config) -> Result<Self> {
        let path_buf = path.as_ref().to_path_buf();
        // Issue #31: the pager `Config` is no longer `Copy` (its
        // `encryption_key` zeroizes on drop), so move it out of
        // `config` rather than copying it — `from_parts` only reads
        // the remaining scalar fields. `mem::take` leaves a default
        // (keyless) pager `Config` behind, which is never read again.
        let pager_config = std::mem::take(&mut config.pager);
        let pager = Pager::open(&path_buf, pager_config)?;
        // Issue #1: cross-process locks anchor against a dedicated
        // `<db>.obj-lock` sidecar file. Keeping the lock byte out
        // of the main DB file avoids `ERROR_LOCK_VIOLATION` on
        // Windows once the DB grows past whatever offset the
        // lock byte would otherwise sit at (`LockFileEx` is
        // mandatory). The sidecar is left in place across opens;
        // deleting it would race two openers into two different
        // inodes and miss each other's exclusion.
        let lock_file = if config.cross_process_lock {
            let lock_path = lock_path_for(&path_buf);
            let handle = FileHandle::open_or_create(&lock_path)?;
            // Materialise the locked byte range as real file
            // content. POSIX OFD and Windows `LockFileEx` both
            // permit locking past EOF, but a non-empty sidecar
            // is the most conservative choice across kernels
            // (and lets the same offsets work on every platform).
            // 128 bytes covers WRITER_LOCK (96) and the full
            // READER_LOCK_RANGE (97..128).
            handle.set_len(128)?;
            Some(Arc::new(handle))
        } else {
            None
        };
        Self::from_parts(pager, lock_file, &config)
    }

    /// Open a fresh in-memory database with the default [`Config`].
    /// No persistence, no file locks (no lock sidecar is opened).
    /// Useful for unit tests and ephemeral workloads.
    ///
    /// # Errors
    ///
    /// Propagates any [`Error`] from [`obj_core::pager::Pager::memory`],
    /// from initialising and committing the catalog, or from the
    /// open-time quick integrity check. `Pager::memory` rejects a
    /// zero-frame cache with [`Error::InvalidArgument`]. Since this
    /// constructor always uses `Config::default()`, that case is
    /// presumably unreachable here; see [`Db::memory_with`] for a
    /// caller-supplied configuration.
    pub fn memory() -> Result<Self> {
        Self::memory_with(Config::default())
    }

    /// Open a fresh in-memory database using a caller-supplied
    /// [`Config`]. Apart from the configuration, this behaves like
    /// [`Db::memory`].
    ///
    /// # Errors
    ///
    /// Any [`Error`] from [`obj_core::pager::Pager::memory`] — e.g.
    /// [`Error::InvalidArgument`] when `config` requests zero cache
    /// frames — or from catalog initialisation / the open-time quick
    /// integrity check.
    // The by-value `Config` is the frozen builder-handoff signature
    // shared with `Db::open_with`. Issue #17 removed `Copy`, which is
    // what makes the lint fire. Power-of-ten Rule 10: documented allow.
    #[allow(clippy::needless_pass_by_value)]
    pub fn memory_with(mut config: Config) -> Result<Self> {
        // Issue #31: the pager `Config` is not `Copy`, so move it out of
        // `config` instead of copying it; `from_parts` reads only the
        // scalar fields that remain.
        let pager = Pager::memory(std::mem::take(&mut config.pager))?;
        Self::from_parts(pager, None, &config)
    }

    /// Open the database at `path` read-only.
    ///
    /// Every mutating operation on the returned `Db` fails with
    /// `Err(Error::ReadOnly { ... })`. Through the cross-process
    /// reader lock, the handle coexists safely with a writer in another
    /// process.
    ///
    /// # Errors
    ///
    /// As [`Db::open`].
    pub fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
        // Every other knob keeps its default; only the read-only flag
        // differs from a plain `Db::open`.
        Self::open_with(
            path,
            Config {
                readonly: true,
                ..Config::default()
            },
        )
    }

    /// Finish opening a `Db` from an already-opened pager and optional
    /// lock sidecar.
    ///
    /// The steps run in this order:
    /// 1. Open the catalog, or initialise and commit it on a fresh file.
    /// 2. Unless `config.skip_open_check` is set, run the open-time quick
    ///    integrity check.
    /// 3. Wrap the pieces in the shared handles the `Db` carries.
    fn from_parts(
        mut pager: Pager<FileHandle>,
        lock_file: Option<Arc<FileHandle>>,
        config: &Config,
    ) -> Result<Self> {
        // #64: catalog initialisation mutates the file header (through
        // `tree.insert` → `alloc_page` / `set_root_catalog`). Header
        // writes now travel through the WAL via `stage_or_write_header`.
        // That requires an open pager txn, so the staged page-0 frame
        // commits atomically with the B-tree pages it depends on.
        //
        // Bracketing the call with `begin_txn` / `commit` / `end_txn`
        // keeps a fresh-file `Db::open` durable on return. For an
        // already-initialised file, `open_or_init` only reads, and the
        // commit is a no-op: `Pager::commit` short-circuits on an empty
        // pending set with a clean `header_dirty`.
        //
        // Commit runs only if initialisation succeeded. The pager txn is
        // always closed before either error is surfaced.
        // NOTE(review): this also runs for `config.readonly` handles, so a
        // read-only open of an uninitialised file would commit catalog
        // init. Confirm whether the pager refuses that write.
        pager.begin_txn();
        let opened = Catalog::open_or_init(&mut pager);
        let committed = opened.is_ok().then(|| pager.commit());
        pager.end_txn();
        let catalog = opened?;
        if let Some(commit_result) = committed {
            commit_result?;
        }
        // M11 #91: lightweight open-time integrity check. It runs the
        // catalog portion of `integrity_check` and reports the first
        // detected failure as the strongest available error, leaving the
        // repair-or-abort decision to the caller. Opt out with
        // `Config::skip_open_check(true)`.
        let open_check_failure = if config.skip_open_check {
            None
        } else {
            first_failure_as_error(&obj_core::integrity::quick_check(&mut pager)?)
        };
        if let Some(err) = open_check_failure {
            return Err(err);
        }
        let env = Arc::new(TxnEnv::new(pager, lock_file));
        Ok(Self {
            env,
            catalog: Arc::new(Mutex::new(catalog)),
            readonly: config.readonly,
            busy_timeout: config.busy_timeout,
            // Empty reconciliation cache, empty attach registry, and an
            // attach count of zero.
            reconciled: Arc::default(),
            attached: Arc::default(),
            attached_len: Arc::default(),
        })
    }

    /// Run a closure inside a write transaction.
    ///
    /// Begins a [`WriteTxn`], runs the closure with `&mut tx`.  If
    /// the closure returns `Ok(r)`, the transaction is committed and
    /// `Ok(r)` is returned.  If the closure returns `Err(e)`, the
    /// transaction is rolled back and `Err(e)` is returned.  A
    /// panic inside the closure unwinds with an implicit rollback
    /// via the `WriteTxn` `Drop` impl. See
    /// [`docs/concurrency.md`](https://github.com/uname-n/obj/blob/master/docs/concurrency.md)
    /// for the lock-acquisition contract.
    ///
    /// # Examples
    ///
    /// Atomic batch — both inserts commit together or not at all:
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
    /// struct Order {
    ///     customer_id: u64,
    ///     total_cents: u64,
    /// }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("txn.obj"))?;
    ///
    /// let (a, b) = db.transaction(|tx| {
    ///     let coll = tx.collection::<Order>()?;
    ///     let a = coll.insert(Order { customer_id: 1, total_cents: 50 })?;
    ///     let b = coll.insert(Order { customer_id: 2, total_cents: 200 })?;
    ///     Ok((a, b))
    /// })?;
    /// assert_ne!(a, b, "freshly-allocated ids are distinct");
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Returning `Err(_)` rolls every staged write back; the `Err`
    /// the closure returns is the `Err` the caller sees:
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::{Db, Error};
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
    /// struct Order { total_cents: u64 }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("rollback.obj"))?;
    /// let id = db.insert(Order { total_cents: 10 })?;
    ///
    /// let outcome: obj::Result<()> = db.transaction(|tx| {
    ///     let coll = tx.collection::<Order>()?;
    ///     coll.update(id, |o| { o.total_cents = 99_999; })?;
    ///     Err(Error::InvalidArgument("synthetic abort"))
    /// });
    /// assert!(matches!(outcome, Err(Error::InvalidArgument(_))));
    ///
    /// let after: Order = db
    ///     .get::<Order>(id)?
    ///     .ok_or(Error::InvalidArgument("just inserted"))?;
    /// assert_eq!(after.total_cents, 10, "rolled-back update is invisible");
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - [`Error::ReadOnly`] if the database was opened read-only.
    /// - [`Error::Busy`] if a sibling transaction holds the lock(s).
    /// - Any error the closure returns.
    /// - Any error from committing the transaction after the closure
    ///   returned `Ok`.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(name = "db.transaction", level = "info", skip_all)
    )]
    pub fn transaction<R, F>(&self, body: F) -> Result<R>
    where
        F: FnOnce(&mut WriteTxn<'_>) -> Result<R>,
    {
        // Reject up front so a read-only handle never attempts to take
        // the writer lock.
        if self.readonly {
            return Err(Error::ReadOnly {
                operation: "transaction",
            });
        }
        #[cfg(feature = "tracing")]
        tracing::debug!("begin");
        let inner = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
        let mut tx = WriteTxn::new(
            inner,
            Arc::clone(&self.catalog),
            Arc::clone(&self.reconciled),
        );
        match body(&mut tx) {
            Ok(value) => {
                // NOTE(review): if `commit` fails, this returns through `?`
                // without the `refresh_catalog` call the `Err` arm makes.
                // A panic in `body` also skips it. In-memory catalog state
                // advanced by `body` could therefore outlive a failed
                // commit. Confirm `WriteTxn::commit` / `Drop` restore the
                // catalog on their own.
                tx.commit()?;
                #[cfg(feature = "tracing")]
                tracing::debug!("commit");
                Ok(value)
            }
            Err(e) => {
                // Best-effort rollback: the closure's error is what the
                // caller sees, so rollback / refresh failures are
                // deliberately discarded.
                //
                // After rolling back, re-open the in-memory catalog from
                // the pager. That discards any `next_collection_id` /
                // catalog-root state the closure advanced in memory, so
                // the handle matches what the pager now holds.
                //
                // NOTE(review): an older wording here said catalog
                // mutations write the header directly, not through the
                // WAL. `from_parts` (#64) instead says header mutations
                // now ride the WAL via `stage_or_write_header`. Under #64,
                // the re-open relies on the rollback having discarded the
                // staged header frame. Confirm.
                let _ = tx.rollback();
                let _ = self.refresh_catalog();
                #[cfg(feature = "tracing")]
                tracing::debug!("rollback");
                Err(e)
            }
        }
    }

    /// Re-open the in-memory `Catalog` handle from the pager.  Used
    /// after a transaction rollback to discard the catalog's in-
    /// memory `next_collection_id` / `tree.root` state that may
    /// have advanced during the rolled-back closure.
    fn refresh_catalog(&self) -> Result<()> {
        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let fresh = obj_core::Catalog::open_or_init(&mut pager)?;
        let mut existing = self.catalog.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        *existing = fresh;
        Ok(())
    }

    /// Run a closure against a read-only snapshot of the database.
    ///
    /// The closure shape mirrors [`Self::transaction`]: `body` receives
    /// the transaction, and its `Result` is handed straight back to the
    /// caller. The snapshot is pinned before `body` starts and stays
    /// fixed for its whole run. Writers that commit meanwhile are
    /// invisible to it, so every read inside the closure agrees with
    /// every other. Each attached database gets its own pinned snapshot
    /// as well.
    ///
    /// # Examples
    ///
    /// Two reads inside one `read_transaction` see the same value
    /// even if a writer commits in between:
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, obj::Document)]
    /// struct Order { total_cents: u64 }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("read.obj"))?;
    /// let id = db.insert(Order { total_cents: 10 })?;
    ///
    /// let (a, b) = db.read_transaction(|tx| {
    ///     let coll = tx.collection::<Order>()?;
    ///     let a = coll.get(id)?;
    ///     let b = coll.get(id)?;
    ///     Ok((a, b))
    /// })?;
    /// assert_eq!(a, b);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - [`Error::Busy`] if the reader lock could not be acquired.
    /// - Any error the closure returns.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(name = "db.read_transaction", level = "info", skip_all)
    )]
    pub fn read_transaction<R, F>(&self, body: F) -> Result<R>
    where
        F: FnOnce(&ReadTxn<'_>) -> Result<R>,
    {
        #[cfg(feature = "tracing")]
        tracing::debug!("begin");
        // The primary snapshot is pinned first. Then (M11 #93) one
        // snapshot is pinned per attached database, so each file is read
        // from a single consistent view for the whole closure.
        let primary = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
        let tx = ReadTxn::with_attached(primary, self.pin_attached_snapshots()?);
        let outcome = body(&tx);
        #[cfg(feature = "tracing")]
        tracing::debug!("commit");
        outcome
    }

    /// Snapshot every attached database into a per-namespace
    /// [`crate::txn::AttachedReadCtx`]. Each context owns its own
    /// pin; dropping the returned map releases every pin.
    fn pin_attached_snapshots(
        &self,
    ) -> Result<std::collections::HashMap<String, crate::txn::AttachedReadCtx>> {
        // #83 (c): the common case is no attached databases. A single
        // relaxed load of `attached_len` lets the empty path skip the
        // `self.attached` mutex acquire entirely (`with_capacity(0)`
        // already does not allocate, so the mutex is the only cost).
        //
        // Correctness under concurrent detach: the map produced here
        // feeds only namespaced read dispatch in `open_readonly_named`
        // (`<ns>.<tail>` → attached snapshot). With zero attachments no
        // namespace can resolve, so an empty map is observably the same
        // as the locked build. `attached_len` is mutated only while the
        // `attached` mutex is held (in `attach` / `detach`), so a `0`
        // observed here means "no attachment is committed to the map",
        // matching what a lock-and-read would have seen at that instant.
        // A concurrent attach that has not yet published its count is
        // exactly an attach ordered *after* this txn began — invisible
        // by design, same as the cross-process snapshot semantics.
        if self.attached_len.load(std::sync::atomic::Ordering::Relaxed) == 0 {
            return Ok(std::collections::HashMap::new());
        }
        let registry = self.attached.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let mut out: std::collections::HashMap<String, crate::txn::AttachedReadCtx> =
            std::collections::HashMap::with_capacity(registry.len());
        for (namespace, attached) in registry.iter() {
            let env = Arc::clone(&attached.env);
            let snapshot = {
                let mut pager = env.pager().lock().map_err(|_| Error::Busy {
                    kind: obj_core::LockKind::WriterInProcess,
                })?;
                pager.reader_snapshot()?
            };
            out.insert(
                namespace.clone(),
                crate::txn::AttachedReadCtx { env, snapshot },
            );
        }
        Ok(out)
    }

    /// Pin one [`crate::txn::AttachedReadCtx`] for the single attachment
    /// registered under `namespace`. Used by [`Self::dump_raw`]'s
    /// namespaced full-scan path: it needs exactly one attached env +
    /// pinned snapshot, not the whole per-namespace map
    /// [`Self::pin_attached_snapshots`] builds.
    ///
    /// The snapshot is pinned for the returned context's lifetime; the
    /// caller (the `DumpIter`) owns the context, so the pin holds for
    /// the iterator's whole life and a concurrent `detach` cannot
    /// invalidate the in-flight scan.
    ///
    /// # Errors
    ///
    /// - [`Error::CollectionNamespaceUnknown`] if `namespace` is not
    ///   attached on this handle.
    /// - [`Error::Busy`] if the registry / pager mutex is poisoned.
    pub(crate) fn pin_attached_ctx(&self, namespace: &str) -> Result<crate::txn::AttachedReadCtx> {
        let env = {
            let registry = self.attached.lock().map_err(|_| Error::Busy {
                kind: obj_core::LockKind::WriterInProcess,
            })?;
            let attached =
                registry
                    .get(namespace)
                    .ok_or_else(|| Error::CollectionNamespaceUnknown {
                        namespace: namespace.to_owned(),
                    })?;
            Arc::clone(&attached.env)
        };
        let snapshot = {
            let mut pager = env.pager().lock().map_err(|_| Error::Busy {
                kind: obj_core::LockKind::WriterInProcess,
            })?;
            pager.reader_snapshot()?
        };
        Ok(crate::txn::AttachedReadCtx { env, snapshot })
    }

    /// Attach the database at `path` under `namespace`. The
    /// attached file is opened read-only; collections in the
    /// attached file become visible to subsequent
    /// `Db::collection::<T>()` calls (and the one-shot per-op API
    /// — `Db::get::<T>()`, etc.) when `T::COLLECTION` is of the
    /// form `<namespace>.<collection_name>`.
    ///
    /// Writes against namespaced collections return
    /// [`Error::AttachedDatabaseIsReadOnly`].
    ///
    /// Each attached database gets its own snapshot pinned at
    /// read-transaction begin; [`Db::detach`] removes the registry
    /// entry but in-flight reads complete against their pinned
    /// snapshot.
    ///
    /// This method takes `&mut self` only for API stability. Callers
    /// that hold a shared handle (e.g. an `Arc<Db>`) can use
    /// [`Self::attach_shared`], which behaves identically.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
    /// #[obj(collection = "orders_attach_doc")]
    /// struct Order { total_cents: u64 }
    ///
    /// // Same struct shape, namespaced collection name. Reads
    /// // against this type route to the attached "archive" db.
    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
    /// #[obj(collection = "archive.orders_attach_doc")]
    /// struct ArchivedOrder { total_cents: u64 }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let live = dir.path().join("live.obj");
    /// let archive = dir.path().join("archive.obj");
    ///
    /// // Seed the archive (writes go through its own un-namespaced type).
    /// {
    ///     let archive_db = Db::open(&archive)?;
    ///     let _ = archive_db.insert(Order { total_cents: 999 })?;
    /// }
    ///
    /// // Open the live db, attach the archive under "archive".
    /// let mut db = Db::open(&live)?;
    /// let _ = db.insert(Order { total_cents: 100 })?;
    /// db.attach(&archive, "archive")?;
    ///
    /// // One read transaction, two collections.
    /// db.read_transaction(|tx| {
    ///     let live = tx.collection::<Order>()?;
    ///     let arch = tx.collection::<ArchivedOrder>()?;
    ///     assert_eq!(live.all()?.len(), 1);
    ///     assert_eq!(arch.all()?.len(), 1);
    ///     Ok(())
    /// })?;
    ///
    /// db.detach("archive")?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - [`Error::AttachmentAlreadyExists`] if `namespace` is in
    ///   use on this `Db`.
    /// - [`Error::AttachmentNotReadable`] if `path` cannot be
    ///   opened read-only.
    /// - [`Error::Busy`] if the attachment registry mutex is poisoned.
    pub fn attach<P: AsRef<std::path::Path>>(
        &mut self,
        path: P,
        namespace: impl Into<String>,
    ) -> Result<()> {
        // The `&mut self` signature is preserved for API stability,
        // but the attachment registry is interior-mutable
        // (`Arc<Mutex<_>>` + `Arc<AtomicUsize>`), so the work is
        // delegated to the `&self` core shared with
        // [`Self::attach_shared`].
        self.attach_inner(path.as_ref(), namespace.into())
    }

    /// Attach the database at `path` under `namespace` using only a
    /// shared borrow of this handle.
    ///
    /// Use this when the `Db` sits behind a shared owner such as an
    /// `Arc<Db>` and `&mut self` is not available. Semantics match
    /// [`Self::attach`] exactly, because both forward to the same core.
    ///
    /// The attachment registry lives behind a per-`Db` mutex, so
    /// `attach_shared` / `detach_shared` calls from different threads
    /// take turns on it. The read-only open happens outside that mutex,
    /// so the namespace is checked again once the mutex is re-acquired.
    /// When two attaches race for the same namespace, only one succeeds.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use std::sync::Arc;
    /// use obj::Db;
    ///
    /// let dir = tempfile::tempdir()?;
    /// let live = dir.path().join("live.obj");
    /// let archive = dir.path().join("archive.obj");
    /// let _ = Db::open(&archive)?;
    ///
    /// // A shared handle cannot call `&mut self` `attach`, but can
    /// // call `attach_shared`.
    /// let db = Arc::new(Db::open(&live)?);
    /// db.attach_shared(&archive, "archive")?;
    /// db.detach_shared("archive")?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - [`Error::AttachmentAlreadyExists`] when `namespace` is already
    ///   taken on this `Db`.
    /// - [`Error::AttachmentNotReadable`] when `path` cannot be opened
    ///   read-only.
    /// - [`Error::Busy`] when the registry mutex is poisoned.
    pub fn attach_shared<P: AsRef<std::path::Path>>(
        &self,
        path: P,
        namespace: impl Into<String>,
    ) -> Result<()> {
        let path = path.as_ref();
        let namespace: String = namespace.into();
        self.attach_inner(path, namespace)
    }

    /// `&self` core shared by [`Self::attach`] and
    /// [`Self::attach_shared`]. Both signatures are thin wrappers; the
    /// mutation flows through the interior-mutable `attached` registry
    /// regardless of the public method's receiver.
    fn attach_inner(&self, path: &std::path::Path, namespace: String) -> Result<()> {
        let path_buf = path.to_path_buf();
        {
            let registry = self.attached.lock().map_err(|_| Error::Busy {
                kind: obj_core::LockKind::WriterInProcess,
            })?;
            if registry.contains_key(&namespace) {
                return Err(Error::AttachmentAlreadyExists { namespace });
            }
        }
        let attached_db =
            Db::open_readonly(&path_buf).map_err(|source| Error::AttachmentNotReadable {
                path: path_buf.clone(),
                source: Box::new(source),
            })?;
        let env = Arc::clone(&attached_db.env);
        let mut registry = self.attached.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        // Re-check after acquiring the lock — a sibling caller may
        // have raced; the `Arc<Mutex<_>>` shape makes the check
        // necessary on the second acquire.
        if registry.contains_key(&namespace) {
            return Err(Error::AttachmentAlreadyExists { namespace });
        }
        registry.insert(
            namespace,
            AttachedDb {
                env,
                _db: attached_db,
            },
        );
        // #83 (c): publish the new length while still holding the
        // `attached` mutex so `pin_attached_snapshots`' relaxed load is
        // never stale relative to a committed attachment.
        self.attached_len
            .store(registry.len(), std::sync::atomic::Ordering::Relaxed);
        Ok(())
    }

    /// Drop the attachment registered under `namespace`.
    ///
    /// Only the registry entry is removed. A read transaction that is
    /// already running holds its own snapshot pin on the attached env
    /// and can still finish against that pinned snapshot.
    ///
    /// # Errors
    ///
    /// - [`Error::CollectionNamespaceUnknown`] when no attachment exists
    ///   under `namespace`.
    /// - [`Error::Busy`] when the registry mutex is poisoned.
    pub fn detach(&mut self, namespace: &str) -> Result<()> {
        // The exclusive receiver is kept only so existing callers keep
        // compiling. The registry is interior-mutable, so the
        // shared-borrow core also used by [`Self::detach_shared`] does
        // the actual work.
        self.detach_inner(namespace)
    }

    /// Drop the attachment registered under `namespace` using only a
    /// shared borrow of this handle, e.g. from an `Arc<Db>`.
    ///
    /// Semantics match [`Self::detach`]. Only the registry entry is
    /// removed, and read transactions already in flight keep their own
    /// snapshot pins on the attached env, so they can still complete.
    /// `attach_shared` / `detach_shared` calls from different threads
    /// take turns on the same per-`Db` registry mutex.
    ///
    /// # Errors
    ///
    /// - [`Error::CollectionNamespaceUnknown`] when no attachment exists
    ///   under `namespace`.
    /// - [`Error::Busy`] when the registry mutex is poisoned.
    pub fn detach_shared(&self, namespace: &str) -> Result<()> {
        // Same core as the `&mut self` form; nothing else to do here.
        self.detach_inner(namespace)
    }

    /// Receiver-agnostic implementation behind [`Self::detach`] and
    /// [`Self::detach_shared`]. It removes the registry entry under the
    /// registry mutex and refreshes `attached_len` before releasing it.
    fn detach_inner(&self, namespace: &str) -> Result<()> {
        let mut registry = self.attached.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        // The removed entry (if any) is a temporary of this statement and
        // is dropped right here, still under the lock.
        let was_attached = registry.remove(namespace).is_some();
        if !was_attached {
            return Err(Error::CollectionNamespaceUnknown {
                namespace: namespace.to_owned(),
            });
        }
        // #83 (c): refresh the cached length while the guard is held.
        let live_count = registry.len();
        self.attached_len
            .store(live_count, std::sync::atomic::Ordering::Relaxed);
        Ok(())
    }

    /// Store `doc` in `T`'s collection and return the [`Id`] allocated
    /// for it.
    ///
    /// Like every one-shot helper on `Db`, this begins, commits, and
    /// closes a private write transaction just for this call. When
    /// several mutations have to succeed or fail together, group them
    /// inside a single [`Db::transaction`] instead.
    ///
    /// # Examples
    ///
    /// One-shot CRUD against a `Document`-derived type:
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
    /// struct Order {
    ///     customer_id: u64,
    ///     total_cents: u64,
    ///     status: String,
    /// }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("oneshot.obj"))?;
    ///
    /// // insert returns the freshly-allocated Id.
    /// let id = db.insert(Order {
    ///     customer_id: 1,
    ///     total_cents: 100,
    ///     status: "pending".to_owned(),
    /// })?;
    ///
    /// // get returns Option<T>.
    /// let _maybe: Option<Order> = db.get::<Order>(id)?;
    ///
    /// // update applies a closure in place.
    /// db.update::<Order, _>(id, |o| {
    ///     o.status = "shipped".to_owned();
    /// })?;
    ///
    /// // upsert at a caller-supplied id (insert or replace).
    /// let id2 = obj::Id::try_new(42)
    ///     .ok_or(obj::Error::InvalidArgument("non-zero"))?;
    /// db.upsert::<Order>(id2, Order {
    ///     customer_id: 2,
    ///     total_cents: 999,
    ///     status: "new".to_owned(),
    /// })?;
    ///
    /// // delete returns true if the row existed.
    /// let existed = db.delete::<Order>(id)?;
    /// assert!(existed);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Anything [`Self::transaction`] can return, plus the errors of
    /// [`crate::Collection::insert`].
    pub fn insert<T: Document>(&self, doc: T) -> Result<Id> {
        self.transaction(|txn| txn.collection::<T>()?.insert(doc))
    }

    /// Look up the document stored at `id`, yielding `Ok(None)` when no
    /// such document exists.
    ///
    /// # Errors
    ///
    /// Anything [`Self::read_transaction`] can return, plus the errors
    /// of [`crate::Collection::get`].
    pub fn get<T: Document>(&self, id: Id) -> Result<Option<T>> {
        // #83 (b): `tx.collection()?.get()` would take the pager mutex
        // twice. The fused helper resolves the descriptor and reads the
        // primary tree under a single guard instead. One-shot callers
        // observe the same snapshot and the same `CollectionNotFound`
        // contract. Namespaced reads fall back to the handle path.
        self.read_transaction(|txn| crate::collection::fused_point_get::<T>(txn, id))
    }

    /// Modify the document stored at `id` by running `f` on a mutable
    /// reference to it, inside a one-shot write transaction.
    ///
    /// # Errors
    ///
    /// - [`Error::DocumentNotFound`] when no document exists at `id`.
    /// - Anything [`Self::transaction`] can return.
    pub fn update<T, F>(&self, id: Id, f: F) -> Result<()>
    where
        F: FnOnce(&mut T),
        T: Document,
    {
        self.transaction(|txn| txn.collection::<T>()?.update(id, f))
    }

    /// Remove the document stored at `id` in a one-shot write
    /// transaction.
    ///
    /// Returns `true` when a document was present at `id`, `false`
    /// otherwise.
    ///
    /// # Errors
    ///
    /// Anything [`Self::transaction`] can return.
    pub fn delete<T: Document>(&self, id: Id) -> Result<bool> {
        self.transaction(|txn| txn.collection::<T>()?.delete(id))
    }

    /// Write `doc` at `id` in a one-shot write transaction. A new
    /// document is created when `id` is unused; otherwise the existing
    /// one is replaced.
    ///
    /// # Errors
    ///
    /// Anything [`Self::transaction`] can return.
    pub fn upsert<T: Document>(&self, id: Id, doc: T) -> Result<()> {
        self.transaction(|txn| txn.collection::<T>()?.upsert(id, doc))
    }

    /// Convenience wrapper around [`crate::Collection::find_unique`]
    /// — `db.find_unique::<Customer>("email", "ada@example.com")`.
    /// Runs inside a one-shot read transaction.
    ///
    /// `index_name` is the name of a `Unique` index on `T`. For a
    /// field-level `#[obj(index = unique)]` that name is the field name,
    /// as the example below shows.
    ///
    /// `O(log n)`, no collection scan; the lookup walks the named
    /// index's B-tree directly. Defined only on `Unique` indexes —
    /// for the other kinds use
    /// [`Collection::lookup`](crate::Collection::lookup) or
    /// [`Collection::index_range`](crate::Collection::index_range).
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
    /// #[obj(collection = "customers_find_unique_doc")]
    /// struct Customer {
    ///     #[obj(index = unique)]
    ///     email: String,
    /// }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("find-unique.obj"))?;
    /// let _ = db.insert(Customer { email: "ada@example.com".to_owned() })?;
    /// let by_email: Option<Customer> = db
    ///     .find_unique::<Customer>("email", "ada@example.com")?;
    /// assert!(by_email.is_some());
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// As [`Self::read_transaction`] plus any error from
    /// [`crate::Collection::find_unique`].
    pub fn find_unique<T: Document>(
        &self,
        index_name: &str,
        key: impl Into<obj_core::codec::Dynamic>,
    ) -> Result<Option<T>> {
        self.read_transaction(|tx| tx.collection::<T>()?.find_unique(index_name, key))
    }

    /// Start a new M8 [`crate::Query`] builder over `T`, rooted at this
    /// database.
    ///
    /// The builder borrows `&self` while it is being assembled, and that
    /// borrow ends once [`crate::Query::fetch`] returns.
    ///
    /// Chain any of [`Query::filter`](crate::Query::filter),
    /// [`Query::limit`](crate::Query::limit),
    /// [`Query::sort_by`](crate::Query::sort_by) and
    /// [`Query::index_range`](crate::Query::index_range). Then finish
    /// with [`Query::fetch`](crate::Query::fetch) to get the matching
    /// documents, or with [`Query::count`](crate::Query::count) to get
    /// only how many match.
    ///
    /// The surface follows
    /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
    /// § Querying. The M8 examples in
    /// [`crates/obj/tests/design_md_queries.rs`](https://github.com/uname-n/obj/blob/master/crates/obj/tests/design_md_queries.rs)
    /// exercise all of it.
    ///
    /// # Examples
    ///
    /// Top-N matching documents by an indexed field, ascending:
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use obj_core::codec::Dynamic;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    /// enum OrderStatus { Pending, Shipped }
    ///
    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
    /// #[obj(collection = "orders_query_doc")]
    /// struct Order {
    ///     #[obj(index)]
    ///     customer_id: u64,
    ///     status: OrderStatus,
    ///     #[obj(index)]
    ///     placed_at: u64,
    /// }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("queries.obj"))?;
    /// for i in 0..20u64 {
    ///     let _ = db.insert(Order {
    ///         customer_id: i % 3,
    ///         status: if i % 2 == 0 { OrderStatus::Pending } else { OrderStatus::Shipped },
    ///         placed_at: i * 1_000,
    ///     })?;
    /// }
    ///
    /// let pending: Vec<Order> = db
    ///     .query::<Order>()
    ///     .filter(|o| o.status == OrderStatus::Pending)
    ///     .sort_by(|o| Dynamic::U64(o.placed_at))
    ///     .limit(5)
    ///     .fetch()?;
    /// assert!(pending.len() <= 5);
    /// assert!(pending.iter().all(|o| o.status == OrderStatus::Pending));
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn query<T: Document + Send + 'static>(&self) -> crate::Query<'_, T> {
        crate::Query::<T>::new(self)
    }

    /// Get a read-only typed handle to the collection registered under
    /// the runtime `name`, rather than under the type's compile-time
    /// `T::COLLECTION`.
    ///
    /// This is what makes the
    /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
    /// § Portability example for attached databases work. Given a
    /// namespaced name such as `"archive.orders"`, the returned
    /// [`crate::Collection`] reads from the database attached under
    /// `"archive"` (see [`Db::attach`]).
    ///
    /// Building the handle **cannot fail**. Problems such as a missing
    /// collection, an unknown namespace, or a busy lock are reported by
    /// the first method called on the handle, not by `collection(name)`
    /// itself. Every read-only method on the handle opens its own
    /// [`Db::read_transaction`] and dispatches against the catalog row
    /// of the runtime-named collection.
    ///
    /// # Read-only
    ///
    /// Every mutating call on the returned handle fails with
    /// [`Error::ReadOnly`]: [`crate::Collection::insert`], `update`,
    /// `delete` and `upsert`. To write into a non-default collection,
    /// override [`obj_core::Document::COLLECTION`] on the type itself
    /// (fixed at compile time) and go through [`Db::transaction`] /
    /// [`crate::WriteTxn::collection`] as usual.
    ///
    /// Phase 1B (M11 #94) deliberately restricts the runtime accessor to
    /// reads. Writing through a runtime name needs engine plumbing that
    /// is deferred to a later milestone.
    ///
    /// # Example
    ///
    /// ```
    /// use obj::{Db, Document};
    /// use serde::{Deserialize, Serialize};
    /// use tempfile::tempdir;
    ///
    /// #[derive(Debug, Clone, Serialize, Deserialize)]
    /// struct Order { customer_id: u64, total_cents: u64 }
    ///
    /// impl Document for Order {
    ///     const COLLECTION: &'static str = "orders";
    ///     const VERSION: u32 = 1;
    /// }
    ///
    /// fn run() -> obj::Result<()> {
    ///     let dir = tempdir()?;
    ///     let archive_path = dir.path().join("archive.obj");
    ///     // Populate the archive database first.
    ///     {
    ///         let archive_db = Db::open(&archive_path)?;
    ///         archive_db.insert(Order { customer_id: 1, total_cents: 999 })?;
    ///     }
    ///     // Attach it under a namespace and read via the runtime
    ///     // accessor — no need to re-declare `Order` with a
    ///     // namespaced `COLLECTION`.
    ///     let main_path = dir.path().join("main.obj");
    ///     let mut db = Db::open(&main_path)?;
    ///     db.attach(&archive_path, "archive")?;
    ///     let archived: Vec<Order> = db
    ///         .collection::<Order>("archive.orders")
    ///         .all()?
    ///         .into_iter()
    ///         .map(|(_id, doc)| doc)
    ///         .collect();
    ///     assert_eq!(archived.len(), 1);
    ///     Ok(())
    /// }
    /// # run().unwrap();
    /// ```
    #[must_use]
    pub fn collection<T: Document + Send + 'static>(
        &self,
        name: impl Into<String>,
    ) -> crate::Collection<'_, T> {
        let runtime_name: String = name.into();
        crate::Collection::<T>::lazy(self, runtime_name)
    }

    /// Read every document of `T`'s collection into an owned `Vec<T>`.
    ///
    /// This mirrors the
    /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
    /// idiom `for order in db.all::<Order>()? { ... }`. It is a thin
    /// layer over [`Db::iter_all`]: it runs the streaming iterator to the
    /// end, keeps only the documents, and collects them. When the
    /// collection is big enough that peak memory matters, iterate with
    /// [`Db::iter_all`] directly instead.
    ///
    /// There is no streaming sorted variant, because a comparator needs
    /// every key before it can order anything. For a top-N-sorted
    /// workload use [`Db::query`] together with
    /// [`Query::sort_by`](crate::Query::sort_by).
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
    /// struct Order { total_cents: u64 }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("all.obj"))?;
    /// for i in 0..5u64 {
    ///     let _ = db.insert(Order { total_cents: i * 10 })?;
    /// }
    /// let listed: Vec<Order> = db.all::<Order>()?;
    /// assert_eq!(listed.len(), 5);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// The same as [`Db::iter_all`]. The first per-step error stops the
    /// collection and is returned.
    pub fn all<T: Document + Send + 'static>(&self) -> Result<Vec<T>> {
        self.iter_all::<T>()?
            .map(|entry| entry.map(|(_, document)| document))
            .collect::<Result<Vec<T>>>()
    }

    /// Write a self-contained `.obj` file at `dest` carrying this
    /// database's state at the LSN of an internally-taken reader
    /// snapshot.
    ///
    /// This is an *online* backup: the source stays open and usable. It
    /// is **not** concurrent with writers, though. For the whole copy,
    /// this call holds three locks:
    ///
    /// - the in-process write-serialization lock;
    /// - the cross-process `WRITER_LOCK`, when cross-process locking is
    ///   enabled;
    /// - this handle's pager mutex.
    ///
    /// Writers in this process and in other processes that honour
    /// `WRITER_LOCK` wait until the copy finishes. So does any operation
    /// on this handle that needs the pager mutex. The destination
    /// therefore reflects exactly the committed state at the snapshot's
    /// LSN.
    ///
    /// Algorithm (the copy itself follows
    /// [`docs/format.md`](https://github.com/uname-n/obj/blob/master/docs/format.md)
    /// § Hot backup):
    ///
    /// 1. Begin a write transaction purely to acquire the
    ///    write-serialization lock and (if enabled) the cross-process
    ///    `WRITER_LOCK`. Nothing is written through it.
    /// 2. Lock the pager and take a `ReaderSnapshot` (pins
    ///    `pinned_lsn`).
    /// 3. `OpenOptions::create_new(true)` on `dest`.
    /// 4. Copy main-file pages `0..page_count` to `dest`.
    /// 5. Overlay every frame in the snapshot's frozen WAL view
    ///    onto `dest` at the frame's page-id offset.
    /// 6. If the snapshot carries a WAL-staged page-0 header,
    ///    overlay it (so `dest`'s page-0 reflects the catalog
    ///    root / freelist head / page count the snapshot would
    ///    have observed).
    /// 7. Patch `dest`'s page-0 header: zero `wal_salt`, recompute
    ///    the header CRC32C.
    /// 8. `sync_data(SyncMode::Full)` on `dest`.
    /// 9. Drop the snapshot (releases the WAL pin), then the pager
    ///    guard.
    /// 10. Roll back the write transaction from step 1, releasing the
    ///     writer locks.
    ///
    /// On any mid-backup error the destination file is removed
    /// best-effort so a half-written backup does not linger.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Clone, Serialize, Deserialize, obj::Document)]
    /// #[obj(collection = "notes_backup_doc")]
    /// struct Note { body: String }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let src = dir.path().join("src.obj");
    /// let dst = dir.path().join("backup.obj");
    ///
    /// let db = Db::open(&src)?;
    /// let _ = db.insert(Note { body: "before backup".to_owned() })?;
    /// db.backup_to(&dst)?;
    ///
    /// // The backup is itself a fully-formed obj file. Open it and read.
    /// let backup = Db::open(&dst)?;
    /// let listed: Vec<Note> = backup.all::<Note>()?;
    /// assert_eq!(listed.len(), 1);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - [`Error::BackupDestinationExists`] if `dest` already
    ///   exists.
    /// - [`Error::BackupNotSupportedForMemoryPager`] when called on
    ///   a `Db` constructed via [`Db::memory`] / [`Db::memory_with`].
    /// - [`Error::Io`] on syscall failure during the copy.
    /// - [`Error::Busy`] if the pager mutex is poisoned.
    /// - Any error from [`obj_core::WriteTxn::begin`] while acquiring
    ///   the writer locks. That call is given this handle's busy
    ///   timeout.
    /// - Any error from the closing rollback that releases those locks.
    ///   It is reported only when the copy itself succeeded; otherwise
    ///   the copy's error is returned.
    pub fn backup_to<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
        // #76: a hot backup reads the source MAIN FILE page-by-page
        // (`backup::copy_main_file`). The in-process pager `Mutex`
        // alone excludes only same-process writers; a writer in a
        // SEPARATE OS process can checkpoint pages into the main file
        // mid-copy and yield a torn backup. Hold the cross-process
        // `WRITER_LOCK` for the whole copy so no external writer can
        // mutate the main file under us.
        //
        // Lock-order invariant (MUST match `WriteTxn::begin`, see
        // `docs/concurrency.md`): in-process write-serialization FIRST,
        // cross-process `WRITER_LOCK` SECOND, pager `Mutex` LAST
        // (innermost, acquired and released within the critical
        // section). `obj_core::WriteTxn::begin` acquires the first two
        // in exactly that order, so routing through it inherits the
        // correct ordering and cannot invert against a concurrent
        // in-process `WriteTxn`. We make NO writes; the txn is rolled
        // back (a no-op restore of the unchanged header) once the copy
        // is done. On in-memory / `cross_process_lock = false` envs the
        // cross-process guard is absent (`lock_file = None`) and the
        // txn degrades to the in-process serialization guard only.
        let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
        let result = self.run_backup_under_guard(dest);
        // Release both lock layers in the documented order regardless
        // of the copy's outcome. A rollback failure (poisoned pager
        // mutex) only surfaces when the copy itself succeeded.
        let unlock = guard.rollback();
        result.and(unlock)
    }

    /// #76 helper for [`Self::backup_to`]: lock the pager (the innermost
    /// lock), pin a reader snapshot, and copy the database to `dest`.
    ///
    /// It is split out so that the writer-lock guard held by the caller
    /// has an obvious lifetime, and so each function stays within the
    /// power-of-ten 60-line budget (Rule 4). On every path the snapshot
    /// is released before the pager guard.
    fn run_backup_under_guard<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let snapshot = pager.reader_snapshot()?;
        let copy_outcome = obj_core::backup::backup_pager_to_path(&pager, &snapshot, dest);
        // Unpin first so a deferred checkpoint may proceed, then let go
        // of the pager; only afterwards surface the copy's outcome.
        drop(snapshot);
        drop(pager);
        copy_outcome?;
        Ok(())
    }

    /// Fold committed WAL pages into the main `.obj` file and reset
    /// the WAL back to its 64-byte header.
    ///
    /// Wraps [`obj_core::pager::Pager::checkpoint`]. Locking mirrors
    /// [`Self::backup_to`]:
    ///
    /// 1. A write transaction is begun only to take the in-process
    ///    write-serialization lock and, when enabled, the cross-process
    ///    `WRITER_LOCK`.
    /// 2. The pager mutex is taken for the checkpoint itself.
    /// 3. The write-free transaction is rolled back to release the
    ///    writer locks.
    ///
    /// Writers therefore wait while the checkpoint runs. After it
    /// returns, the committed records live in the main file rather than
    /// the `-wal` sidecar, and the WAL is truncated to header-only.
    ///
    /// # Deferred / no-op behavior
    ///
    /// - If a live MVCC reader has pinned an LSN below the end of the
    ///   WAL, the checkpoint is **deferred** (a safe no-op) so the
    ///   reader's frames are not reclaimed out from under it.
    /// - If there is nothing to fold (no committed WAL frames), the
    ///   call is a harmless no-op.
    ///
    /// This is the reusable engine entry point behind the Python
    /// `Db.checkpoint()` binding and a future checkpoint-on-close
    /// path (issue #5).
    ///
    /// # Errors
    ///
    /// - [`Error::ReadOnly`] if the database was opened read-only.
    /// - [`Error::Busy`] if the pager lock is poisoned.
    /// - Any error from [`obj_core::WriteTxn::begin`] while acquiring
    ///   the writer locks. That call is given this handle's busy
    ///   timeout.
    /// - Any [`Error`] from [`obj_core::pager::Pager::checkpoint`]
    ///   (e.g. [`Error::Io`] on syscall failure).
    /// - Any error from the closing rollback that releases the writer
    ///   locks. It is reported only when the checkpoint itself
    ///   succeeded.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(name = "db.checkpoint", level = "info", skip_all)
    )]
    pub fn checkpoint(&self) -> Result<()> {
        if self.readonly {
            return Err(Error::ReadOnly {
                operation: "checkpoint",
            });
        }
        // #76: checkpoint folds committed WAL frames into the main
        // file and rotates the WAL salt. Both touch the main file and
        // the on-disk header. Taken under only the in-process pager
        // `Mutex` (as before), a writer in a SEPARATE process could
        // run its own commit/checkpoint concurrently and corrupt the
        // salt-rotation handshake or interleave main-file writes. Hold
        // the cross-process `WRITER_LOCK` for the duration, in the
        // documented lock order (in-process serialization → cross-
        // process writer lock → pager mutex; see `WriteTxn::begin` and
        // `Self::backup_to`). We make no user writes; the surrounding
        // txn is rolled back afterwards. The rollback's
        // `restore_header_snapshot` only rewrites `root_catalog` /
        // `freelist_head` / `page_count` (unchanged by checkpoint) and
        // preserves the freshly-rotated `wal_salt`, so it is a no-op
        // with respect to the checkpoint's durable effects.
        let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
        let result = self.run_checkpoint_under_guard();
        let unlock = guard.rollback();
        result.and(unlock)
    }

    /// #76 helper: take the pager `Mutex` (innermost lock) and run the
    /// pager checkpoint. Factored out of [`Self::checkpoint`] so the
    /// cross-process lock guard's lifetime is unambiguous.
    fn run_checkpoint_under_guard(&self) -> Result<()> {
        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        pager.checkpoint()?;
        drop(pager);
        Ok(())
    }

    /// Stream every `(Id, T)` pair of `T`'s collection.
    ///
    /// The returned [`IterAll`] owns a read transaction, and with it a
    /// pinned reader snapshot, for as long as it lives. The borrow of
    /// `self` ends when the iterator is dropped.
    ///
    /// Each call to `next` produces a `Result<(Id, T)>`. A document that
    /// fails to decode shows up as `Some(Err(_))` rather than ending the
    /// iteration, so the caller decides whether to stop or carry on.
    ///
    /// Peak memory is independent of the collection's size. The
    /// internal buffer is capped at `ITER_ALL_BATCH = 256` entries,
    /// roughly 128 KiB at the
    /// [`design.md`](https://github.com/uname-n/obj/blob/master/design.md)
    /// estimate of ~512 bytes per document (power-of-ten Rule 3).
    ///
    /// Unlike `Query::fetch`, which materialises and can therefore sort,
    /// `iter_all` cannot sort: a comparator needs every key up front.
    /// Reach for `Query::sort_by` + `fetch` for sorted results and for
    /// `iter_all` for large unsorted scans.
    ///
    /// # Examples
    ///
    /// Streaming a small collection and folding into a running sum.
    /// The iterator's peak memory stays bounded regardless of how
    /// many documents the collection holds:
    ///
    /// ```
    /// # fn main() -> obj::Result<()> {
    /// use obj::Db;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
    /// struct Order { total_cents: u64 }
    ///
    /// let dir = tempfile::tempdir()?;
    /// let db = Db::open(dir.path().join("iter.obj"))?;
    /// for i in 0..5u64 {
    ///     let _ = db.insert(Order { total_cents: i * 10 })?;
    /// }
    ///
    /// let mut total: u64 = 0;
    /// for step in db.iter_all::<Order>()? {
    ///     let (_id, doc) = step?;
    ///     total = total
    ///         .checked_add(doc.total_cents)
    ///         .ok_or(obj::Error::InvalidArgument("overflow"))?;
    /// }
    /// assert_eq!(total, 0 + 10 + 20 + 30 + 40);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - While constructing the iterator: anything
    ///   [`Db::read_transaction`] can return.
    /// - [`Error::CollectionNotFound`] when the collection does not
    ///   exist yet at the snapshot's pinned LSN.
    /// - During iteration, individual steps may yield `Some(Err(_))`
    ///   for pager, B-tree, or codec failures.
    pub fn iter_all<T: Document + Send + 'static>(&self) -> Result<IterAll<'_, T>> {
        let txn = ReadTxn::new(obj_core::ReadTxn::begin_with_timeout(
            &self.env,
            self.busy_timeout,
        )?);
        // Look the collection up now so that a missing collection is
        // reported here, at construction, as `Db::all` did before M8. The
        // temporary handle borrows `txn` only for this statement; the
        // owned descriptor copy is what the iterator keeps.
        let descriptor = txn.collection::<T>()?.descriptor().clone();
        Ok(IterAll {
            txn,
            descriptor,
            buffer: VecDeque::new(),
            last_emitted_key: None,
            finished: false,
            _phantom: PhantomData,
        })
    }
}

/// Streaming iterator returned by [`Db::iter_all`].
///
/// Holds a [`ReadTxn`] for its lifetime so every yielded document
/// is consistent with the snapshot pinned at construction. Yields
/// `Result<(Id, T)>` one entry at a time; refills its internal
/// buffer in fixed-size chunks (`ITER_ALL_BATCH = 256` entries) per
/// pager-lock acquisition, so peak memory stays bounded at a small
/// constant regardless of the collection's size — power-of-ten
/// Rule 3.
///
/// Construction errors surface at the [`Db::iter_all`] call site.
/// Per-entry errors (a failed B-tree step, a primary key that is not a
/// valid [`Id`], or a codec failure) surface as `Some(Err(_))` during
/// iteration and do NOT terminate it — the caller decides whether to
/// continue. A failure that prevents a refill from starting (acquiring
/// the pager lock, resolving the primary root, or opening the B-tree
/// range) is different: it is yielded once as `Some(Err(_))`, and every
/// later `next` call returns `None`.
// Note: `IterAll<T>` is deliberately outside the `serde` surface
// (issue #6). It holds a live `ReadTxn<'db>` snapshot pin plus a
// `VecDeque<Result<(Id, T)>>` buffer keyed by the obj-core `Result`,
// neither of which is serializable in a meaningful way. Users who
// need an off-line representation of iteration output should
// `collect::<Vec<_>>()` first (e.g. via `Db::all`) and serialize the
// resulting `Vec<T>` themselves.
pub struct IterAll<'db, T> {
    /// Owns the snapshot pin for the iterator's lifetime.
    txn: ReadTxn<'db>,
    /// Cached primary-tree root + collection id. The collection id is
    /// passed to `obj_core::codec::decode` for every entry (to validate
    /// the per-doc header).
    descriptor: CollectionDescriptor,
    /// Pre-decoded buffer of upcoming entries (including per-entry
    /// `Err`s), drained front-first by `next`.
    buffer: VecDeque<Result<(Id, T)>>,
    /// Resumption marker: the raw primary key last recorded by a
    /// refill. The next refill starts at `Excluded(key)`; `None`
    /// means "start from the beginning of the tree".
    last_emitted_key: Option<Vec<u8>>,
    /// Set once a refill reads fewer than `ITER_ALL_BATCH` entries
    /// (the tree is exhausted) or fails outright. After that no more
    /// refills happen: `next` drains whatever is still buffered and
    /// then returns `None`.
    finished: bool,
    /// Track the `T` parameter without owning a value.
    _phantom: PhantomData<fn() -> T>,
}

impl<T> std::fmt::Debug for IterAll<'_, T> {
    /// Summarises the iterator's state: which collection it scans, how
    /// many entries are buffered, and whether the scan has finished.
    /// The buffered documents themselves are not printed, so no
    /// `T: Debug` bound is needed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut view = f.debug_struct("IterAll");
        view.field("collection_id", &self.descriptor.collection_id);
        view.field("buffer_len", &self.buffer.len());
        view.field("finished", &self.finished);
        view.finish_non_exhaustive()
    }
}

impl<T: Document + Send + 'static> IterAll<'_, T> {
    /// Refill the internal buffer with up to [`ITER_ALL_BATCH`]
    /// entries, resuming after `last_emitted_key`.
    ///
    /// Per-entry failures (B-tree step, key, or decode errors) are
    /// stored as `Err` items in the buffer, so the caller observes them
    /// via `next()` without aborting iteration. Failures that prevent
    /// the refill from starting are returned via `?` before
    /// `self.buffer` is touched — power-of-ten Rule 7. These are:
    ///
    /// - a zero primary root;
    /// - a failed pager `lock()` (a poisoned mutex, reported as
    ///   `Error::Busy`);
    /// - a B-tree open or range failure.
    ///
    /// Sets `finished` when the tree produced fewer than
    /// `ITER_ALL_BATCH` entries, i.e. the scan reached the end.
    fn refill(&mut self) -> Result<()> {
        // Resolve everything that only needs `self.descriptor` /
        // `self.last_emitted_key` before locking, so the pager lock
        // is held only for the B-tree walk itself.
        let root_pid = PageId::new(self.descriptor.primary_root)
            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
        let collection_id = self.descriptor.collection_id;
        let start = match &self.last_emitted_key {
            Some(k) => Bound::Excluded(k.clone()),
            None => Bound::Unbounded,
        };

        // Clone the `Arc<Mutex<Pager>>` so the lock guard does NOT keep
        // an immutable borrow of `self.txn` alive across the `self.*`
        // mutations at the end of this function.
        let pager_arc: Arc<Mutex<Pager<FileHandle>>> = Arc::clone(self.txn.inner.env().pager());
        let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
        let steps = tree.range(&mut pager, (start, Bound::Unbounded))?;

        let mut batch: VecDeque<Result<(Id, T)>> = VecDeque::with_capacity(ITER_ALL_BATCH);
        let mut last_key: Option<Vec<u8>> = None;
        let mut read: usize = 0;
        // `take` stops pulling from the B-tree iterator as soon as the
        // batch is full, so no entry is fetched only to be discarded.
        // `read` is bounded by `ITER_ALL_BATCH`; the saturating add
        // just keeps the arithmetic overflow-free by construction.
        for step in steps.take(ITER_ALL_BATCH) {
            read = read.saturating_add(1);
            buffer_one_entry::<T>(&mut batch, &mut last_key, collection_id, step);
        }

        // The range iterator (which borrowed the guard) was consumed by
        // the loop above; release the pager lock before touching `self`.
        drop(pager);
        if read < ITER_ALL_BATCH {
            self.finished = true;
        }
        self.buffer.extend(batch);
        if let Some(k) = last_key {
            self.last_emitted_key = Some(k);
        }
        Ok(())
    }
}

/// Process one B-tree iterator step into the staged `batch`, pushing
/// exactly one entry per call. A failed step, a primary key that is
/// not an [`Id`], and a decode error are each captured as an `Err`
/// entry, so they surface via `next()` rather than aborting the
/// refill — power-of-ten Rule 7.
///
/// `last_key` is the resumption marker for the next refill. It is
/// advanced for EVERY key the tree produced, including keys that fail
/// to parse as an `Id`. A bad key is therefore reported once and then
/// skipped, instead of being re-read (and re-reported) by every later
/// refill. If the marker stayed behind such keys, a run of
/// `ITER_ALL_BATCH` or more of them would make each refill return the
/// same full batch and the iterator would never finish. A failed step
/// carries no key, so the marker cannot move past it.
///
/// Free function (rather than `&mut self`) so the caller can keep
/// the pager lock open across multiple buffer-pushes without the
/// borrow checker tripping on a second `&mut self.buffer` borrow.
fn buffer_one_entry<T: Document>(
    batch: &mut VecDeque<Result<(Id, T)>>,
    last_key: &mut Option<Vec<u8>>,
    collection_id: u32,
    step: Result<(Vec<u8>, Vec<u8>)>,
) {
    let (key, value) = match step {
        Ok(kv) => kv,
        Err(e) => {
            batch.push_back(Err(e));
            return;
        }
    };
    // Parse first (borrowing `key`), then hand ownership of the key to
    // the resumption marker whatever the parse outcome.
    let parsed = Id::from_be_bytes(&key);
    *last_key = Some(key);
    let Some(id) = parsed else {
        batch.push_back(Err(Error::InvalidArgument(
            "primary B-tree key is not an Id",
        )));
        return;
    };
    let decoded = obj_core::codec::decode::<T>(&value, collection_id);
    batch.push_back(decoded.map(|doc| (id, doc)));
}

impl<T: Document + Send + 'static> Iterator for IterAll<'_, T> {
    type Item = Result<(Id, T)>;

    /// Yields the next buffered entry. It refills from the B-tree only
    /// when the buffer is empty and the scan has not finished.
    fn next(&mut self) -> Option<Self::Item> {
        let refill_outcome = if self.buffer.is_empty() && !self.finished {
            self.refill()
        } else {
            Ok(())
        };
        if let Err(failure) = refill_outcome {
            // A lock / open failure is reported exactly once as
            // `Some(Err(_))`; flipping `finished` makes every later
            // call return `None` — power-of-ten Rule 7.
            self.finished = true;
            return Some(Err(failure));
        }
        // Either an entry (buffered earlier or just refilled) or
        // `None` once the scan is finished and the buffer drained.
        self.buffer.pop_front()
    }
}

/// Split a possibly-namespaced collection name at its FIRST `.` into
/// `(namespace, name)`. Everything after that first dot belongs to the
/// collection name, which may itself contain further dots; a name with
/// no dot has no namespace.
///
/// `"users"` → `(None, "users")`.
/// `"archive.orders"` → `(Some("archive"), "orders")`.
/// `"archive.orders.legacy"` → `(Some("archive"), "orders.legacy")`.
#[must_use]
pub(crate) fn split_namespace(name: &str) -> (Option<&str>, &str) {
    name.split_once('.')
        .map_or((None, name), |(namespace, rest)| (Some(namespace), rest))
}

/// Compile-time guarantee that `Db` is `Send + Sync`, so it composes
/// with `Arc<Db>` for concurrent reader / single-writer workloads.
/// The thread-safety is inherited from the underlying `Arc<TxnEnv>` +
/// `Arc<Mutex<Catalog>>`; if a future field change loses either auto
/// trait, this item stops compiling.
const _: () = {
    fn require_send_sync<T: Send + Sync>() {}
    let _witness = require_send_sync::<Db>;
};

/// Translate the first failure in `report` (if any) into the
/// strongest `Error` we can synthesise. Used by [`Db::from_parts`]'s
/// open-time fast check so the caller sees a typed error rather than
/// an opaque `IntegrityReport`.
///
/// Mapping:
/// - page-located failures (checksum, orphan page, sort / sibling /
///   level violations, broken freelist chain, dangling catalog
///   pointer) → `Error::Corruption { page_id }` carrying the failing
///   page;
/// - `BTreeDepthExceeded` → `Error::BTreeDepthExceeded { limit }`;
/// - any other variant (e.g. `OrphanIndexEntry`, which `quick_check`
///   does NOT emit) → `Error::Corruption { page_id: 0 }`. Page 0 is a
///   placeholder for "no specific page", not a real locus.
///
/// Returns `None` when `report.failures` is empty.
fn first_failure_as_error(report: &obj_core::IntegrityReport) -> Option<Error> {
    let first = report.failures.first()?;
    let err = match first {
        obj_core::IntegrityFailure::ChecksumMismatch { page_id }
        | obj_core::IntegrityFailure::OrphanPage { page_id }
        | obj_core::IntegrityFailure::BTreeSortViolation { page_id }
        | obj_core::IntegrityFailure::FreelistChainBroken { page_id }
        | obj_core::IntegrityFailure::BTreeSiblingChainBroken { page_id, .. }
        | obj_core::IntegrityFailure::BTreeLevelInvariantViolated { page_id, .. }
        | obj_core::IntegrityFailure::DanglingCatalogPointer { page_id, .. } => {
            Error::Corruption { page_id: *page_id }
        }
        obj_core::IntegrityFailure::BTreeDepthExceeded { limit, .. } => {
            Error::BTreeDepthExceeded { limit: *limit }
        }
        // The remaining variants — OrphanIndexEntry,
        // MissingIndexEntry, and any future `#[non_exhaustive]`
        // additions — are never emitted by `quick_check` (which
        // walks the catalog tree only, not the per-collection
        // primary or index trees). The defensive fallback is a
        // coarse `Corruption { page_id: 0 }` so the open-time
        // check still fails closed if the obj-core layer ever
        // grows a new fast-check failure mode.
        _ => Error::Corruption { page_id: 0 },
    };
    Some(err)
}