obj-db 1.0.2

Embedded document database. Stable file format, full ACID, single-file portability.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
//! CLI-facing public hooks on [`Db`].
//!
//! Surface area used by the M12 `obj-cli` binary that does not fit
//! cleanly into the typed read/write transaction API:
//!
//! - [`Db::stat`] — collects a one-shot snapshot of header + catalog
//!   summary for `obj stat`.
//! - [`Db::dump_raw`] — type-erased streaming walk of a named
//!   collection's primary B-tree for `obj dump`.
//!
//! These methods are marked **CLI-facing, may move pre-1.0** in
//! rustdoc — the typed `Db::iter_all` / `Db::collection::<T>()` API
//! is the long-term shape for user code; the helpers here exist
//! because the CLI inspects files whose `Document` types are not
//! linked into the binary.
//!
//! Power-of-ten posture:
//! - Rule 2: [`DumpIter`] is bounded by the user-supplied `--limit`
//!   (zero means unbounded — the caller's loop is responsible).
//! - Rule 3: [`DumpIter`] buffers in fixed-size chunks (`DUMP_BATCH`)
//!   so peak memory does not scale with collection size.
//! - Rule 4: every helper below is ≤ 60 lines.
//! - Rule 7: every `Result` / `Option` is matched or propagated.
//! - Rule 9: no `dyn` — `DumpIter` is concrete-typed.

use std::collections::VecDeque;
use std::ops::Bound;
use std::sync::Arc;

use obj_core::btree::BTree;
use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE};
use obj_core::pager::page::PageId;
use obj_core::pager::Pager;
use obj_core::platform::FileHandle;
use obj_core::{CollectionDescriptor, Error, Id, Result};

use crate::txn::ReadTxn;
use crate::Db;

/// Maximum number of entries [`DumpIter`] stages per refill.
///
/// Mirrors `ITER_ALL_BATCH`: a small, fixed cap (power-of-ten Rule 3)
/// keeps peak buffered memory independent of the collection's size.
const DUMP_BATCH: usize = 1 << 8;

/// One-shot snapshot of a database's header + catalog summary.
///
/// Returned by [`Db::stat`]. CLI-facing — may evolve pre-1.0; user
/// code should reach for the typed [`Db::iter_all`] /
/// `Db::read_transaction(|tx| tx.collection::<T>())` APIs instead.
///
/// `#[non_exhaustive]`: fields may be added without a breaking change,
/// so code outside this crate can read the fields but cannot construct
/// the struct literally.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct DbStat {
    /// Format major version from page 0.
    pub format_major: u16,
    /// Format minor version from page 0.
    pub format_minor: u16,
    /// On-disk page size in bytes.
    pub page_size: u16,
    /// Total number of pages in the database, including page 0.
    pub page_count: u64,
    /// Logical file size in bytes, computed as `page_count * page_size`
    /// (saturating at `u64::MAX`). For file-backed pagers this is
    /// expected to equal the on-disk size; for memory pagers, the
    /// in-memory backing buffer length.
    pub file_size_bytes: u64,
    /// One entry per registered collection, in the order the catalog
    /// lists them (the catalog B-tree's natural sort, i.e. by name).
    pub collections: Vec<CollectionStat>,
}

/// Per-collection summary inside [`DbStat`].
///
/// `doc_count` and `total_payload_bytes` are computed by walking
/// the collection's primary B-tree once; the cost is O(n) in the
/// number of documents in the collection. Index counts come straight
/// from the catalog descriptor — index trees are not walked. `obj stat`
/// calls [`Db::stat`] once per `--collection` invocation; tools that
/// need realtime telemetry should NOT use this surface.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct CollectionStat {
    /// User-visible name.
    pub name: String,
    /// Catalog-assigned numeric id.
    pub collection_id: u32,
    /// `Document::VERSION` of the type that last wrote to the
    /// collection, as recorded in the catalog descriptor.
    pub type_version: u32,
    /// Number of documents (entries) in the primary B-tree.
    pub doc_count: u64,
    /// Approximate total payload bytes — sum of every doc header's
    /// `payload_len`. Does NOT include the 16-byte per-doc header
    /// itself; per `docs/format.md` the in-file footprint is
    /// `payload_len + DOC_HEADER_SIZE` per doc, plus B+tree
    /// per-page overhead.
    pub total_payload_bytes: u64,
    /// Number of `Active` secondary indexes. `DroppedPending`
    /// descriptors are NOT counted.
    pub active_index_count: usize,
    /// Number of `DroppedPending` index descriptors (kept so the
    /// `index_id` is never reused; pages reclaimed on the next
    /// checkpoint).
    pub dropped_index_count: usize,
}

/// One raw record yielded by [`DumpIter`].
///
/// Produced without consulting any `Document` type: the key is decoded
/// as an [`Id`], the value's leading bytes as a [`DocumentHeader`], and
/// everything after the header is passed through untouched.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DumpRecord {
    /// Primary id, decoded from the B-tree key.
    pub id: Id,
    /// Per-document header (16 bytes on disk, decoded).
    pub header: DocumentHeader,
    /// Payload bytes following the header (empty when nothing follows
    /// it). Type-erased; no schema-aware decode is attempted.
    pub payload: Vec<u8>,
}

impl Db {
    /// One-shot snapshot of header + catalog summary.
    ///
    /// CLI-facing — used by `obj stat`. May move pre-1.0; user code
    /// should prefer the typed [`Db::iter_all`] /
    /// `Db::read_transaction(|tx| tx.collection::<T>())` APIs.
    ///
    /// # Errors
    ///
    /// - [`Error::Busy`] if the pager mutex is poisoned or contested.
    /// - Pager / B-tree / postcard errors propagated from the
    ///   catalog walk + per-collection primary-tree walks.
    pub fn stat(&self) -> Result<DbStat> {
        let descriptors = self.collect_descriptors()?;
        let mut collections: Vec<CollectionStat> = Vec::with_capacity(descriptors.len());
        for (name, descriptor) in descriptors {
            collections.push(self.stat_one_collection(&name, &descriptor)?);
        }
        let pager = self.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let (format_major, format_minor) = pager.format_version();
        let page_size = pager.page_size();
        let page_count = pager.page_count();
        let file_size_bytes = u64::from(page_size).saturating_mul(page_count);
        Ok(DbStat {
            format_major,
            format_minor,
            page_size,
            page_count,
            file_size_bytes,
            collections,
        })
    }

    /// Streaming type-erased walk of a named collection's primary
    /// B-tree. Each step yields a [`DumpRecord`]: the primary id,
    /// the per-doc header (decoded), and the raw payload bytes.
    ///
    /// `limit == 0` is treated as unbounded; the caller's iteration
    /// loop is the implicit bound. Power-of-ten Rule 2: callers
    /// who want a hard cap must impose it on their `take(...)`.
    ///
    /// CLI-facing — used by `obj dump`. The `Document` trait is
    /// NOT consulted; schema-aware decode requires a registered
    /// type and is the caller's responsibility above this layer.
    ///
    /// # Errors
    ///
    /// - [`Error::CollectionNotFound`] if `collection` is not
    ///   registered AT THE SNAPSHOT'S PINNED LSN.
    /// - As [`Db::read_transaction`] (construction-time).
    pub fn dump_raw(&self, collection: &str, limit: usize) -> Result<DumpIter<'_>> {
        let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
        let txn = ReadTxn::new(inner);
        let descriptor = {
            let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
                kind: obj_core::LockKind::WriterInProcess,
            })?;
            let catalog = self.catalog.lock().map_err(|_| Error::Busy {
                kind: obj_core::LockKind::WriterInProcess,
            })?;
            catalog
                .get(&mut pager, collection)?
                .ok_or_else(|| Error::CollectionNotFound {
                    name: collection.to_owned(),
                })?
        };
        Ok(DumpIter {
            txn,
            descriptor,
            buffer: VecDeque::new(),
            last_emitted_key: None,
            finished: false,
            limit,
            emitted: 0,
        })
    }

    /// Walk the catalog and return `(name, descriptor)` pairs. Held
    /// across the per-collection stats walk so the catalog mutex is
    /// not held simultaneously with the pager mutex inside
    /// [`Self::stat_one_collection`].
    fn collect_descriptors(&self) -> Result<Vec<(String, CollectionDescriptor)>> {
        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let catalog = self.catalog.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        catalog.list_collections(&mut pager)
    }

    /// Walk one collection's primary B-tree, counting docs and
    /// summing payload bytes. Returns the populated
    /// [`CollectionStat`].
    fn stat_one_collection(
        &self,
        name: &str,
        descriptor: &CollectionDescriptor,
    ) -> Result<CollectionStat> {
        let (doc_count, total_payload_bytes) = self.walk_primary_for_stat(descriptor)?;
        let active_index_count = descriptor
            .indexes
            .iter()
            .filter(|d| d.status == obj_core::IndexStatus::Active)
            .count();
        let dropped_index_count = descriptor
            .indexes
            .iter()
            .filter(|d| d.status == obj_core::IndexStatus::DroppedPending)
            .count();
        Ok(CollectionStat {
            name: name.to_owned(),
            collection_id: descriptor.collection_id,
            type_version: descriptor.type_version,
            doc_count,
            total_payload_bytes,
            active_index_count,
            dropped_index_count,
        })
    }

    /// Streaming walk of one primary tree. Returns `(doc_count,
    /// total_payload_bytes)`. Power-of-ten Rule 2: bounded by
    /// `obj_core::catalog::MAX_COLLECTIONS * <doc cap>` — the B-tree
    /// range iterator does NOT carry a cap of its own, but the
    /// loop count is bounded by the on-disk doc count and so is
    /// itself bounded by the (finite) file size. We add an
    /// explicit overflow check on the running counters.
    fn walk_primary_for_stat(&self, descriptor: &CollectionDescriptor) -> Result<(u64, u64)> {
        let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let root_pid = PageId::new(descriptor.primary_root)
            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
        let iter = tree.range(&mut pager, ..)?;
        let mut doc_count: u64 = 0;
        let mut total: u64 = 0;
        for step in iter {
            let (_key, value) = step?;
            let header = DocumentHeader::read_from(&value)?;
            doc_count = doc_count
                .checked_add(1)
                .ok_or(Error::BTreeInvariantViolated {
                    reason: "stat doc-count overflow",
                })?;
            total = total.checked_add(u64::from(header.payload_len)).ok_or(
                Error::BTreeInvariantViolated {
                    reason: "stat payload-byte sum overflow",
                },
            )?;
        }
        Ok((doc_count, total))
    }
}

/// Streaming iterator returned by [`Db::dump_raw`].
///
/// Holds a [`ReadTxn`] for its lifetime so the snapshot pin keeps
/// the catalog and primary B-tree stable across refills. Entries are
/// read from the primary tree in batches of at most [`DUMP_BATCH`]
/// and yielded one at a time as `Result<DumpRecord>`.
///
/// Error behaviour:
/// - A failure to read or decode an individual entry is yielded as an
///   `Err` item and does NOT terminate iteration. The caller decides
///   whether to continue.
/// - A failure to refill the batch (pager lock, tree open, or range
///   setup) is yielded once as `Err` and ends iteration.
/// - Construction errors surface at the [`Db::dump_raw`] call site.
pub struct DumpIter<'db> {
    // Read transaction kept open for the iterator's whole lifetime.
    txn: ReadTxn<'db>,
    // Descriptor of the collection being dumped (source of `primary_root`).
    descriptor: CollectionDescriptor,
    // Staged entries not yet handed out by `next()`.
    buffer: VecDeque<Result<DumpRecord>>,
    // Raw key of the last staged entry; the next refill resumes strictly after it.
    last_emitted_key: Option<Vec<u8>>,
    // Set once no further refill should be attempted.
    finished: bool,
    // Maximum number of items to yield; `0` means unbounded.
    limit: usize,
    // Number of buffered items yielded so far (compared against `limit`).
    emitted: usize,
}

impl std::fmt::Debug for DumpIter<'_> {
    /// Summarises iteration progress only. The open transaction and the
    /// staged records themselves are deliberately left out, so the output
    /// ends with `..` via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut summary = f.debug_struct("DumpIter");
        summary.field("collection_id", &self.descriptor.collection_id);
        summary.field("buffer_len", &self.buffer.len());
        summary.field("finished", &self.finished);
        summary.field("limit", &self.limit);
        summary.field("emitted", &self.emitted);
        summary.finish_non_exhaustive()
    }
}

impl DumpIter<'_> {
    /// Refill the internal buffer with up to [`DUMP_BATCH`] entries.
    ///
    /// Resumption marker is `Excluded(last_emitted_key)` — identical
    /// to the [`crate::IterAll`] refill pattern. The pager mutex is held
    /// only while the batch is read. It is released before the staged
    /// entries are appended to `self.buffer`.
    ///
    /// `self.finished` is set when either:
    /// - fewer than `DUMP_BATCH` steps were available past the cursor
    ///   (the tree is exhausted), or
    /// - a full batch never advanced the cursor (every step was a
    ///   key-less iterator error). Re-reading from the same cursor would
    ///   replay the identical batch forever, so iteration stops instead.
    ///
    /// Key-less errors at the tail of an otherwise-progressing full
    /// batch may be encountered again by the next refill, because the
    /// cursor cannot move past a step that carried no key.
    ///
    /// # Errors
    ///
    /// - [`Error::Busy`] if the pager mutex is poisoned.
    /// - [`Error::InvalidArgument`] if the descriptor's `primary_root`
    ///   is zero.
    /// - Errors from opening the tree or constructing the range.
    fn refill(&mut self) -> Result<()> {
        let pager_arc: Arc<std::sync::Mutex<Pager<FileHandle>>> =
            Arc::clone(self.txn.inner.env().pager());
        let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let root_pid = PageId::new(self.descriptor.primary_root)
            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
        let start = match &self.last_emitted_key {
            Some(k) => Bound::Excluded(k.clone()),
            None => Bound::Unbounded,
        };
        let iter = tree.range(&mut pager, (start, Bound::Unbounded))?;
        let mut yielded: usize = 0;
        let mut last_key: Option<Vec<u8>> = None;
        let mut batch: VecDeque<Result<DumpRecord>> = VecDeque::with_capacity(DUMP_BATCH);
        // `take` stops before pulling a (DUMP_BATCH + 1)-th step from the
        // tree; `yielded` is therefore bounded by DUMP_BATCH (Rule 2).
        for step in iter.take(DUMP_BATCH) {
            yielded += 1;
            buffer_one_dump_entry(&mut batch, &mut last_key, step);
        }
        drop(pager);
        let exhausted = yielded < DUMP_BATCH;
        let stalled = last_key.is_none();
        if exhausted || stalled {
            self.finished = true;
        }
        self.buffer.extend(batch);
        if let Some(k) = last_key {
            self.last_emitted_key = Some(k);
        }
        Ok(())
    }
}

/// Process one B-tree iterator step into the staged `batch` for
/// [`DumpIter::refill`]. Errors are captured as `Err` entries so
/// they surface via `next()` rather than aborting the refill —
/// power-of-ten Rule 7.
fn buffer_one_dump_entry(
    batch: &mut VecDeque<Result<DumpRecord>>,
    last_key: &mut Option<Vec<u8>>,
    step: Result<(Vec<u8>, Vec<u8>)>,
) {
    let (key, value) = match step {
        Ok(kv) => kv,
        Err(e) => {
            batch.push_back(Err(e));
            return;
        }
    };
    let Some(id) = Id::from_be_bytes(&key) else {
        batch.push_back(Err(Error::InvalidArgument(
            "primary B-tree key is not an Id",
        )));
        return;
    };
    *last_key = Some(key);
    let header = match DocumentHeader::read_from(&value) {
        Ok(h) => h,
        Err(e) => {
            batch.push_back(Err(e));
            return;
        }
    };
    let payload = value
        .get(DOC_HEADER_SIZE..)
        .map(<[u8]>::to_vec)
        .unwrap_or_default();
    batch.push_back(Ok(DumpRecord {
        id,
        header,
        payload,
    }));
}

impl Iterator for DumpIter<'_> {
    type Item = Result<DumpRecord>;

    /// Yield the next staged item, refilling from the tree when the
    /// buffer runs dry.
    ///
    /// Returns `None` once `limit` (if non-zero) items have been yielded,
    /// or once the buffer is empty and no further refill is allowed. A
    /// refill failure is yielded once and marks the iterator finished.
    fn next(&mut self) -> Option<Self::Item> {
        let limit_reached = self.limit != 0 && self.emitted >= self.limit;
        if limit_reached {
            return None;
        }
        if self.buffer.is_empty() {
            if self.finished {
                return None;
            }
            if let Err(refill_err) = self.refill() {
                self.finished = true;
                return Some(Err(refill_err));
            }
        }
        let next_item = self.buffer.pop_front()?;
        self.emitted = self.emitted.saturating_add(1);
        Some(next_item)
    }
}