Skip to main content

obj/
cli.rs

1//! CLI-facing public hooks on [`Db`].
2//!
3//! Surface area used by the M12 `obj-cli` binary that does not fit
4//! cleanly into the typed read/write transaction API:
5//!
6//! - [`Db::stat`] — collects a one-shot snapshot of header + catalog
7//!   summary for `obj stat`.
8//! - [`Db::dump_raw`] — type-erased streaming walk of a named
9//!   collection's primary B-tree for `obj dump`.
10//!
11//! These methods are marked **CLI-facing, may move pre-1.0** in
12//! rustdoc — the typed `Db::iter_all` / `Db::collection::<T>()` API
13//! is the long-term shape for user code; the helpers here exist
14//! because the CLI inspects files whose `Document` types are not
15//! linked into the binary.
16//!
17//! Power-of-ten posture:
18//! - Rule 2: [`DumpIter`] is bounded by the user-supplied `--limit`
19//!   (zero means unbounded — the caller's loop is responsible).
20//! - Rule 3: [`DumpIter`] buffers in fixed-size chunks (`DUMP_BATCH`)
21//!   so peak memory does not scale with collection size.
22//! - Rule 4: every helper below is ≤ 60 lines.
23//! - Rule 7: every `Result` / `Option` is matched or propagated.
24//! - Rule 9: no `dyn` — `DumpIter` is concrete-typed.
25
26use std::collections::VecDeque;
27use std::ops::Bound;
28use std::sync::Arc;
29
30use obj_core::btree::BTree;
31use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE};
32use obj_core::pager::page::PageId;
33use obj_core::pager::Pager;
34use obj_core::platform::FileHandle;
35use obj_core::{CollectionDescriptor, Error, Id, Result};
36
37use crate::txn::{AttachedReadCtx, ReadTxn};
38use crate::Db;
39
40/// Refill batch for [`DumpIter`]. Same Rule-3 rationale as
41/// `ITER_ALL_BATCH` — a small fixed cap so peak memory is bounded
42/// regardless of the collection's size.
43const DUMP_BATCH: usize = 256;
44
45/// One drained [`DumpIter`] refill batch: the staged records, the last
46/// key seen (the next resumption marker), and the count drained (a
47/// count `< DUMP_BATCH` signals end-of-tree). Aliased to keep the
48/// refill-helper signatures within clippy's `type_complexity` budget.
49type DumpBatch = (VecDeque<Result<DumpRecord>>, Option<Vec<u8>>, usize);
50
51/// One-shot snapshot of a database's header + catalog summary.
52///
53/// Returned by [`Db::stat`]. CLI-facing — may evolve pre-1.0; user
54/// code should reach for the typed [`Db::iter_all`] /
55/// `Db::read_transaction(|tx| tx.collection::<T>())` APIs instead.
56#[derive(Debug, Clone)]
57#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
58#[non_exhaustive]
59pub struct DbStat {
60    /// Format major version from page 0.
61    pub format_major: u16,
62    /// Format minor version from page 0.
63    pub format_minor: u16,
64    /// On-disk page size in bytes.
65    pub page_size: u16,
66    /// Total number of pages in the database, including page 0.
67    pub page_count: u64,
68    /// Logical file size in bytes (`page_count * page_size`). For
69    /// file-backed pagers this equals the on-disk size; for memory
70    /// pagers it is the in-memory backing buffer length.
71    pub file_size_bytes: u64,
72    /// One entry per registered collection. Order matches the
73    /// catalog B-tree's natural sort (by name).
74    pub collections: Vec<CollectionStat>,
75}
76
77/// Per-collection summary inside [`DbStat`].
78///
79/// `doc_count` and `total_payload_bytes` are computed by walking
80/// the collection's primary B-tree once; the cost is O(n) in the
81/// number of documents in the collection. `obj stat` calls
82/// [`Db::stat`] once per `--collection` invocation; tools that
83/// need realtime telemetry should NOT use this surface.
84#[derive(Debug, Clone)]
85#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
86#[non_exhaustive]
87pub struct CollectionStat {
88    /// User-visible name.
89    pub name: String,
90    /// Catalog-assigned numeric id.
91    pub collection_id: u32,
92    /// `Document::VERSION` of the type that last wrote to the
93    /// collection.
94    pub type_version: u32,
95    /// Number of documents in the primary B-tree.
96    pub doc_count: u64,
97    /// Approximate total payload bytes — sum of every doc header's
98    /// `payload_len`. Does NOT include the 16-byte per-doc header
99    /// itself; per
100    /// [`docs/format.md`](https://github.com/uname-n/obj/blob/master/docs/format.md)
101    /// the in-file footprint is
102    /// `payload_len + DOC_HEADER_SIZE` per doc, plus B+tree
103    /// per-page overhead.
104    pub total_payload_bytes: u64,
105    /// Number of `Active` secondary indexes. `DroppedPending`
106    /// descriptors are NOT counted.
107    pub active_index_count: usize,
108    /// Number of `DroppedPending` index descriptors (kept so the
109    /// `index_id` is never reused; pages reclaimed on the next
110    /// checkpoint).
111    pub dropped_index_count: usize,
112    /// Full secondary-index descriptors for this collection, in
113    /// catalog order. Includes both `Active` and `DroppedPending`
114    /// entries — `indexes.len() == active_index_count +
115    /// dropped_index_count`. Surfaced for introspection tooling
116    /// (e.g. the obj-py descriptor view); the count fields above
117    /// remain the cheap summary.
118    pub indexes: Vec<obj_core::IndexDescriptor>,
119}
120
121/// One raw record yielded by [`DumpIter`].
122#[derive(Debug, Clone)]
123#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
124pub struct DumpRecord {
125    /// Primary id, decoded from the B-tree key.
126    pub id: Id,
127    /// Per-document header (16 bytes on disk, decoded).
128    pub header: DocumentHeader,
129    /// Payload bytes following the header. Type-erased; no
130    /// schema-aware decode is attempted.
131    pub payload: Vec<u8>,
132}
133
134impl Db {
135    /// One-shot snapshot of header + catalog summary.
136    ///
137    /// CLI-facing — used by `obj stat`. May move pre-1.0; user code
138    /// should prefer the typed [`Db::iter_all`] /
139    /// `Db::read_transaction(|tx| tx.collection::<T>())` APIs.
140    ///
141    /// # Errors
142    ///
143    /// - [`Error::Busy`] if the pager mutex is poisoned or contested.
144    /// - Pager / B-tree / postcard errors propagated from the
145    ///   catalog walk + per-collection primary-tree walks.
146    pub fn stat(&self) -> Result<DbStat> {
147        let descriptors = self.collect_descriptors()?;
148        let mut collections: Vec<CollectionStat> = Vec::with_capacity(descriptors.len());
149        for (name, descriptor) in descriptors {
150            collections.push(self.stat_one_collection(&name, &descriptor)?);
151        }
152        let pager = self.env.pager().lock().map_err(|_| Error::Busy {
153            kind: obj_core::LockKind::WriterInProcess,
154        })?;
155        let (format_major, format_minor) = pager.format_version();
156        let page_size = pager.page_size();
157        let page_count = pager.page_count();
158        let file_size_bytes = u64::from(page_size).saturating_mul(page_count);
159        Ok(DbStat {
160            format_major,
161            format_minor,
162            page_size,
163            page_count,
164            file_size_bytes,
165            collections,
166        })
167    }
168
169    /// Streaming type-erased walk of a named collection's primary
170    /// B-tree. Each step yields a [`DumpRecord`]: the primary id,
171    /// the per-doc header (decoded), and the raw payload bytes.
172    ///
173    /// `limit == 0` is treated as unbounded; the caller's iteration
174    /// loop is the implicit bound. Power-of-ten Rule 2: callers
175    /// who want a hard cap must impose it on their `take(...)`.
176    ///
177    /// CLI-facing — used by `obj dump`. The `Document` trait is
178    /// NOT consulted; schema-aware decode requires a registered
179    /// type and is the caller's responsibility above this layer.
180    ///
181    /// # Namespace dispatch (M11 #132) + snapshot isolation (#135)
182    ///
183    /// A bare `collection` resolves against the calling Db's own
184    /// env, but BOTH the catalog lookup and the primary-tree walk go
185    /// through the read txn's pinned [`obj_core::ReaderSnapshot`]
186    /// (#135) — the scan is snapshot-isolated against concurrent
187    /// writers exactly as the point reads (`get_via_snapshot`) and the
188    /// attached path are; a writer's post-snapshot node splits / page
189    /// reuse cannot surface as a spurious `Corruption`. A
190    /// `"<namespace>.<tail>"` name resolves against the read-only
191    /// database attached under `<namespace>`: the iterator pins one
192    /// [`obj_core::ReaderSnapshot`] on the attached env for its whole
193    /// lifetime and walks that env's primary tree as-of the pinned LSN
194    /// — mirroring the namespace dispatch the point-read shims
195    /// (`get_with_version` etc.) gained in #123, so `all()` /
196    /// `query.fetch()` over an attachment see the same documents the
197    /// namespaced point reads do.
198    ///
199    /// # Errors
200    ///
201    /// - [`Error::CollectionNotFound`] if `collection` is not
202    ///   registered AT THE SNAPSHOT'S PINNED LSN.
203    /// - [`Error::CollectionNamespaceUnknown`] if `collection` carries
204    ///   a namespace prefix that is not attached on this handle.
205    /// - As [`Db::read_transaction`] (construction-time).
206    pub fn dump_raw(&self, collection: &str, limit: usize) -> Result<DumpIter<'_>> {
207        // The calling-env read txn pins the local file's reader lock for
208        // the iterator's lifetime; it backs the `'db` borrow even on the
209        // namespaced path (where the walk reads the attached env instead).
210        let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
211        let txn = ReadTxn::new(inner);
212        let (descriptor, attached) = self.resolve_dump_target(&txn, collection)?;
213        Ok(DumpIter {
214            txn,
215            attached,
216            descriptor,
217            buffer: VecDeque::new(),
218            last_emitted_key: None,
219            finished: false,
220            limit,
221            emitted: 0,
222        })
223    }
224
225    /// Resolve `collection` to the `(descriptor, attached)` pair the
226    /// [`DumpIter`] should walk. A bare name resolves against `txn`'s
227    /// pinned [`obj_core::ReaderSnapshot`] via
228    /// [`obj_core::Catalog::lookup_via_snapshot`] (`attached == None`,
229    /// walked locally) — #135: this yields a `primary_root` consistent
230    /// with the snapshot the scan reads pages through, so a concurrent
231    /// writer's freelist-recycled catalog page cannot surface as a
232    /// spurious `Corruption`. A `"<ns>.<tail>"` name pins a snapshot on
233    /// the attached env and resolves `<tail>` against THAT snapshot's
234    /// catalog, so the iterator is consistent with the namespaced
235    /// point-read shims.
236    fn resolve_dump_target(
237        &self,
238        txn: &ReadTxn<'_>,
239        collection: &str,
240    ) -> Result<(CollectionDescriptor, Option<AttachedReadCtx>)> {
241        let (namespace, tail) = crate::db::split_namespace(collection);
242        let Some(ns) = namespace else {
243            let pager = self.env.pager().lock().map_err(|_| Error::Busy {
244                kind: obj_core::LockKind::WriterInProcess,
245            })?;
246            let descriptor = obj_core::Catalog::<FileHandle>::lookup_via_snapshot(
247                &pager,
248                txn.inner.snapshot(),
249                collection,
250            )?
251            .ok_or_else(|| Error::CollectionNotFound {
252                name: collection.to_owned(),
253            })?;
254            return Ok((descriptor, None));
255        };
256        let ctx = self.pin_attached_ctx(ns)?;
257        let descriptor = {
258            let pager = ctx.env.pager().lock().map_err(|_| Error::Busy {
259                kind: obj_core::LockKind::WriterInProcess,
260            })?;
261            obj_core::Catalog::<FileHandle>::lookup_via_snapshot(&pager, &ctx.snapshot, tail)?
262                .ok_or_else(|| Error::CollectionNotFound {
263                    name: collection.to_owned(),
264                })?
265        };
266        Ok((descriptor, Some(ctx)))
267    }
268
269    /// Walk the catalog and return `(name, descriptor)` pairs. Held
270    /// across the per-collection stats walk so the catalog mutex is
271    /// not held simultaneously with the pager mutex inside
272    /// [`Self::stat_one_collection`].
273    fn collect_descriptors(&self) -> Result<Vec<(String, CollectionDescriptor)>> {
274        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
275            kind: obj_core::LockKind::WriterInProcess,
276        })?;
277        let catalog = self.catalog.lock().map_err(|_| Error::Busy {
278            kind: obj_core::LockKind::WriterInProcess,
279        })?;
280        catalog.list_collections(&mut pager)
281    }
282
283    /// Walk one collection's primary B-tree, counting docs and
284    /// summing payload bytes. Returns the populated
285    /// [`CollectionStat`].
286    fn stat_one_collection(
287        &self,
288        name: &str,
289        descriptor: &CollectionDescriptor,
290    ) -> Result<CollectionStat> {
291        let (doc_count, total_payload_bytes) = self.walk_primary_for_stat(name)?;
292        let active_index_count = descriptor
293            .indexes
294            .iter()
295            .filter(|d| d.status == obj_core::IndexStatus::Active)
296            .count();
297        let dropped_index_count = descriptor
298            .indexes
299            .iter()
300            .filter(|d| d.status == obj_core::IndexStatus::DroppedPending)
301            .count();
302        Ok(CollectionStat {
303            name: name.to_owned(),
304            collection_id: descriptor.collection_id,
305            type_version: descriptor.type_version,
306            doc_count,
307            total_payload_bytes,
308            active_index_count,
309            dropped_index_count,
310            // Catalog order, Active + DroppedPending — clone the full
311            // descriptor list so introspection callers see the same
312            // ordering the count fields summarise.
313            indexes: descriptor.indexes.clone(),
314        })
315    }
316
317    /// Streaming walk of one primary tree. Returns `(doc_count,
318    /// total_payload_bytes)`. Power-of-ten Rule 2: bounded by
319    /// `obj_core::catalog::MAX_COLLECTIONS * <doc cap>` — the B-tree
320    /// range iterator does NOT carry a cap of its own, but the
321    /// loop count is bounded by the on-disk doc count and so is
322    /// itself bounded by the (finite) file size. We add an
323    /// explicit overflow check on the running counters.
324    ///
325    /// The walk pins a [`obj_core::ReaderSnapshot`], re-resolves the
326    /// collection's `primary_root` against THAT snapshot's catalog
327    /// (via [`obj_core::Catalog::lookup_via_snapshot`]), and iterates
328    /// the tree via [`BTree::range_via_snapshot`] (#135) so `Db::stat`
329    /// is isolated from concurrent writers' node splits/merges and page
330    /// reuse. Pinning a snapshot but walking the live root (or the live
331    /// tree) would spuriously decode-error mid-scan. The snapshot is
332    /// pinned at or after the live-catalog read that produced `name`, so
333    /// the collection is guaranteed present at the pinned LSN.
334    fn walk_primary_for_stat(&self, name: &str) -> Result<(u64, u64)> {
335        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
336            kind: obj_core::LockKind::WriterInProcess,
337        })?;
338        let snapshot = pager.reader_snapshot()?;
339        let descriptor =
340            obj_core::Catalog::<FileHandle>::lookup_via_snapshot(&pager, &snapshot, name)?
341                .ok_or_else(|| Error::CollectionNotFound {
342                    name: name.to_owned(),
343                })?;
344        let root_pid = PageId::new(descriptor.primary_root)
345            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
346        let iter = BTree::<FileHandle>::range_via_snapshot(&pager, &snapshot, root_pid, ..)?;
347        let mut doc_count: u64 = 0;
348        let mut total: u64 = 0;
349        for step in iter {
350            let (_key, value) = step?;
351            let header = DocumentHeader::read_from(&value)?;
352            doc_count = doc_count
353                .checked_add(1)
354                .ok_or(Error::BTreeInvariantViolated {
355                    reason: "stat doc-count overflow",
356                })?;
357            total = total.checked_add(u64::from(header.payload_len)).ok_or(
358                Error::BTreeInvariantViolated {
359                    reason: "stat payload-byte sum overflow",
360                },
361            )?;
362        }
363        Ok((doc_count, total))
364    }
365}
366
367/// Streaming iterator returned by [`Db::dump_raw`].
368///
369/// Holds a [`ReadTxn`] for its lifetime so the snapshot pin keeps
370/// the catalog and primary B-tree stable across refills. Yields
371/// `Result<DumpRecord>` one entry at a time; per-step errors do
372/// NOT terminate iteration — the caller decides whether to
373/// continue. Construction errors surface at the [`Db::dump_raw`]
374/// call site.
375pub struct DumpIter<'db> {
376    txn: ReadTxn<'db>,
377    /// `Some(_)` for a namespaced (`"<ns>.<tail>"`) scan: the iterator
378    /// walks this attached env's primary tree as-of the pinned snapshot
379    /// (M11 #132). `None` for a bare-name scan, which walks the calling
380    /// Db's primary tree as-of `txn`'s pinned reader snapshot (#135) —
381    /// snapshot-isolated against concurrent writers, exactly as the
382    /// attached path is.
383    attached: Option<AttachedReadCtx>,
384    descriptor: CollectionDescriptor,
385    buffer: VecDeque<Result<DumpRecord>>,
386    last_emitted_key: Option<Vec<u8>>,
387    finished: bool,
388    limit: usize,
389    emitted: usize,
390}
391
392impl std::fmt::Debug for DumpIter<'_> {
393    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
394        f.debug_struct("DumpIter")
395            .field("collection_id", &self.descriptor.collection_id)
396            .field("buffer_len", &self.buffer.len())
397            .field("finished", &self.finished)
398            .field("limit", &self.limit)
399            .field("emitted", &self.emitted)
400            .finish_non_exhaustive()
401    }
402}
403
404impl DumpIter<'_> {
405    /// Refill the internal buffer with up to [`DUMP_BATCH`] entries.
406    /// Resumption marker is `Excluded(last_emitted_key)` — identical
407    /// to the [`crate::IterAll`] refill pattern.
408    ///
409    /// Dispatches on [`Self::attached`]: a namespaced scan walks the
410    /// attached env's primary tree as-of its pinned snapshot
411    /// ([`Self::refill_attached`], #132); a bare-name scan walks the
412    /// calling Db's primary tree as-of the dump txn's pinned
413    /// [`obj_core::ReaderSnapshot`] ([`Self::refill_local`], #135) —
414    /// the same snapshot-isolation guarantee the attached path and the
415    /// point reads (`get_via_snapshot`) already enjoy, so concurrent
416    /// writes cannot mutate the tree out from under the scan.
417    fn refill(&mut self) -> Result<()> {
418        let start = match &self.last_emitted_key {
419            Some(k) => Bound::Excluded(k.clone()),
420            None => Bound::Unbounded,
421        };
422        let primary_root = self.descriptor.primary_root;
423        let (batch, last_key, drained) = match &self.attached {
424            None => Self::refill_local(
425                self.txn.inner.env(),
426                self.txn.inner.snapshot(),
427                primary_root,
428                start,
429            ),
430            Some(ctx) => Self::refill_attached(ctx, primary_root, start),
431        }?;
432        if drained < DUMP_BATCH {
433            self.finished = true;
434        }
435        self.buffer.extend(batch);
436        if let Some(k) = last_key {
437            self.last_emitted_key = Some(k);
438        }
439        Ok(())
440    }
441
442    /// Drain up to [`DUMP_BATCH`] entries from the calling Db's primary
443    /// tree (bare-name scan), pinned to the dump txn's
444    /// [`obj_core::ReaderSnapshot`] via
445    /// [`BTree::range_via_snapshot`] (#135). Walking the snapshot rather
446    /// than the live tree (`BTree::range`) keeps the scan isolated from
447    /// concurrent writers' node splits/merges and page reuse — the same
448    /// guarantee as [`Self::refill_attached`] and the point reads.
449    fn refill_local(
450        env: &obj_core::TxnEnv<FileHandle>,
451        snapshot: &obj_core::ReaderSnapshot<FileHandle>,
452        primary_root: u64,
453        start: Bound<Vec<u8>>,
454    ) -> Result<DumpBatch> {
455        let pager_arc: Arc<std::sync::Mutex<Pager<FileHandle>>> = Arc::clone(env.pager());
456        let pager = pager_arc.lock().map_err(|_| Error::Busy {
457            kind: obj_core::LockKind::WriterInProcess,
458        })?;
459        let root_pid = PageId::new(primary_root)
460            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
461        let iter = BTree::<FileHandle>::range_via_snapshot(
462            &pager,
463            snapshot,
464            root_pid,
465            (start, Bound::Unbounded),
466        )?;
467        drain_dump_batch(iter)
468    }
469
470    /// Drain up to [`DUMP_BATCH`] entries from an attached env's primary
471    /// tree, pinned to the attachment's snapshot (#132 namespaced scan).
472    fn refill_attached(
473        ctx: &AttachedReadCtx,
474        primary_root: u64,
475        start: Bound<Vec<u8>>,
476    ) -> Result<DumpBatch> {
477        let pager = ctx.env.pager().lock().map_err(|_| Error::Busy {
478            kind: obj_core::LockKind::WriterInProcess,
479        })?;
480        let root_pid = PageId::new(primary_root)
481            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
482        let iter = BTree::<FileHandle>::range_via_snapshot(
483            &pager,
484            &ctx.snapshot,
485            root_pid,
486            (start, Bound::Unbounded),
487        )?;
488        drain_dump_batch(iter)
489    }
490}
491
492/// Drain at most [`DUMP_BATCH`] entries from a primary-tree range
493/// iterator into a staged batch. Returns `(batch, last_key, drained)`
494/// where `drained < DUMP_BATCH` signals end-of-tree to the caller.
495///
496/// Generic over the concrete range-iterator type (Rule 9: a single
497/// static bound, no `dyn`) so the local and attached snapshot walks
498/// (both `BTree::range_via_snapshot`, #135) share one body.
499/// Bounded by `DUMP_BATCH` (Rule 2); the overflow check guards the
500/// running counter (Rule 7).
501fn drain_dump_batch<I>(iter: I) -> Result<DumpBatch>
502where
503    I: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>,
504{
505    let mut yielded: usize = 0;
506    let mut last_key: Option<Vec<u8>> = None;
507    let mut batch: VecDeque<Result<DumpRecord>> = VecDeque::with_capacity(DUMP_BATCH);
508    for step in iter {
509        if yielded >= DUMP_BATCH {
510            break;
511        }
512        yielded = yielded
513            .checked_add(1)
514            .ok_or(Error::BTreeInvariantViolated {
515                reason: "dump_raw batch counter overflow",
516            })?;
517        buffer_one_dump_entry(&mut batch, &mut last_key, step);
518    }
519    Ok((batch, last_key, yielded))
520}
521
522/// Process one B-tree iterator step into the staged `batch` for
523/// [`DumpIter::refill`]. Errors are captured as `Err` entries so
524/// they surface via `next()` rather than aborting the refill —
525/// power-of-ten Rule 7.
526fn buffer_one_dump_entry(
527    batch: &mut VecDeque<Result<DumpRecord>>,
528    last_key: &mut Option<Vec<u8>>,
529    step: Result<(Vec<u8>, Vec<u8>)>,
530) {
531    let (key, value) = match step {
532        Ok(kv) => kv,
533        Err(e) => {
534            batch.push_back(Err(e));
535            return;
536        }
537    };
538    let Some(id) = Id::from_be_bytes(&key) else {
539        batch.push_back(Err(Error::InvalidArgument(
540            "primary B-tree key is not an Id",
541        )));
542        return;
543    };
544    *last_key = Some(key);
545    let header = match DocumentHeader::read_from(&value) {
546        Ok(h) => h,
547        Err(e) => {
548            batch.push_back(Err(e));
549            return;
550        }
551    };
552    let payload = value
553        .get(DOC_HEADER_SIZE..)
554        .map(<[u8]>::to_vec)
555        .unwrap_or_default();
556    batch.push_back(Ok(DumpRecord {
557        id,
558        header,
559        payload,
560    }));
561}
562
563impl Iterator for DumpIter<'_> {
564    type Item = Result<DumpRecord>;
565
566    fn next(&mut self) -> Option<Self::Item> {
567        if self.limit != 0 && self.emitted >= self.limit {
568            return None;
569        }
570        if let Some(item) = self.buffer.pop_front() {
571            self.emitted = self.emitted.saturating_add(1);
572            return Some(item);
573        }
574        if self.finished {
575            return None;
576        }
577        if let Err(e) = self.refill() {
578            self.finished = true;
579            return Some(Err(e));
580        }
581        let item = self.buffer.pop_front()?;
582        self.emitted = self.emitted.saturating_add(1);
583        Some(item)
584    }
585}