Skip to main content

obj/
cli.rs

1//! CLI-facing public hooks on [`Db`].
2//!
3//! Surface area used by the M12 `obj-cli` binary that does not fit
4//! cleanly into the typed read/write transaction API:
5//!
6//! - [`Db::stat`] — collects a one-shot snapshot of header + catalog
7//!   summary for `obj stat`.
8//! - [`Db::dump_raw`] — type-erased streaming walk of a named
9//!   collection's primary B-tree for `obj dump`.
10//!
11//! These methods are marked **CLI-facing, may move pre-1.0** in
12//! rustdoc — the typed `Db::iter_all` / `Db::collection::<T>()` API
13//! is the long-term shape for user code; the helpers here exist
14//! because the CLI inspects files whose `Document` types are not
15//! linked into the binary.
16//!
17//! Power-of-ten posture:
18//! - Rule 2: [`DumpIter`] is bounded by the user-supplied `--limit`
19//!   (zero means unbounded — the caller's loop is responsible).
20//! - Rule 3: [`DumpIter`] buffers in fixed-size chunks (`DUMP_BATCH`)
21//!   so peak memory does not scale with collection size.
22//! - Rule 4: every helper below is ≤ 60 lines.
23//! - Rule 7: every `Result` / `Option` is matched or propagated.
24//! - Rule 9: no `dyn` — `DumpIter` is concrete-typed.
25
26use std::collections::VecDeque;
27use std::ops::Bound;
28use std::sync::Arc;
29
30use obj_core::btree::BTree;
31use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE};
32use obj_core::pager::page::PageId;
33use obj_core::pager::Pager;
34use obj_core::platform::FileHandle;
35use obj_core::{CollectionDescriptor, Error, Id, Result};
36
37use crate::txn::ReadTxn;
38use crate::Db;
39
/// Upper bound on the number of entries [`DumpIter`] stages per
/// refill. Deliberately small and fixed (power-of-ten Rule 3, the
/// same reasoning as `ITER_ALL_BATCH`): peak buffered memory then
/// depends on this constant, not on how large the collection is.
const DUMP_BATCH: usize = 256;
44
45/// One-shot snapshot of a database's header + catalog summary.
46///
47/// Returned by [`Db::stat`]. CLI-facing — may evolve pre-1.0; user
48/// code should reach for the typed [`Db::iter_all`] /
49/// `Db::read_transaction(|tx| tx.collection::<T>())` APIs instead.
50#[derive(Debug, Clone)]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52#[non_exhaustive]
53pub struct DbStat {
54    /// Format major version from page 0.
55    pub format_major: u16,
56    /// Format minor version from page 0.
57    pub format_minor: u16,
58    /// On-disk page size in bytes.
59    pub page_size: u16,
60    /// Total number of pages in the database, including page 0.
61    pub page_count: u64,
62    /// Logical file size in bytes (`page_count * page_size`). For
63    /// file-backed pagers this equals the on-disk size; for memory
64    /// pagers it is the in-memory backing buffer length.
65    pub file_size_bytes: u64,
66    /// One entry per registered collection. Order matches the
67    /// catalog B-tree's natural sort (by name).
68    pub collections: Vec<CollectionStat>,
69}
70
/// Summary of a single collection, as reported inside [`DbStat`].
///
/// Producing `doc_count` and `total_payload_bytes` requires one full
/// walk of the collection's primary B-tree, i.e. O(n) in the number
/// of documents. `obj stat` calls [`Db::stat`] once per
/// `--collection` invocation; anything that needs realtime
/// telemetry should NOT be built on this surface.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct CollectionStat {
    /// Collection name as the user sees it.
    pub name: String,
    /// Numeric id the catalog assigned to the collection.
    pub collection_id: u32,
    /// `Document::VERSION` of the type that most recently wrote to
    /// the collection.
    pub type_version: u32,
    /// How many documents the primary B-tree holds.
    pub doc_count: u64,
    /// Approximate payload volume: the sum of `payload_len` across
    /// all document headers. The 16-byte per-document header is NOT
    /// part of this figure; per `docs/format.md` each document
    /// occupies `payload_len + DOC_HEADER_SIZE` bytes in the file,
    /// on top of B+tree per-page overhead.
    pub total_payload_bytes: u64,
    /// Count of secondary indexes whose status is `Active`.
    /// `DroppedPending` descriptors are excluded.
    pub active_index_count: usize,
    /// Count of index descriptors in `DroppedPending` state. They
    /// are retained so an `index_id` is never handed out twice;
    /// their pages are reclaimed at the next checkpoint.
    pub dropped_index_count: usize,
}
105
106/// One raw record yielded by [`DumpIter`].
107#[derive(Debug, Clone)]
108#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
109pub struct DumpRecord {
110    /// Primary id, decoded from the B-tree key.
111    pub id: Id,
112    /// Per-document header (16 bytes on disk, decoded).
113    pub header: DocumentHeader,
114    /// Payload bytes following the header. Type-erased; no
115    /// schema-aware decode is attempted.
116    pub payload: Vec<u8>,
117}
118
119impl Db {
120    /// One-shot snapshot of header + catalog summary.
121    ///
122    /// CLI-facing — used by `obj stat`. May move pre-1.0; user code
123    /// should prefer the typed [`Db::iter_all`] /
124    /// `Db::read_transaction(|tx| tx.collection::<T>())` APIs.
125    ///
126    /// # Errors
127    ///
128    /// - [`Error::Busy`] if the pager mutex is poisoned or contested.
129    /// - Pager / B-tree / postcard errors propagated from the
130    ///   catalog walk + per-collection primary-tree walks.
131    pub fn stat(&self) -> Result<DbStat> {
132        let descriptors = self.collect_descriptors()?;
133        let mut collections: Vec<CollectionStat> = Vec::with_capacity(descriptors.len());
134        for (name, descriptor) in descriptors {
135            collections.push(self.stat_one_collection(&name, &descriptor)?);
136        }
137        let pager = self.env.pager().lock().map_err(|_| Error::Busy {
138            kind: obj_core::LockKind::WriterInProcess,
139        })?;
140        let (format_major, format_minor) = pager.format_version();
141        let page_size = pager.page_size();
142        let page_count = pager.page_count();
143        let file_size_bytes = u64::from(page_size).saturating_mul(page_count);
144        Ok(DbStat {
145            format_major,
146            format_minor,
147            page_size,
148            page_count,
149            file_size_bytes,
150            collections,
151        })
152    }
153
154    /// Streaming type-erased walk of a named collection's primary
155    /// B-tree. Each step yields a [`DumpRecord`]: the primary id,
156    /// the per-doc header (decoded), and the raw payload bytes.
157    ///
158    /// `limit == 0` is treated as unbounded; the caller's iteration
159    /// loop is the implicit bound. Power-of-ten Rule 2: callers
160    /// who want a hard cap must impose it on their `take(...)`.
161    ///
162    /// CLI-facing — used by `obj dump`. The `Document` trait is
163    /// NOT consulted; schema-aware decode requires a registered
164    /// type and is the caller's responsibility above this layer.
165    ///
166    /// # Errors
167    ///
168    /// - [`Error::CollectionNotFound`] if `collection` is not
169    ///   registered AT THE SNAPSHOT'S PINNED LSN.
170    /// - As [`Db::read_transaction`] (construction-time).
171    pub fn dump_raw(&self, collection: &str, limit: usize) -> Result<DumpIter<'_>> {
172        let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
173        let txn = ReadTxn::new(inner);
174        let descriptor = {
175            let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
176                kind: obj_core::LockKind::WriterInProcess,
177            })?;
178            let catalog = self.catalog.lock().map_err(|_| Error::Busy {
179                kind: obj_core::LockKind::WriterInProcess,
180            })?;
181            catalog
182                .get(&mut pager, collection)?
183                .ok_or_else(|| Error::CollectionNotFound {
184                    name: collection.to_owned(),
185                })?
186        };
187        Ok(DumpIter {
188            txn,
189            descriptor,
190            buffer: VecDeque::new(),
191            last_emitted_key: None,
192            finished: false,
193            limit,
194            emitted: 0,
195        })
196    }
197
198    /// Walk the catalog and return `(name, descriptor)` pairs. Held
199    /// across the per-collection stats walk so the catalog mutex is
200    /// not held simultaneously with the pager mutex inside
201    /// [`Self::stat_one_collection`].
202    fn collect_descriptors(&self) -> Result<Vec<(String, CollectionDescriptor)>> {
203        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
204            kind: obj_core::LockKind::WriterInProcess,
205        })?;
206        let catalog = self.catalog.lock().map_err(|_| Error::Busy {
207            kind: obj_core::LockKind::WriterInProcess,
208        })?;
209        catalog.list_collections(&mut pager)
210    }
211
212    /// Walk one collection's primary B-tree, counting docs and
213    /// summing payload bytes. Returns the populated
214    /// [`CollectionStat`].
215    fn stat_one_collection(
216        &self,
217        name: &str,
218        descriptor: &CollectionDescriptor,
219    ) -> Result<CollectionStat> {
220        let (doc_count, total_payload_bytes) = self.walk_primary_for_stat(descriptor)?;
221        let active_index_count = descriptor
222            .indexes
223            .iter()
224            .filter(|d| d.status == obj_core::IndexStatus::Active)
225            .count();
226        let dropped_index_count = descriptor
227            .indexes
228            .iter()
229            .filter(|d| d.status == obj_core::IndexStatus::DroppedPending)
230            .count();
231        Ok(CollectionStat {
232            name: name.to_owned(),
233            collection_id: descriptor.collection_id,
234            type_version: descriptor.type_version,
235            doc_count,
236            total_payload_bytes,
237            active_index_count,
238            dropped_index_count,
239        })
240    }
241
242    /// Streaming walk of one primary tree. Returns `(doc_count,
243    /// total_payload_bytes)`. Power-of-ten Rule 2: bounded by
244    /// `obj_core::catalog::MAX_COLLECTIONS * <doc cap>` — the B-tree
245    /// range iterator does NOT carry a cap of its own, but the
246    /// loop count is bounded by the on-disk doc count and so is
247    /// itself bounded by the (finite) file size. We add an
248    /// explicit overflow check on the running counters.
249    fn walk_primary_for_stat(&self, descriptor: &CollectionDescriptor) -> Result<(u64, u64)> {
250        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
251            kind: obj_core::LockKind::WriterInProcess,
252        })?;
253        let root_pid = PageId::new(descriptor.primary_root)
254            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
255        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
256        let iter = tree.range(&mut pager, ..)?;
257        let mut doc_count: u64 = 0;
258        let mut total: u64 = 0;
259        for step in iter {
260            let (_key, value) = step?;
261            let header = DocumentHeader::read_from(&value)?;
262            doc_count = doc_count
263                .checked_add(1)
264                .ok_or(Error::BTreeInvariantViolated {
265                    reason: "stat doc-count overflow",
266                })?;
267            total = total.checked_add(u64::from(header.payload_len)).ok_or(
268                Error::BTreeInvariantViolated {
269                    reason: "stat payload-byte sum overflow",
270                },
271            )?;
272        }
273        Ok((doc_count, total))
274    }
275}
276
277/// Streaming iterator returned by [`Db::dump_raw`].
278///
279/// Holds a [`ReadTxn`] for its lifetime so the snapshot pin keeps
280/// the catalog and primary B-tree stable across refills. Yields
281/// `Result<DumpRecord>` one entry at a time; per-step errors do
282/// NOT terminate iteration — the caller decides whether to
283/// continue. Construction errors surface at the [`Db::dump_raw`]
284/// call site.
285pub struct DumpIter<'db> {
286    txn: ReadTxn<'db>,
287    descriptor: CollectionDescriptor,
288    buffer: VecDeque<Result<DumpRecord>>,
289    last_emitted_key: Option<Vec<u8>>,
290    finished: bool,
291    limit: usize,
292    emitted: usize,
293}
294
295impl std::fmt::Debug for DumpIter<'_> {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        f.debug_struct("DumpIter")
298            .field("collection_id", &self.descriptor.collection_id)
299            .field("buffer_len", &self.buffer.len())
300            .field("finished", &self.finished)
301            .field("limit", &self.limit)
302            .field("emitted", &self.emitted)
303            .finish_non_exhaustive()
304    }
305}
306
307impl DumpIter<'_> {
308    /// Refill the internal buffer with up to [`DUMP_BATCH`] entries.
309    /// Resumption marker is `Excluded(last_emitted_key)` — identical
310    /// to the [`crate::IterAll`] refill pattern.
311    fn refill(&mut self) -> Result<()> {
312        let pager_arc: Arc<std::sync::Mutex<Pager<FileHandle>>> =
313            Arc::clone(self.txn.inner.env().pager());
314        let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
315            kind: obj_core::LockKind::WriterInProcess,
316        })?;
317        let root_pid = PageId::new(self.descriptor.primary_root)
318            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
319        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
320        let start = match &self.last_emitted_key {
321            Some(k) => Bound::Excluded(k.clone()),
322            None => Bound::Unbounded,
323        };
324        let iter = tree.range(&mut pager, (start, Bound::Unbounded))?;
325        let mut yielded: usize = 0;
326        let mut last_key: Option<Vec<u8>> = None;
327        let mut batch: VecDeque<Result<DumpRecord>> = VecDeque::with_capacity(DUMP_BATCH);
328        for step in iter {
329            if yielded >= DUMP_BATCH {
330                break;
331            }
332            yielded = yielded
333                .checked_add(1)
334                .ok_or(Error::BTreeInvariantViolated {
335                    reason: "dump_raw batch counter overflow",
336                })?;
337            buffer_one_dump_entry(&mut batch, &mut last_key, step);
338        }
339        if yielded < DUMP_BATCH {
340            self.finished = true;
341        }
342        drop(pager);
343        self.buffer.extend(batch);
344        if let Some(k) = last_key {
345            self.last_emitted_key = Some(k);
346        }
347        Ok(())
348    }
349}
350
351/// Process one B-tree iterator step into the staged `batch` for
352/// [`DumpIter::refill`]. Errors are captured as `Err` entries so
353/// they surface via `next()` rather than aborting the refill —
354/// power-of-ten Rule 7.
355fn buffer_one_dump_entry(
356    batch: &mut VecDeque<Result<DumpRecord>>,
357    last_key: &mut Option<Vec<u8>>,
358    step: Result<(Vec<u8>, Vec<u8>)>,
359) {
360    let (key, value) = match step {
361        Ok(kv) => kv,
362        Err(e) => {
363            batch.push_back(Err(e));
364            return;
365        }
366    };
367    let Some(id) = Id::from_be_bytes(&key) else {
368        batch.push_back(Err(Error::InvalidArgument(
369            "primary B-tree key is not an Id",
370        )));
371        return;
372    };
373    *last_key = Some(key);
374    let header = match DocumentHeader::read_from(&value) {
375        Ok(h) => h,
376        Err(e) => {
377            batch.push_back(Err(e));
378            return;
379        }
380    };
381    let payload = value
382        .get(DOC_HEADER_SIZE..)
383        .map(<[u8]>::to_vec)
384        .unwrap_or_default();
385    batch.push_back(Ok(DumpRecord {
386        id,
387        header,
388        payload,
389    }));
390}
391
392impl Iterator for DumpIter<'_> {
393    type Item = Result<DumpRecord>;
394
395    fn next(&mut self) -> Option<Self::Item> {
396        if self.limit != 0 && self.emitted >= self.limit {
397            return None;
398        }
399        if let Some(item) = self.buffer.pop_front() {
400            self.emitted = self.emitted.saturating_add(1);
401            return Some(item);
402        }
403        if self.finished {
404            return None;
405        }
406        if let Err(e) = self.refill() {
407            self.finished = true;
408            return Some(Err(e));
409        }
410        let item = self.buffer.pop_front()?;
411        self.emitted = self.emitted.saturating_add(1);
412        Some(item)
413    }
414}