obj/cli.rs
1//! CLI-facing public hooks on [`Db`].
2//!
3//! Surface area used by the M12 `obj-cli` binary that does not fit
4//! cleanly into the typed read/write transaction API:
5//!
6//! - [`Db::stat`] — collects a one-shot snapshot of header + catalog
7//! summary for `obj stat`.
8//! - [`Db::dump_raw`] — type-erased streaming walk of a named
9//! collection's primary B-tree for `obj dump`.
10//!
11//! These methods are marked **CLI-facing, may move pre-1.0** in
12//! rustdoc — the typed `Db::iter_all` / `Db::collection::<T>()` API
13//! is the long-term shape for user code; the helpers here exist
14//! because the CLI inspects files whose `Document` types are not
15//! linked into the binary.
16//!
17//! Power-of-ten posture:
18//! - Rule 2: [`DumpIter`] is bounded by the user-supplied `--limit`
19//! (zero means unbounded — the caller's loop is responsible).
20//! - Rule 3: [`DumpIter`] buffers in fixed-size chunks (`DUMP_BATCH`)
21//! so peak memory does not scale with collection size.
22//! - Rule 4: every helper below is ≤ 60 lines.
23//! - Rule 7: every `Result` / `Option` is matched or propagated.
24//! - Rule 9: no `dyn` — `DumpIter` is concrete-typed.
25
26use std::collections::VecDeque;
27use std::ops::Bound;
28use std::sync::Arc;
29
30use obj_core::btree::BTree;
31use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE};
32use obj_core::pager::page::PageId;
33use obj_core::pager::Pager;
34use obj_core::platform::FileHandle;
35use obj_core::{CollectionDescriptor, Error, Id, Result};
36
37use crate::txn::{AttachedReadCtx, ReadTxn};
38use crate::Db;
39
/// Maximum number of entries [`DumpIter`] stages per refill. Kept small
/// and fixed — the same Rule-3 reasoning as `ITER_ALL_BATCH` — so peak
/// memory stays bounded no matter how large the collection is.
const DUMP_BATCH: usize = 256;
44
45/// One drained [`DumpIter`] refill batch: the staged records, the last
46/// key seen (the next resumption marker), and the count drained (a
47/// count `< DUMP_BATCH` signals end-of-tree). Aliased to keep the
48/// refill-helper signatures within clippy's `type_complexity` budget.
49type DumpBatch = (VecDeque<Result<DumpRecord>>, Option<Vec<u8>>, usize);
50
51/// One-shot snapshot of a database's header + catalog summary.
52///
53/// Returned by [`Db::stat`]. CLI-facing — may evolve pre-1.0; user
54/// code should reach for the typed [`Db::iter_all`] /
55/// `Db::read_transaction(|tx| tx.collection::<T>())` APIs instead.
56#[derive(Debug, Clone)]
57#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
58#[non_exhaustive]
59pub struct DbStat {
60 /// Format major version from page 0.
61 pub format_major: u16,
62 /// Format minor version from page 0.
63 pub format_minor: u16,
64 /// On-disk page size in bytes.
65 pub page_size: u16,
66 /// Total number of pages in the database, including page 0.
67 pub page_count: u64,
68 /// Logical file size in bytes (`page_count * page_size`). For
69 /// file-backed pagers this equals the on-disk size; for memory
70 /// pagers it is the in-memory backing buffer length.
71 pub file_size_bytes: u64,
72 /// One entry per registered collection. Order matches the
73 /// catalog B-tree's natural sort (by name).
74 pub collections: Vec<CollectionStat>,
75}
76
77/// Per-collection summary inside [`DbStat`].
78///
79/// `doc_count` and `total_payload_bytes` are computed by walking
80/// the collection's primary B-tree once; the cost is O(n) in the
81/// number of documents in the collection. `obj stat` calls
82/// [`Db::stat`] once per `--collection` invocation; tools that
83/// need realtime telemetry should NOT use this surface.
84#[derive(Debug, Clone)]
85#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
86#[non_exhaustive]
87pub struct CollectionStat {
88 /// User-visible name.
89 pub name: String,
90 /// Catalog-assigned numeric id.
91 pub collection_id: u32,
92 /// `Document::VERSION` of the type that last wrote to the
93 /// collection.
94 pub type_version: u32,
95 /// Number of documents in the primary B-tree.
96 pub doc_count: u64,
97 /// Approximate total payload bytes — sum of every doc header's
98 /// `payload_len`. Does NOT include the 16-byte per-doc header
99 /// itself; per `docs/format.md` the in-file footprint is
100 /// `payload_len + DOC_HEADER_SIZE` per doc, plus B+tree
101 /// per-page overhead.
102 pub total_payload_bytes: u64,
103 /// Number of `Active` secondary indexes. `DroppedPending`
104 /// descriptors are NOT counted.
105 pub active_index_count: usize,
106 /// Number of `DroppedPending` index descriptors (kept so the
107 /// `index_id` is never reused; pages reclaimed on the next
108 /// checkpoint).
109 pub dropped_index_count: usize,
110 /// Full secondary-index descriptors for this collection, in
111 /// catalog order. Includes both `Active` and `DroppedPending`
112 /// entries — `indexes.len() == active_index_count +
113 /// dropped_index_count`. Surfaced for introspection tooling
114 /// (e.g. the obj-py descriptor view); the count fields above
115 /// remain the cheap summary.
116 pub indexes: Vec<obj_core::IndexDescriptor>,
117}
118
119/// One raw record yielded by [`DumpIter`].
120#[derive(Debug, Clone)]
121#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
122pub struct DumpRecord {
123 /// Primary id, decoded from the B-tree key.
124 pub id: Id,
125 /// Per-document header (16 bytes on disk, decoded).
126 pub header: DocumentHeader,
127 /// Payload bytes following the header. Type-erased; no
128 /// schema-aware decode is attempted.
129 pub payload: Vec<u8>,
130}
131
132impl Db {
133 /// One-shot snapshot of header + catalog summary.
134 ///
135 /// CLI-facing — used by `obj stat`. May move pre-1.0; user code
136 /// should prefer the typed [`Db::iter_all`] /
137 /// `Db::read_transaction(|tx| tx.collection::<T>())` APIs.
138 ///
139 /// # Errors
140 ///
141 /// - [`Error::Busy`] if the pager mutex is poisoned or contested.
142 /// - Pager / B-tree / postcard errors propagated from the
143 /// catalog walk + per-collection primary-tree walks.
144 pub fn stat(&self) -> Result<DbStat> {
145 let descriptors = self.collect_descriptors()?;
146 let mut collections: Vec<CollectionStat> = Vec::with_capacity(descriptors.len());
147 for (name, descriptor) in descriptors {
148 collections.push(self.stat_one_collection(&name, &descriptor)?);
149 }
150 let pager = self.env.pager().lock().map_err(|_| Error::Busy {
151 kind: obj_core::LockKind::WriterInProcess,
152 })?;
153 let (format_major, format_minor) = pager.format_version();
154 let page_size = pager.page_size();
155 let page_count = pager.page_count();
156 let file_size_bytes = u64::from(page_size).saturating_mul(page_count);
157 Ok(DbStat {
158 format_major,
159 format_minor,
160 page_size,
161 page_count,
162 file_size_bytes,
163 collections,
164 })
165 }
166
167 /// Streaming type-erased walk of a named collection's primary
168 /// B-tree. Each step yields a [`DumpRecord`]: the primary id,
169 /// the per-doc header (decoded), and the raw payload bytes.
170 ///
171 /// `limit == 0` is treated as unbounded; the caller's iteration
172 /// loop is the implicit bound. Power-of-ten Rule 2: callers
173 /// who want a hard cap must impose it on their `take(...)`.
174 ///
175 /// CLI-facing — used by `obj dump`. The `Document` trait is
176 /// NOT consulted; schema-aware decode requires a registered
177 /// type and is the caller's responsibility above this layer.
178 ///
179 /// # Namespace dispatch (M11 #132) + snapshot isolation (#135)
180 ///
181 /// A bare `collection` resolves against the calling Db's own
182 /// env, but BOTH the catalog lookup and the primary-tree walk go
183 /// through the read txn's pinned [`obj_core::ReaderSnapshot`]
184 /// (#135) — the scan is snapshot-isolated against concurrent
185 /// writers exactly as the point reads (`get_via_snapshot`) and the
186 /// attached path are; a writer's post-snapshot node splits / page
187 /// reuse cannot surface as a spurious `Corruption`. A
188 /// `"<namespace>.<tail>"` name resolves against the read-only
189 /// database attached under `<namespace>`: the iterator pins one
190 /// [`obj_core::ReaderSnapshot`] on the attached env for its whole
191 /// lifetime and walks that env's primary tree as-of the pinned LSN
192 /// — mirroring the namespace dispatch the point-read shims
193 /// (`get_with_version` etc.) gained in #123, so `all()` /
194 /// `query.fetch()` over an attachment see the same documents the
195 /// namespaced point reads do.
196 ///
197 /// # Errors
198 ///
199 /// - [`Error::CollectionNotFound`] if `collection` is not
200 /// registered AT THE SNAPSHOT'S PINNED LSN.
201 /// - [`Error::CollectionNamespaceUnknown`] if `collection` carries
202 /// a namespace prefix that is not attached on this handle.
203 /// - As [`Db::read_transaction`] (construction-time).
204 pub fn dump_raw(&self, collection: &str, limit: usize) -> Result<DumpIter<'_>> {
205 // The calling-env read txn pins the local file's reader lock for
206 // the iterator's lifetime; it backs the `'db` borrow even on the
207 // namespaced path (where the walk reads the attached env instead).
208 let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
209 let txn = ReadTxn::new(inner);
210 let (descriptor, attached) = self.resolve_dump_target(&txn, collection)?;
211 Ok(DumpIter {
212 txn,
213 attached,
214 descriptor,
215 buffer: VecDeque::new(),
216 last_emitted_key: None,
217 finished: false,
218 limit,
219 emitted: 0,
220 })
221 }
222
223 /// Resolve `collection` to the `(descriptor, attached)` pair the
224 /// [`DumpIter`] should walk. A bare name resolves against `txn`'s
225 /// pinned [`obj_core::ReaderSnapshot`] via
226 /// [`obj_core::Catalog::lookup_via_snapshot`] (`attached == None`,
227 /// walked locally) — #135: this yields a `primary_root` consistent
228 /// with the snapshot the scan reads pages through, so a concurrent
229 /// writer's freelist-recycled catalog page cannot surface as a
230 /// spurious `Corruption`. A `"<ns>.<tail>"` name pins a snapshot on
231 /// the attached env and resolves `<tail>` against THAT snapshot's
232 /// catalog, so the iterator is consistent with the namespaced
233 /// point-read shims.
234 fn resolve_dump_target(
235 &self,
236 txn: &ReadTxn<'_>,
237 collection: &str,
238 ) -> Result<(CollectionDescriptor, Option<AttachedReadCtx>)> {
239 let (namespace, tail) = crate::db::split_namespace(collection);
240 let Some(ns) = namespace else {
241 let pager = self.env.pager().lock().map_err(|_| Error::Busy {
242 kind: obj_core::LockKind::WriterInProcess,
243 })?;
244 let descriptor = obj_core::Catalog::<FileHandle>::lookup_via_snapshot(
245 &pager,
246 txn.inner.snapshot(),
247 collection,
248 )?
249 .ok_or_else(|| Error::CollectionNotFound {
250 name: collection.to_owned(),
251 })?;
252 return Ok((descriptor, None));
253 };
254 let ctx = self.pin_attached_ctx(ns)?;
255 let descriptor = {
256 let pager = ctx.env.pager().lock().map_err(|_| Error::Busy {
257 kind: obj_core::LockKind::WriterInProcess,
258 })?;
259 obj_core::Catalog::<FileHandle>::lookup_via_snapshot(&pager, &ctx.snapshot, tail)?
260 .ok_or_else(|| Error::CollectionNotFound {
261 name: collection.to_owned(),
262 })?
263 };
264 Ok((descriptor, Some(ctx)))
265 }
266
267 /// Walk the catalog and return `(name, descriptor)` pairs. Held
268 /// across the per-collection stats walk so the catalog mutex is
269 /// not held simultaneously with the pager mutex inside
270 /// [`Self::stat_one_collection`].
271 fn collect_descriptors(&self) -> Result<Vec<(String, CollectionDescriptor)>> {
272 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
273 kind: obj_core::LockKind::WriterInProcess,
274 })?;
275 let catalog = self.catalog.lock().map_err(|_| Error::Busy {
276 kind: obj_core::LockKind::WriterInProcess,
277 })?;
278 catalog.list_collections(&mut pager)
279 }
280
281 /// Walk one collection's primary B-tree, counting docs and
282 /// summing payload bytes. Returns the populated
283 /// [`CollectionStat`].
284 fn stat_one_collection(
285 &self,
286 name: &str,
287 descriptor: &CollectionDescriptor,
288 ) -> Result<CollectionStat> {
289 let (doc_count, total_payload_bytes) = self.walk_primary_for_stat(name)?;
290 let active_index_count = descriptor
291 .indexes
292 .iter()
293 .filter(|d| d.status == obj_core::IndexStatus::Active)
294 .count();
295 let dropped_index_count = descriptor
296 .indexes
297 .iter()
298 .filter(|d| d.status == obj_core::IndexStatus::DroppedPending)
299 .count();
300 Ok(CollectionStat {
301 name: name.to_owned(),
302 collection_id: descriptor.collection_id,
303 type_version: descriptor.type_version,
304 doc_count,
305 total_payload_bytes,
306 active_index_count,
307 dropped_index_count,
308 // Catalog order, Active + DroppedPending — clone the full
309 // descriptor list so introspection callers see the same
310 // ordering the count fields summarise.
311 indexes: descriptor.indexes.clone(),
312 })
313 }
314
315 /// Streaming walk of one primary tree. Returns `(doc_count,
316 /// total_payload_bytes)`. Power-of-ten Rule 2: bounded by
317 /// `obj_core::catalog::MAX_COLLECTIONS * <doc cap>` — the B-tree
318 /// range iterator does NOT carry a cap of its own, but the
319 /// loop count is bounded by the on-disk doc count and so is
320 /// itself bounded by the (finite) file size. We add an
321 /// explicit overflow check on the running counters.
322 ///
323 /// The walk pins a [`obj_core::ReaderSnapshot`], re-resolves the
324 /// collection's `primary_root` against THAT snapshot's catalog
325 /// (via [`obj_core::Catalog::lookup_via_snapshot`]), and iterates
326 /// the tree via [`BTree::range_via_snapshot`] (#135) so `Db::stat`
327 /// is isolated from concurrent writers' node splits/merges and page
328 /// reuse. Pinning a snapshot but walking the live root (or the live
329 /// tree) would spuriously decode-error mid-scan. The snapshot is
330 /// pinned at or after the live-catalog read that produced `name`, so
331 /// the collection is guaranteed present at the pinned LSN.
332 fn walk_primary_for_stat(&self, name: &str) -> Result<(u64, u64)> {
333 let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
334 kind: obj_core::LockKind::WriterInProcess,
335 })?;
336 let snapshot = pager.reader_snapshot()?;
337 let descriptor =
338 obj_core::Catalog::<FileHandle>::lookup_via_snapshot(&pager, &snapshot, name)?
339 .ok_or_else(|| Error::CollectionNotFound {
340 name: name.to_owned(),
341 })?;
342 let root_pid = PageId::new(descriptor.primary_root)
343 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
344 let iter = BTree::<FileHandle>::range_via_snapshot(&pager, &snapshot, root_pid, ..)?;
345 let mut doc_count: u64 = 0;
346 let mut total: u64 = 0;
347 for step in iter {
348 let (_key, value) = step?;
349 let header = DocumentHeader::read_from(&value)?;
350 doc_count = doc_count
351 .checked_add(1)
352 .ok_or(Error::BTreeInvariantViolated {
353 reason: "stat doc-count overflow",
354 })?;
355 total = total.checked_add(u64::from(header.payload_len)).ok_or(
356 Error::BTreeInvariantViolated {
357 reason: "stat payload-byte sum overflow",
358 },
359 )?;
360 }
361 Ok((doc_count, total))
362 }
363}
364
365/// Streaming iterator returned by [`Db::dump_raw`].
366///
367/// Holds a [`ReadTxn`] for its lifetime so the snapshot pin keeps
368/// the catalog and primary B-tree stable across refills. Yields
369/// `Result<DumpRecord>` one entry at a time; per-step errors do
370/// NOT terminate iteration — the caller decides whether to
371/// continue. Construction errors surface at the [`Db::dump_raw`]
372/// call site.
373pub struct DumpIter<'db> {
374 txn: ReadTxn<'db>,
375 /// `Some(_)` for a namespaced (`"<ns>.<tail>"`) scan: the iterator
376 /// walks this attached env's primary tree as-of the pinned snapshot
377 /// (M11 #132). `None` for a bare-name scan, which walks the calling
378 /// Db's primary tree as-of `txn`'s pinned reader snapshot (#135) —
379 /// snapshot-isolated against concurrent writers, exactly as the
380 /// attached path is.
381 attached: Option<AttachedReadCtx>,
382 descriptor: CollectionDescriptor,
383 buffer: VecDeque<Result<DumpRecord>>,
384 last_emitted_key: Option<Vec<u8>>,
385 finished: bool,
386 limit: usize,
387 emitted: usize,
388}
389
390impl std::fmt::Debug for DumpIter<'_> {
391 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
392 f.debug_struct("DumpIter")
393 .field("collection_id", &self.descriptor.collection_id)
394 .field("buffer_len", &self.buffer.len())
395 .field("finished", &self.finished)
396 .field("limit", &self.limit)
397 .field("emitted", &self.emitted)
398 .finish_non_exhaustive()
399 }
400}
401
402impl DumpIter<'_> {
403 /// Refill the internal buffer with up to [`DUMP_BATCH`] entries.
404 /// Resumption marker is `Excluded(last_emitted_key)` — identical
405 /// to the [`crate::IterAll`] refill pattern.
406 ///
407 /// Dispatches on [`Self::attached`]: a namespaced scan walks the
408 /// attached env's primary tree as-of its pinned snapshot
409 /// ([`Self::refill_attached`], #132); a bare-name scan walks the
410 /// calling Db's primary tree as-of the dump txn's pinned
411 /// [`obj_core::ReaderSnapshot`] ([`Self::refill_local`], #135) —
412 /// the same snapshot-isolation guarantee the attached path and the
413 /// point reads (`get_via_snapshot`) already enjoy, so concurrent
414 /// writes cannot mutate the tree out from under the scan.
415 fn refill(&mut self) -> Result<()> {
416 let start = match &self.last_emitted_key {
417 Some(k) => Bound::Excluded(k.clone()),
418 None => Bound::Unbounded,
419 };
420 let primary_root = self.descriptor.primary_root;
421 let (batch, last_key, drained) = match &self.attached {
422 None => Self::refill_local(
423 self.txn.inner.env(),
424 self.txn.inner.snapshot(),
425 primary_root,
426 start,
427 ),
428 Some(ctx) => Self::refill_attached(ctx, primary_root, start),
429 }?;
430 if drained < DUMP_BATCH {
431 self.finished = true;
432 }
433 self.buffer.extend(batch);
434 if let Some(k) = last_key {
435 self.last_emitted_key = Some(k);
436 }
437 Ok(())
438 }
439
440 /// Drain up to [`DUMP_BATCH`] entries from the calling Db's primary
441 /// tree (bare-name scan), pinned to the dump txn's
442 /// [`obj_core::ReaderSnapshot`] via
443 /// [`BTree::range_via_snapshot`] (#135). Walking the snapshot rather
444 /// than the live tree (`BTree::range`) keeps the scan isolated from
445 /// concurrent writers' node splits/merges and page reuse — the same
446 /// guarantee as [`Self::refill_attached`] and the point reads.
447 fn refill_local(
448 env: &obj_core::TxnEnv<FileHandle>,
449 snapshot: &obj_core::ReaderSnapshot<FileHandle>,
450 primary_root: u64,
451 start: Bound<Vec<u8>>,
452 ) -> Result<DumpBatch> {
453 let pager_arc: Arc<std::sync::Mutex<Pager<FileHandle>>> = Arc::clone(env.pager());
454 let pager = pager_arc.lock().map_err(|_| Error::Busy {
455 kind: obj_core::LockKind::WriterInProcess,
456 })?;
457 let root_pid = PageId::new(primary_root)
458 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
459 let iter = BTree::<FileHandle>::range_via_snapshot(
460 &pager,
461 snapshot,
462 root_pid,
463 (start, Bound::Unbounded),
464 )?;
465 drain_dump_batch(iter)
466 }
467
468 /// Drain up to [`DUMP_BATCH`] entries from an attached env's primary
469 /// tree, pinned to the attachment's snapshot (#132 namespaced scan).
470 fn refill_attached(
471 ctx: &AttachedReadCtx,
472 primary_root: u64,
473 start: Bound<Vec<u8>>,
474 ) -> Result<DumpBatch> {
475 let pager = ctx.env.pager().lock().map_err(|_| Error::Busy {
476 kind: obj_core::LockKind::WriterInProcess,
477 })?;
478 let root_pid = PageId::new(primary_root)
479 .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
480 let iter = BTree::<FileHandle>::range_via_snapshot(
481 &pager,
482 &ctx.snapshot,
483 root_pid,
484 (start, Bound::Unbounded),
485 )?;
486 drain_dump_batch(iter)
487 }
488}
489
490/// Drain at most [`DUMP_BATCH`] entries from a primary-tree range
491/// iterator into a staged batch. Returns `(batch, last_key, drained)`
492/// where `drained < DUMP_BATCH` signals end-of-tree to the caller.
493///
494/// Generic over the concrete range-iterator type (Rule 9: a single
495/// static bound, no `dyn`) so the local and attached snapshot walks
496/// (both `BTree::range_via_snapshot`, #135) share one body.
497/// Bounded by `DUMP_BATCH` (Rule 2); the overflow check guards the
498/// running counter (Rule 7).
499fn drain_dump_batch<I>(iter: I) -> Result<DumpBatch>
500where
501 I: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>,
502{
503 let mut yielded: usize = 0;
504 let mut last_key: Option<Vec<u8>> = None;
505 let mut batch: VecDeque<Result<DumpRecord>> = VecDeque::with_capacity(DUMP_BATCH);
506 for step in iter {
507 if yielded >= DUMP_BATCH {
508 break;
509 }
510 yielded = yielded
511 .checked_add(1)
512 .ok_or(Error::BTreeInvariantViolated {
513 reason: "dump_raw batch counter overflow",
514 })?;
515 buffer_one_dump_entry(&mut batch, &mut last_key, step);
516 }
517 Ok((batch, last_key, yielded))
518}
519
520/// Process one B-tree iterator step into the staged `batch` for
521/// [`DumpIter::refill`]. Errors are captured as `Err` entries so
522/// they surface via `next()` rather than aborting the refill —
523/// power-of-ten Rule 7.
524fn buffer_one_dump_entry(
525 batch: &mut VecDeque<Result<DumpRecord>>,
526 last_key: &mut Option<Vec<u8>>,
527 step: Result<(Vec<u8>, Vec<u8>)>,
528) {
529 let (key, value) = match step {
530 Ok(kv) => kv,
531 Err(e) => {
532 batch.push_back(Err(e));
533 return;
534 }
535 };
536 let Some(id) = Id::from_be_bytes(&key) else {
537 batch.push_back(Err(Error::InvalidArgument(
538 "primary B-tree key is not an Id",
539 )));
540 return;
541 };
542 *last_key = Some(key);
543 let header = match DocumentHeader::read_from(&value) {
544 Ok(h) => h,
545 Err(e) => {
546 batch.push_back(Err(e));
547 return;
548 }
549 };
550 let payload = value
551 .get(DOC_HEADER_SIZE..)
552 .map(<[u8]>::to_vec)
553 .unwrap_or_default();
554 batch.push_back(Ok(DumpRecord {
555 id,
556 header,
557 payload,
558 }));
559}
560
561impl Iterator for DumpIter<'_> {
562 type Item = Result<DumpRecord>;
563
564 fn next(&mut self) -> Option<Self::Item> {
565 if self.limit != 0 && self.emitted >= self.limit {
566 return None;
567 }
568 if let Some(item) = self.buffer.pop_front() {
569 self.emitted = self.emitted.saturating_add(1);
570 return Some(item);
571 }
572 if self.finished {
573 return None;
574 }
575 if let Err(e) = self.refill() {
576 self.finished = true;
577 return Some(Err(e));
578 }
579 let item = self.buffer.pop_front()?;
580 self.emitted = self.emitted.saturating_add(1);
581 Some(item)
582 }
583}