obj-db 1.1.1

Embedded document database. Stable file format, full ACID, single-file portability.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
//! CLI-facing public hooks on [`Db`].
//!
//! Surface area used by the M12 `obj-cli` binary that does not fit
//! cleanly into the typed read/write transaction API:
//!
//! - [`Db::stat`] — collects a one-shot snapshot of header + catalog
//!   summary for `obj stat`.
//! - [`Db::dump_raw`] — type-erased streaming walk of a named
//!   collection's primary B-tree for `obj dump`.
//!
//! These methods are marked **CLI-facing, may move pre-1.0** in
//! rustdoc — the typed `Db::iter_all` / `Db::collection::<T>()` API
//! is the long-term shape for user code; the helpers here exist
//! because the CLI inspects files whose `Document` types are not
//! linked into the binary.
//!
//! Power-of-ten posture:
//! - Rule 2: [`DumpIter`] is bounded by the user-supplied `--limit`
//!   (zero means unbounded — the caller's loop is responsible).
//! - Rule 3: [`DumpIter`] buffers in fixed-size chunks (`DUMP_BATCH`)
//!   so peak memory does not scale with collection size.
//! - Rule 4: every helper below is ≤ 60 lines.
//! - Rule 7: every `Result` / `Option` is matched or propagated.
//! - Rule 9: no `dyn` — `DumpIter` is concrete-typed.

use std::collections::VecDeque;
use std::ops::Bound;
use std::sync::Arc;

use obj_core::btree::BTree;
use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE};
use obj_core::pager::page::PageId;
use obj_core::pager::Pager;
use obj_core::platform::FileHandle;
use obj_core::{CollectionDescriptor, Error, Id, Result};

use crate::txn::{AttachedReadCtx, ReadTxn};
use crate::Db;

/// Refill batch for [`DumpIter`]. Same Rule-3 rationale as
/// `ITER_ALL_BATCH` — a small fixed cap so peak memory is bounded
/// regardless of the collection's size.
///
/// Each refill stages at most this many entries; a refill that drains
/// fewer than `DUMP_BATCH` is treated as the end-of-tree signal.
const DUMP_BATCH: usize = 256;

/// One drained [`DumpIter`] refill batch: the staged records, the last
/// key seen (the next resumption marker), and the count drained (a
/// count `< DUMP_BATCH` signals end-of-tree). Aliased to keep the
/// refill-helper signatures within clippy's `type_complexity` budget.
///
/// Tuple positions, in order: `.0` the staged `Ok` / `Err` entries,
/// `.1` the resumption key (`None` when no key was recorded for the
/// batch), `.2` the number of iterator steps consumed.
type DumpBatch = (VecDeque<Result<DumpRecord>>, Option<Vec<u8>>, usize);

/// One-shot snapshot of a database's header + catalog summary.
///
/// Returned by [`Db::stat`]. CLI-facing — may evolve pre-1.0; user
/// code should reach for the typed [`Db::iter_all`] /
/// `Db::read_transaction(|tx| tx.collection::<T>())` APIs instead.
///
/// NOTE(review): [`Db::stat`] gathers the per-collection summaries
/// first and reads the header fields afterwards under a separate
/// pager-lock acquisition — confirm whether callers rely on the two
/// halves being mutually consistent.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct DbStat {
    /// Format major version from page 0.
    pub format_major: u16,
    /// Format minor version from page 0.
    pub format_minor: u16,
    /// On-disk page size in bytes.
    pub page_size: u16,
    /// Total number of pages in the database, including page 0.
    pub page_count: u64,
    /// Logical file size in bytes (`page_count * page_size`). For
    /// file-backed pagers this equals the on-disk size; for memory
    /// pagers it is the in-memory backing buffer length.
    ///
    /// Computed with a saturating multiply, so an out-of-range product
    /// clamps at `u64::MAX` instead of wrapping.
    pub file_size_bytes: u64,
    /// One entry per registered collection. Order matches the
    /// catalog B-tree's natural sort (by name).
    pub collections: Vec<CollectionStat>,
}

/// Per-collection summary inside [`DbStat`].
///
/// `doc_count` and `total_payload_bytes` are computed by walking
/// the collection's primary B-tree once; the cost is O(n) in the
/// number of documents in the collection. `obj stat` calls
/// [`Db::stat`] once per `--collection` invocation; tools that
/// need realtime telemetry should NOT use this surface.
///
/// The identity and index fields are copied from the collection's
/// catalog descriptor; the two counters come from the tree walk.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct CollectionStat {
    /// User-visible name.
    pub name: String,
    /// Catalog-assigned numeric id.
    pub collection_id: u32,
    /// `Document::VERSION` of the type that last wrote to the
    /// collection.
    pub type_version: u32,
    /// Number of documents in the primary B-tree. Accumulated with
    /// overflow-checked addition; an overflow surfaces as an error
    /// from [`Db::stat`] rather than a wrapped count.
    pub doc_count: u64,
    /// Approximate total payload bytes — sum of every doc header's
    /// `payload_len`. Does NOT include the 16-byte per-doc header
    /// itself; per `docs/format.md` the in-file footprint is
    /// `payload_len + DOC_HEADER_SIZE` per doc, plus B+tree
    /// per-page overhead.
    pub total_payload_bytes: u64,
    /// Number of `Active` secondary indexes. `DroppedPending`
    /// descriptors are NOT counted.
    pub active_index_count: usize,
    /// Number of `DroppedPending` index descriptors (kept so the
    /// `index_id` is never reused; pages reclaimed on the next
    /// checkpoint).
    pub dropped_index_count: usize,
    /// Full secondary-index descriptors for this collection, in
    /// catalog order. Includes both `Active` and `DroppedPending`
    /// entries — `indexes.len() == active_index_count +
    /// dropped_index_count`. Surfaced for introspection tooling
    /// (e.g. the obj-py descriptor view); the count fields above
    /// remain the cheap summary.
    ///
    /// This is a clone of the descriptor's index list, so it is
    /// independent of the catalog after [`Db::stat`] returns.
    pub indexes: Vec<obj_core::IndexDescriptor>,
}

/// One raw record yielded by [`DumpIter`].
///
/// Built from a single primary B-tree entry: the key supplies `id`,
/// the value supplies `header` followed by `payload`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DumpRecord {
    /// Primary id, decoded from the B-tree key.
    pub id: Id,
    /// Per-document header (16 bytes on disk, decoded).
    pub header: DocumentHeader,
    /// Payload bytes following the header. Type-erased; no
    /// schema-aware decode is attempted. Holds everything in the
    /// stored value past the first `DOC_HEADER_SIZE` bytes, and is
    /// empty when the value has nothing beyond the header.
    pub payload: Vec<u8>,
}

impl Db {
    /// One-shot snapshot of header + catalog summary.
    ///
    /// CLI-facing — used by `obj stat`. May move pre-1.0; user code
    /// should prefer the typed [`Db::iter_all`] /
    /// `Db::read_transaction(|tx| tx.collection::<T>())` APIs.
    ///
    /// # Errors
    ///
    /// - [`Error::Busy`] if the pager mutex is poisoned or contested.
    /// - Pager / B-tree / postcard errors propagated from the
    ///   catalog walk + per-collection primary-tree walks.
    pub fn stat(&self) -> Result<DbStat> {
        let descriptors = self.collect_descriptors()?;
        let mut collections: Vec<CollectionStat> = Vec::with_capacity(descriptors.len());
        for (name, descriptor) in descriptors {
            collections.push(self.stat_one_collection(&name, &descriptor)?);
        }
        let pager = self.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let (format_major, format_minor) = pager.format_version();
        let page_size = pager.page_size();
        let page_count = pager.page_count();
        let file_size_bytes = u64::from(page_size).saturating_mul(page_count);
        Ok(DbStat {
            format_major,
            format_minor,
            page_size,
            page_count,
            file_size_bytes,
            collections,
        })
    }

    /// Open a streaming, type-erased scan over the primary B-tree of
    /// `collection`. Every step of the returned [`DumpIter`] yields a
    /// [`DumpRecord`]: the primary id, the decoded per-doc header, and
    /// the raw payload bytes.
    ///
    /// `limit == 0` means "no cap": the iterator runs until the tree is
    /// exhausted and the caller's loop is the only bound (power-of-ten
    /// Rule 2 — callers wanting a hard cap apply their own
    /// `take(...)`). Any other value stops the iterator after that many
    /// items have been handed out.
    ///
    /// CLI-facing — backs `obj dump`. The `Document` trait is NOT
    /// consulted; schema-aware decoding needs a registered type and is
    /// the caller's job above this layer.
    ///
    /// # Namespace dispatch (M11 #132) + snapshot isolation (#135)
    ///
    /// A read transaction is always opened on the calling Db's env and
    /// kept inside the iterator for its whole lifetime.
    ///
    /// - A bare `collection` name is looked up through that
    ///   transaction's pinned [`obj_core::ReaderSnapshot`], and the
    ///   scan later walks the primary tree through the same snapshot,
    ///   so catalog lookup and tree walk agree with each other and are
    ///   isolated from concurrent writers (#135).
    /// - A `"<namespace>.<tail>"` name is resolved against the database
    ///   attached under `<namespace>`: a snapshot is pinned on the
    ///   attached env, `<tail>` is looked up in that snapshot's
    ///   catalog, and the scan walks the attached env's primary tree
    ///   as-of that snapshot (#132).
    ///
    /// # Errors
    ///
    /// - [`Error::CollectionNotFound`] if `collection` is not
    ///   registered AT THE SNAPSHOT'S PINNED LSN.
    /// - [`Error::CollectionNamespaceUnknown`] if `collection` carries
    ///   a namespace prefix that is not attached on this handle.
    /// - As [`Db::read_transaction`] (construction-time).
    pub fn dump_raw(&self, collection: &str, limit: usize) -> Result<DumpIter<'_>> {
        // Opened on the calling env even for a namespaced scan: this
        // transaction is what the iterator's `'db` borrow hangs off.
        let txn = ReadTxn::new(obj_core::ReadTxn::begin_with_timeout(
            &self.env,
            self.busy_timeout,
        )?);
        let (descriptor, attached) = self.resolve_dump_target(&txn, collection)?;
        // Nothing is read from the tree here; the first `next()` call
        // triggers the first refill.
        let iter = DumpIter {
            txn,
            attached,
            descriptor,
            buffer: VecDeque::new(),
            last_emitted_key: None,
            finished: false,
            limit,
            emitted: 0,
        };
        Ok(iter)
    }

    /// Resolve `collection` to the `(descriptor, attached)` pair the
    /// [`DumpIter`] should walk. A bare name resolves against `txn`'s
    /// pinned [`obj_core::ReaderSnapshot`] via
    /// [`obj_core::Catalog::lookup_via_snapshot`] (`attached == None`,
    /// walked locally) — #135: this yields a `primary_root` consistent
    /// with the snapshot the scan reads pages through, so a concurrent
    /// writer's freelist-recycled catalog page cannot surface as a
    /// spurious `Corruption`. A `"<ns>.<tail>"` name pins a snapshot on
    /// the attached env and resolves `<tail>` against THAT snapshot's
    /// catalog, so the iterator is consistent with the namespaced
    /// point-read shims.
    fn resolve_dump_target(
        &self,
        txn: &ReadTxn<'_>,
        collection: &str,
    ) -> Result<(CollectionDescriptor, Option<AttachedReadCtx>)> {
        let (namespace, tail) = crate::db::split_namespace(collection);
        let Some(ns) = namespace else {
            let pager = self.env.pager().lock().map_err(|_| Error::Busy {
                kind: obj_core::LockKind::WriterInProcess,
            })?;
            let descriptor = obj_core::Catalog::<FileHandle>::lookup_via_snapshot(
                &pager,
                txn.inner.snapshot(),
                collection,
            )?
            .ok_or_else(|| Error::CollectionNotFound {
                name: collection.to_owned(),
            })?;
            return Ok((descriptor, None));
        };
        let ctx = self.pin_attached_ctx(ns)?;
        let descriptor = {
            let pager = ctx.env.pager().lock().map_err(|_| Error::Busy {
                kind: obj_core::LockKind::WriterInProcess,
            })?;
            obj_core::Catalog::<FileHandle>::lookup_via_snapshot(&pager, &ctx.snapshot, tail)?
                .ok_or_else(|| Error::CollectionNotFound {
                    name: collection.to_owned(),
                })?
        };
        Ok((descriptor, Some(ctx)))
    }

    /// Walk the catalog and return `(name, descriptor)` pairs. Held
    /// across the per-collection stats walk so the catalog mutex is
    /// not held simultaneously with the pager mutex inside
    /// [`Self::stat_one_collection`].
    fn collect_descriptors(&self) -> Result<Vec<(String, CollectionDescriptor)>> {
        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let catalog = self.catalog.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        catalog.list_collections(&mut pager)
    }

    /// Assemble the [`CollectionStat`] for one collection.
    ///
    /// The document count and payload-byte total come from a walk of
    /// the primary B-tree ([`Self::walk_primary_for_stat`]); the id,
    /// type version and index information are taken from `descriptor`.
    /// Any error from the walk is propagated unchanged.
    fn stat_one_collection(
        &self,
        name: &str,
        descriptor: &CollectionDescriptor,
    ) -> Result<CollectionStat> {
        let (doc_count, total_payload_bytes) = self.walk_primary_for_stat(name)?;
        // Count the index descriptors that are in a given lifecycle state.
        let indexes_with = |wanted: obj_core::IndexStatus| {
            descriptor
                .indexes
                .iter()
                .filter(|index| index.status == wanted)
                .count()
        };
        let stat = CollectionStat {
            name: name.to_owned(),
            collection_id: descriptor.collection_id,
            type_version: descriptor.type_version,
            doc_count,
            total_payload_bytes,
            active_index_count: indexes_with(obj_core::IndexStatus::Active),
            dropped_index_count: indexes_with(obj_core::IndexStatus::DroppedPending),
            // Full list (Active + DroppedPending) in catalog order, so
            // introspection callers see the same ordering the two
            // counts above summarise.
            indexes: descriptor.indexes.clone(),
        };
        Ok(stat)
    }

    /// Streaming walk of one primary tree. Returns `(doc_count,
    /// total_payload_bytes)`. Power-of-ten Rule 2: bounded by
    /// `obj_core::catalog::MAX_COLLECTIONS * <doc cap>` — the B-tree
    /// range iterator does NOT carry a cap of its own, but the
    /// loop count is bounded by the on-disk doc count and so is
    /// itself bounded by the (finite) file size. We add an
    /// explicit overflow check on the running counters.
    ///
    /// The walk pins a [`obj_core::ReaderSnapshot`], re-resolves the
    /// collection's `primary_root` against THAT snapshot's catalog
    /// (via [`obj_core::Catalog::lookup_via_snapshot`]), and iterates
    /// the tree via [`BTree::range_via_snapshot`] (#135) so `Db::stat`
    /// is isolated from concurrent writers' node splits/merges and page
    /// reuse. Pinning a snapshot but walking the live root (or the live
    /// tree) would spuriously decode-error mid-scan. The snapshot is
    /// pinned at or after the live-catalog read that produced `name`, so
    /// the collection is guaranteed present at the pinned LSN.
    fn walk_primary_for_stat(&self, name: &str) -> Result<(u64, u64)> {
        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let snapshot = pager.reader_snapshot()?;
        let descriptor =
            obj_core::Catalog::<FileHandle>::lookup_via_snapshot(&pager, &snapshot, name)?
                .ok_or_else(|| Error::CollectionNotFound {
                    name: name.to_owned(),
                })?;
        let root_pid = PageId::new(descriptor.primary_root)
            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
        let iter = BTree::<FileHandle>::range_via_snapshot(&pager, &snapshot, root_pid, ..)?;
        let mut doc_count: u64 = 0;
        let mut total: u64 = 0;
        for step in iter {
            let (_key, value) = step?;
            let header = DocumentHeader::read_from(&value)?;
            doc_count = doc_count
                .checked_add(1)
                .ok_or(Error::BTreeInvariantViolated {
                    reason: "stat doc-count overflow",
                })?;
            total = total.checked_add(u64::from(header.payload_len)).ok_or(
                Error::BTreeInvariantViolated {
                    reason: "stat payload-byte sum overflow",
                },
            )?;
        }
        Ok((doc_count, total))
    }
}

/// Streaming iterator returned by [`Db::dump_raw`].
///
/// Holds a [`ReadTxn`] for its lifetime so the snapshot pin keeps
/// the catalog and primary B-tree stable across refills. Yields
/// `Result<DumpRecord>` one entry at a time; per-step errors do
/// NOT terminate iteration — the caller decides whether to
/// continue. Construction errors surface at the [`Db::dump_raw`]
/// call site.
///
/// A failure of the refill itself (as opposed to a per-entry error
/// staged inside a batch) is yielded once and then ends the iteration.
pub struct DumpIter<'db> {
    /// Read transaction opened on the calling Db's env; kept alive for
    /// the iterator's whole lifetime. A bare-name scan reads through
    /// its pinned snapshot.
    txn: ReadTxn<'db>,
    /// `Some(_)` for a namespaced (`"<ns>.<tail>"`) scan: the iterator
    /// walks this attached env's primary tree as-of the pinned snapshot
    /// (M11 #132). `None` for a bare-name scan, which walks the calling
    /// Db's primary tree as-of `txn`'s pinned reader snapshot (#135) —
    /// snapshot-isolated against concurrent writers, exactly as the
    /// attached path is.
    attached: Option<AttachedReadCtx>,
    /// Catalog descriptor resolved at construction; its `primary_root`
    /// is the tree root every refill starts from.
    descriptor: CollectionDescriptor,
    /// Entries staged by the most recent refill and not yet handed out.
    buffer: VecDeque<Result<DumpRecord>>,
    /// Resumption marker: the next refill starts strictly after this
    /// key (`None` means start from the beginning of the tree).
    last_emitted_key: Option<Vec<u8>>,
    /// Set once a refill drains fewer than `DUMP_BATCH` entries or
    /// fails; no further refills are attempted after that.
    finished: bool,
    /// Maximum number of items to hand out; `0` means unbounded.
    limit: usize,
    /// Number of buffered items handed out so far (`Ok` and `Err`
    /// entries alike); compared against `limit`.
    emitted: usize,
}

impl std::fmt::Debug for DumpIter<'_> {
    /// Summarise the iterator's progress without dumping its contents:
    /// the collection id, how many entries are currently staged, and
    /// the `finished` / `limit` / `emitted` bookkeeping. The read
    /// transaction, attachment context and resumption key are elided
    /// (the output is marked non-exhaustive).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            descriptor,
            buffer,
            finished,
            limit,
            emitted,
            ..
        } = self;
        let mut out = f.debug_struct("DumpIter");
        out.field("collection_id", &descriptor.collection_id);
        out.field("buffer_len", &buffer.len());
        out.field("finished", finished);
        out.field("limit", limit);
        out.field("emitted", emitted);
        out.finish_non_exhaustive()
    }
}

impl DumpIter<'_> {
    /// Stage the next batch of up to [`DUMP_BATCH`] entries into the
    /// internal buffer.
    ///
    /// The scan resumes strictly after `last_emitted_key`
    /// (`Bound::Excluded`), or from the start of the tree when no key
    /// has been recorded yet — the same refill pattern as
    /// [`crate::IterAll`].
    ///
    /// Which tree is walked depends on `attached`: a namespaced scan
    /// reads the attached env's primary tree as-of its pinned snapshot
    /// ([`Self::refill_attached`], #132); a bare-name scan reads the
    /// calling Db's primary tree as-of the dump txn's pinned
    /// [`obj_core::ReaderSnapshot`] ([`Self::refill_local`], #135).
    ///
    /// On success the iterator is marked finished when the batch came
    /// back short, and the resumption key is replaced only if the
    /// batch reported one. On error nothing in `self` is modified.
    fn refill(&mut self) -> Result<()> {
        let resume_from = self
            .last_emitted_key
            .clone()
            .map_or(Bound::Unbounded, Bound::Excluded);
        let root = self.descriptor.primary_root;
        let staged = if let Some(ctx) = self.attached.as_ref() {
            Self::refill_attached(ctx, root, resume_from)
        } else {
            Self::refill_local(
                self.txn.inner.env(),
                self.txn.inner.snapshot(),
                root,
                resume_from,
            )
        };
        let (batch, last_key, drained) = staged?;
        // A short batch means the range iterator ran dry.
        self.finished |= drained < DUMP_BATCH;
        self.buffer.extend(batch);
        if last_key.is_some() {
            self.last_emitted_key = last_key;
        }
        Ok(())
    }

    /// Drain up to [`DUMP_BATCH`] entries from the calling Db's primary
    /// tree (bare-name scan), pinned to the dump txn's
    /// [`obj_core::ReaderSnapshot`] via
    /// [`BTree::range_via_snapshot`] (#135). Walking the snapshot rather
    /// than the live tree (`BTree::range`) keeps the scan isolated from
    /// concurrent writers' node splits/merges and page reuse — the same
    /// guarantee as [`Self::refill_attached`] and the point reads.
    fn refill_local(
        env: &obj_core::TxnEnv<FileHandle>,
        snapshot: &obj_core::ReaderSnapshot<FileHandle>,
        primary_root: u64,
        start: Bound<Vec<u8>>,
    ) -> Result<DumpBatch> {
        let pager_arc: Arc<std::sync::Mutex<Pager<FileHandle>>> = Arc::clone(env.pager());
        let pager = pager_arc.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let root_pid = PageId::new(primary_root)
            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
        let iter = BTree::<FileHandle>::range_via_snapshot(
            &pager,
            snapshot,
            root_pid,
            (start, Bound::Unbounded),
        )?;
        drain_dump_batch(iter)
    }

    /// Drain up to [`DUMP_BATCH`] entries from an attached env's primary
    /// tree, pinned to the attachment's snapshot (#132 namespaced scan).
    fn refill_attached(
        ctx: &AttachedReadCtx,
        primary_root: u64,
        start: Bound<Vec<u8>>,
    ) -> Result<DumpBatch> {
        let pager = ctx.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let root_pid = PageId::new(primary_root)
            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
        let iter = BTree::<FileHandle>::range_via_snapshot(
            &pager,
            &ctx.snapshot,
            root_pid,
            (start, Bound::Unbounded),
        )?;
        drain_dump_batch(iter)
    }
}

/// Drain at most [`DUMP_BATCH`] entries from a primary-tree range
/// iterator into a staged batch. Returns `(batch, last_key, drained)`
/// where `drained < DUMP_BATCH` signals end-of-tree to the caller.
///
/// The iterator is advanced at most `DUMP_BATCH` times: the cap is
/// applied with `Iterator::take`, so a full batch stops without
/// pulling (and then throwing away) one extra entry from the tree.
/// Every step — `Ok` or `Err` — counts towards `drained`; per-entry
/// errors are staged inside the batch by [`buffer_one_dump_entry`]
/// rather than returned from here.
///
/// Generic over the concrete range-iterator type (Rule 9: a single
/// static bound, no `dyn`) so the local and attached snapshot walks
/// (both `BTree::range_via_snapshot`, #135) share one body.
/// Bounded by `DUMP_BATCH` (Rule 2); the overflow check guards the
/// running counter (Rule 7).
///
/// # Errors
///
/// [`Error::BTreeInvariantViolated`] if the step counter overflows
/// (unreachable in practice given the `DUMP_BATCH` cap).
fn drain_dump_batch<I>(iter: I) -> Result<DumpBatch>
where
    I: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>,
{
    let mut yielded: usize = 0;
    let mut last_key: Option<Vec<u8>> = None;
    let mut batch: VecDeque<Result<DumpRecord>> = VecDeque::with_capacity(DUMP_BATCH);
    // `take` ends the loop after the DUMP_BATCH-th step without asking
    // the underlying iterator for a further entry.
    for step in iter.take(DUMP_BATCH) {
        yielded = yielded
            .checked_add(1)
            .ok_or(Error::BTreeInvariantViolated {
                reason: "dump_raw batch counter overflow",
            })?;
        buffer_one_dump_entry(&mut batch, &mut last_key, step);
    }
    Ok((batch, last_key, yielded))
}

/// Process one B-tree iterator step into the staged `batch` for
/// [`DumpIter::refill`]. Errors are captured as `Err` entries so
/// they surface via `next()` rather than aborting the refill —
/// power-of-ten Rule 7.
///
/// Exactly one entry is pushed onto `batch` per call:
///
/// - an iterator error is staged as-is (`last_key` is untouched,
///   because no key was read);
/// - a key that does not decode as an [`Id`] is staged as
///   [`Error::InvalidArgument`];
/// - a value whose header fails to decode is staged as that error;
/// - otherwise a [`DumpRecord`] is staged, with the payload being
///   whatever follows the first `DOC_HEADER_SIZE` bytes of the value
///   (empty if there is nothing there).
///
/// `last_key` is advanced to every key that was actually read from
/// the tree, whether or not it decoded as an `Id`. The resumption
/// marker must move past undecodable keys too; otherwise the next
/// refill would start before them and stage the same failing entries
/// again (and never make progress if a whole batch is undecodable).
fn buffer_one_dump_entry(
    batch: &mut VecDeque<Result<DumpRecord>>,
    last_key: &mut Option<Vec<u8>>,
    step: Result<(Vec<u8>, Vec<u8>)>,
) {
    let (key, value) = match step {
        Ok(kv) => kv,
        Err(e) => {
            batch.push_back(Err(e));
            return;
        }
    };
    // Decode while `key` is still borrowable, then hand the key over
    // to the resumption marker before looking at the decode result.
    let decoded_id = Id::from_be_bytes(&key);
    *last_key = Some(key);
    let Some(id) = decoded_id else {
        batch.push_back(Err(Error::InvalidArgument(
            "primary B-tree key is not an Id",
        )));
        return;
    };
    let header = match DocumentHeader::read_from(&value) {
        Ok(h) => h,
        Err(e) => {
            batch.push_back(Err(e));
            return;
        }
    };
    let payload = value
        .get(DOC_HEADER_SIZE..)
        .map(<[u8]>::to_vec)
        .unwrap_or_default();
    batch.push_back(Ok(DumpRecord {
        id,
        header,
        payload,
    }));
}

impl Iterator for DumpIter<'_> {
    type Item = Result<DumpRecord>;

    /// Hand out the next staged entry, refilling the buffer from the
    /// tree when it runs empty.
    ///
    /// Returns `None` once a non-zero `limit` has been reached, or once
    /// the buffer is empty and the scan is finished. A failed refill is
    /// yielded once as `Some(Err(_))` and marks the scan finished; it
    /// does not count towards `limit`. Entries taken from the buffer
    /// (`Ok` or `Err`) each count as one emitted item.
    fn next(&mut self) -> Option<Self::Item> {
        let capped = self.limit != 0 && self.emitted >= self.limit;
        if capped {
            return None;
        }
        if self.buffer.is_empty() {
            if self.finished {
                return None;
            }
            if let Err(e) = self.refill() {
                self.finished = true;
                return Some(Err(e));
            }
        }
        // Still empty after a successful refill means the tree is done.
        let item = self.buffer.pop_front()?;
        self.emitted = self.emitted.saturating_add(1);
        Some(item)
    }
}