use std::collections::VecDeque;
use std::ops::Bound;
use std::sync::Arc;
use obj_core::btree::BTree;
use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE};
use obj_core::pager::page::PageId;
use obj_core::pager::Pager;
use obj_core::platform::FileHandle;
use obj_core::{CollectionDescriptor, Error, Id, Result};
use crate::txn::{AttachedReadCtx, ReadTxn};
use crate::Db;
/// Number of primary B-tree steps pulled from the range iterator per refill of a
/// [`DumpIter`].
const DUMP_BATCH: usize = 256;
/// Output of one refill: the buffered records (or per-entry errors), the last key recorded
/// by `buffer_one_dump_entry` during the batch (`None` if none was recorded), and the number
/// of range-iterator steps consumed. The recorded key is used as the exclusive lower bound
/// of the next refill.
type DumpBatch = (VecDeque<Result<DumpRecord>>, Option<Vec<u8>>, usize);
/// Database-wide statistics returned by [`Db::stat`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct DbStat {
    /// Major component of the format version reported by the pager.
    pub format_major: u16,
    /// Minor component of the format version reported by the pager.
    pub format_minor: u16,
    /// Page size reported by the pager (presumably in bytes; confirm against
    /// `Pager::page_size`).
    pub page_size: u16,
    /// Page count reported by the pager.
    pub page_count: u64,
    /// `page_size * page_count`, saturating on overflow. Derived from the pager's counters,
    /// not measured from the filesystem.
    pub file_size_bytes: u64,
    /// One entry per collection listed by the catalog.
    pub collections: Vec<CollectionStat>,
}
/// Per-collection statistics, one entry of [`DbStat::collections`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct CollectionStat {
    /// Collection name as listed by the catalog.
    pub name: String,
    /// Copied from the collection descriptor.
    pub collection_id: u32,
    /// Copied from the collection descriptor.
    pub type_version: u32,
    /// Number of entries in the collection's primary B-tree.
    pub doc_count: u64,
    /// Sum of `payload_len` over the document header of every primary-tree entry.
    pub total_payload_bytes: u64,
    /// Number of indexes whose status is `IndexStatus::Active`.
    pub active_index_count: usize,
    /// Number of indexes whose status is `IndexStatus::DroppedPending`.
    pub dropped_index_count: usize,
    /// Every index descriptor of the collection, regardless of status.
    pub indexes: Vec<obj_core::IndexDescriptor>,
}
/// One raw primary-tree entry yielded by [`DumpIter`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DumpRecord {
    /// Document id decoded from the big-endian primary key bytes.
    pub id: Id,
    /// Document header parsed from the front of the stored value.
    pub header: DocumentHeader,
    /// Every stored byte after the `DOC_HEADER_SIZE`-byte header (empty if the value is
    /// shorter). Not trimmed to `header.payload_len`.
    pub payload: Vec<u8>,
}
impl Db {
pub fn stat(&self) -> Result<DbStat> {
let descriptors = self.collect_descriptors()?;
let mut collections: Vec<CollectionStat> = Vec::with_capacity(descriptors.len());
for (name, descriptor) in descriptors {
collections.push(self.stat_one_collection(&name, &descriptor)?);
}
let pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let (format_major, format_minor) = pager.format_version();
let page_size = pager.page_size();
let page_count = pager.page_count();
let file_size_bytes = u64::from(page_size).saturating_mul(page_count);
Ok(DbStat {
format_major,
format_minor,
page_size,
page_count,
file_size_bytes,
collections,
})
}
pub fn dump_raw(&self, collection: &str, limit: usize) -> Result<DumpIter<'_>> {
let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
let txn = ReadTxn::new(inner);
let (descriptor, attached) = self.resolve_dump_target(&txn, collection)?;
Ok(DumpIter {
txn,
attached,
descriptor,
buffer: VecDeque::new(),
last_emitted_key: None,
finished: false,
limit,
emitted: 0,
})
}
fn resolve_dump_target(
&self,
txn: &ReadTxn<'_>,
collection: &str,
) -> Result<(CollectionDescriptor, Option<AttachedReadCtx>)> {
let (namespace, tail) = crate::db::split_namespace(collection);
let Some(ns) = namespace else {
let pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let descriptor = obj_core::Catalog::<FileHandle>::lookup_via_snapshot(
&pager,
txn.inner.snapshot(),
collection,
)?
.ok_or_else(|| Error::CollectionNotFound {
name: collection.to_owned(),
})?;
return Ok((descriptor, None));
};
let ctx = self.pin_attached_ctx(ns)?;
let descriptor = {
let pager = ctx.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
obj_core::Catalog::<FileHandle>::lookup_via_snapshot(&pager, &ctx.snapshot, tail)?
.ok_or_else(|| Error::CollectionNotFound {
name: collection.to_owned(),
})?
};
Ok((descriptor, Some(ctx)))
}
fn collect_descriptors(&self) -> Result<Vec<(String, CollectionDescriptor)>> {
let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let catalog = self.catalog.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
catalog.list_collections(&mut pager)
}
fn stat_one_collection(
&self,
name: &str,
descriptor: &CollectionDescriptor,
) -> Result<CollectionStat> {
let (doc_count, total_payload_bytes) = self.walk_primary_for_stat(name)?;
let active_index_count = descriptor
.indexes
.iter()
.filter(|d| d.status == obj_core::IndexStatus::Active)
.count();
let dropped_index_count = descriptor
.indexes
.iter()
.filter(|d| d.status == obj_core::IndexStatus::DroppedPending)
.count();
Ok(CollectionStat {
name: name.to_owned(),
collection_id: descriptor.collection_id,
type_version: descriptor.type_version,
doc_count,
total_payload_bytes,
active_index_count,
dropped_index_count,
indexes: descriptor.indexes.clone(),
})
}
fn walk_primary_for_stat(&self, name: &str) -> Result<(u64, u64)> {
let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let snapshot = pager.reader_snapshot()?;
let descriptor =
obj_core::Catalog::<FileHandle>::lookup_via_snapshot(&pager, &snapshot, name)?
.ok_or_else(|| Error::CollectionNotFound {
name: name.to_owned(),
})?;
let root_pid = PageId::new(descriptor.primary_root)
.ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
let iter = BTree::<FileHandle>::range_via_snapshot(&pager, &snapshot, root_pid, ..)?;
let mut doc_count: u64 = 0;
let mut total: u64 = 0;
for step in iter {
let (_key, value) = step?;
let header = DocumentHeader::read_from(&value)?;
doc_count = doc_count
.checked_add(1)
.ok_or(Error::BTreeInvariantViolated {
reason: "stat doc-count overflow",
})?;
total = total.checked_add(u64::from(header.payload_len)).ok_or(
Error::BTreeInvariantViolated {
reason: "stat payload-byte sum overflow",
},
)?;
}
Ok((doc_count, total))
}
}
/// Batched iterator over the raw primary-tree entries of one collection, returned by
/// [`Db::dump_raw`].
///
/// Entries are read `DUMP_BATCH` steps at a time into an internal buffer. Each refill
/// resumes strictly after the last recorded key. Entries that fail to decode are yielded as
/// `Err` items and iteration continues.
pub struct DumpIter<'db> {
    /// Read transaction opened by `dump_raw`; its snapshot is used for local collections.
    txn: ReadTxn<'db>,
    /// Pinned context of the attached database for a namespaced collection name; `None`
    /// for local collections.
    attached: Option<AttachedReadCtx>,
    /// Descriptor of the collection being dumped; supplies `primary_root`.
    descriptor: CollectionDescriptor,
    /// Items read by the latest refill that have not been yielded yet.
    buffer: VecDeque<Result<DumpRecord>>,
    /// Exclusive lower bound for the next refill. `None` until a refill records a key, in
    /// which case the scan starts at the beginning of the tree.
    last_emitted_key: Option<Vec<u8>>,
    /// Set once no further refill should be attempted.
    finished: bool,
    /// Maximum number of items to yield; `0` means unlimited.
    limit: usize,
    /// Number of buffered items yielded so far. Refill errors are not counted.
    emitted: usize,
}
impl std::fmt::Debug for DumpIter<'_> {
    /// Prints a compact summary of the cursor state.
    ///
    /// The transaction, attached context, buffered records and resume key are left out;
    /// the output ends with `..` to signal that.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            descriptor,
            buffer,
            finished,
            limit,
            emitted,
            ..
        } = self;
        let mut out = f.debug_struct("DumpIter");
        out.field("collection_id", &descriptor.collection_id);
        out.field("buffer_len", &buffer.len());
        out.field("finished", finished);
        out.field("limit", limit);
        out.field("emitted", emitted);
        out.finish_non_exhaustive()
    }
}
impl DumpIter<'_> {
    /// Reads the next batch of up to `DUMP_BATCH` primary-tree steps into `buffer`.
    ///
    /// The scan starts strictly after `last_emitted_key`, or at the start of the tree if no
    /// key has been recorded yet. It reads from the local transaction's snapshot, or from
    /// the pinned attached context when the collection was namespaced.
    ///
    /// `finished` is set in two cases:
    /// - the batch came up short, so the range is exhausted;
    /// - the batch recorded no resume key, so the next refill would start from the same
    ///   bound and replay the same entries indefinitely.
    ///
    /// # Errors
    ///
    /// Propagates pager-lock failures, a zero `primary_root`, and failures opening the range.
    fn refill(&mut self) -> Result<()> {
        let start = match &self.last_emitted_key {
            Some(k) => Bound::Excluded(k.clone()),
            None => Bound::Unbounded,
        };
        let primary_root = self.descriptor.primary_root;
        let (batch, last_key, drained) = match &self.attached {
            None => Self::refill_local(
                self.txn.inner.env(),
                self.txn.inner.snapshot(),
                primary_root,
                start,
            ),
            Some(ctx) => Self::refill_attached(ctx, primary_root, start),
        }?;
        // A batch without a recorded key cannot move the resume bound forward. Refilling
        // again would re-read the same steps forever, so stop instead of looping.
        if drained < DUMP_BATCH || last_key.is_none() {
            self.finished = true;
        }
        self.buffer.extend(batch);
        if let Some(k) = last_key {
            self.last_emitted_key = Some(k);
        }
        Ok(())
    }

    /// Drains one batch from the local primary tree using the read transaction's snapshot.
    ///
    /// The pager mutex stays locked while the batch is drained.
    fn refill_local(
        env: &obj_core::TxnEnv<FileHandle>,
        snapshot: &obj_core::ReaderSnapshot<FileHandle>,
        primary_root: u64,
        start: Bound<Vec<u8>>,
    ) -> Result<DumpBatch> {
        let pager_arc: Arc<std::sync::Mutex<Pager<FileHandle>>> = Arc::clone(env.pager());
        let pager = pager_arc.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let root_pid = PageId::new(primary_root)
            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
        let iter = BTree::<FileHandle>::range_via_snapshot(
            &pager,
            snapshot,
            root_pid,
            (start, Bound::Unbounded),
        )?;
        drain_dump_batch(iter)
    }

    /// Drains one batch from an attached database's primary tree using the pinned snapshot
    /// in `ctx`.
    ///
    /// That database's pager mutex stays locked while the batch is drained.
    fn refill_attached(
        ctx: &AttachedReadCtx,
        primary_root: u64,
        start: Bound<Vec<u8>>,
    ) -> Result<DumpBatch> {
        let pager = ctx.env.pager().lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let root_pid = PageId::new(primary_root)
            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
        let iter = BTree::<FileHandle>::range_via_snapshot(
            &pager,
            &ctx.snapshot,
            root_pid,
            (start, Bound::Unbounded),
        )?;
        drain_dump_batch(iter)
    }
}
/// Consumes at most `DUMP_BATCH` steps from `iter` and converts each one into a buffered
/// dump item.
///
/// Returns the buffered items, the last key recorded while buffering, and the number of
/// steps consumed. A count below `DUMP_BATCH` means `iter` ran dry. The `Result` return is
/// kept for interface stability; this function itself does not fail.
fn drain_dump_batch<I>(iter: I) -> Result<DumpBatch>
where
    I: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>,
{
    let mut yielded: usize = 0;
    let mut last_key: Option<Vec<u8>> = None;
    let mut batch: VecDeque<Result<DumpRecord>> = VecDeque::with_capacity(DUMP_BATCH);
    // `take` stops before pulling a step past the batch boundary. The previous
    // check-then-break loop advanced the B-tree cursor one extra time (possibly loading
    // another leaf) only to discard the result.
    for step in iter.take(DUMP_BATCH) {
        // Bounded by DUMP_BATCH, so this can never actually saturate.
        yielded = yielded.saturating_add(1);
        buffer_one_dump_entry(&mut batch, &mut last_key, step);
    }
    Ok((batch, last_key, yielded))
}
/// Decodes one primary-tree step and appends exactly one item to `batch`.
///
/// The item is `Ok(DumpRecord)` when the key decodes as an [`Id`] and the value's header
/// parses. Otherwise the step's own error, an `InvalidArgument` for a non-Id key, or the
/// header error is pushed in its place.
///
/// `last_key` is updated for every step that carried a key, including keys that are not a
/// valid `Id` and values whose header is malformed. The next refill therefore resumes past
/// them instead of re-reading and re-reporting them; a batch made only of bad keys would
/// otherwise never advance. Steps that are errors themselves carry no key and leave
/// `last_key` unchanged.
///
/// The payload is every byte after the `DOC_HEADER_SIZE`-byte header (empty if the value is
/// shorter). It is not trimmed to `header.payload_len`.
fn buffer_one_dump_entry(
    batch: &mut VecDeque<Result<DumpRecord>>,
    last_key: &mut Option<Vec<u8>>,
    step: Result<(Vec<u8>, Vec<u8>)>,
) {
    let (key, value) = match step {
        Ok(kv) => kv,
        Err(e) => {
            batch.push_back(Err(e));
            return;
        }
    };
    let decoded = Id::from_be_bytes(&key);
    // Record the key before validating it so an undecodable key still advances the cursor.
    *last_key = Some(key);
    let Some(id) = decoded else {
        batch.push_back(Err(Error::InvalidArgument(
            "primary B-tree key is not an Id",
        )));
        return;
    };
    let header = match DocumentHeader::read_from(&value) {
        Ok(h) => h,
        Err(e) => {
            batch.push_back(Err(e));
            return;
        }
    };
    let payload = value
        .get(DOC_HEADER_SIZE..)
        .map(<[u8]>::to_vec)
        .unwrap_or_default();
    batch.push_back(Ok(DumpRecord {
        id,
        header,
        payload,
    }));
}
impl Iterator for DumpIter<'_> {
    type Item = Result<DumpRecord>;

    /// Yields the next buffered item, refilling from the primary tree when the buffer is
    /// empty and the scan is not finished.
    ///
    /// Returns `None` once `limit` items have been yielded (`limit == 0` means unlimited),
    /// or when the buffer is empty and no refill remains. A refill failure is returned as a
    /// single `Err` item that does not count toward `limit`, and ends the scan.
    fn next(&mut self) -> Option<Self::Item> {
        let capped = self.limit != 0;
        if capped && self.emitted >= self.limit {
            return None;
        }
        if self.buffer.is_empty() && !self.finished {
            if let Err(e) = self.refill() {
                self.finished = true;
                return Some(Err(e));
            }
        }
        let next_item = self.buffer.pop_front();
        if next_item.is_some() {
            self.emitted = self.emitted.saturating_add(1);
        }
        next_item
    }
}