use std::collections::VecDeque;
use std::ops::Bound;
use std::sync::Arc;
use obj_core::btree::BTree;
use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE};
use obj_core::pager::page::PageId;
use obj_core::pager::Pager;
use obj_core::platform::FileHandle;
use obj_core::{CollectionDescriptor, Error, Id, Result};
use crate::txn::ReadTxn;
use crate::Db;
/// Maximum number of primary-tree entries that `DumpIter::refill` buffers in
/// one pass, i.e. per acquisition of the pager mutex.
const DUMP_BATCH: usize = 256;
/// Database-wide statistics produced by `Db::stat`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct DbStat {
/// Major component of the format version reported by the pager.
pub format_major: u16,
/// Minor component of the format version reported by the pager.
pub format_minor: u16,
/// Page size reported by the pager.
pub page_size: u16,
/// Page count reported by the pager.
pub page_count: u64,
/// `page_size * page_count`, saturating at `u64::MAX`.
pub file_size_bytes: u64,
/// One entry per collection listed by the catalog.
pub collections: Vec<CollectionStat>,
}
/// Statistics for a single collection, as reported inside [`DbStat`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub struct CollectionStat {
/// Name under which the catalog lists the collection.
pub name: String,
/// `collection_id` copied from the collection's descriptor.
pub collection_id: u32,
/// `type_version` copied from the collection's descriptor.
pub type_version: u32,
/// Number of entries found while walking the collection's primary B-tree.
pub doc_count: u64,
/// Sum of `payload_len` over the document headers of all entries in the
/// primary B-tree.
pub total_payload_bytes: u64,
/// Number of index descriptors whose status is `IndexStatus::Active`.
pub active_index_count: usize,
/// Number of index descriptors whose status is `IndexStatus::DroppedPending`.
pub dropped_index_count: usize,
}
/// One raw document yielded by the iterator that `Db::dump_raw` returns.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DumpRecord {
/// Document id decoded from the primary B-tree key.
pub id: Id,
/// Document header decoded from the start of the stored value.
pub header: DocumentHeader,
/// Bytes of the stored value from offset `DOC_HEADER_SIZE` onward; empty
/// when the stored value is shorter than that.
pub payload: Vec<u8>,
}
impl Db {
pub fn stat(&self) -> Result<DbStat> {
let descriptors = self.collect_descriptors()?;
let mut collections: Vec<CollectionStat> = Vec::with_capacity(descriptors.len());
for (name, descriptor) in descriptors {
collections.push(self.stat_one_collection(&name, &descriptor)?);
}
let pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let (format_major, format_minor) = pager.format_version();
let page_size = pager.page_size();
let page_count = pager.page_count();
let file_size_bytes = u64::from(page_size).saturating_mul(page_count);
Ok(DbStat {
format_major,
format_minor,
page_size,
page_count,
file_size_bytes,
collections,
})
}
pub fn dump_raw(&self, collection: &str, limit: usize) -> Result<DumpIter<'_>> {
let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
let txn = ReadTxn::new(inner);
let descriptor = {
let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let catalog = self.catalog.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
catalog
.get(&mut pager, collection)?
.ok_or_else(|| Error::CollectionNotFound {
name: collection.to_owned(),
})?
};
Ok(DumpIter {
txn,
descriptor,
buffer: VecDeque::new(),
last_emitted_key: None,
finished: false,
limit,
emitted: 0,
})
}
fn collect_descriptors(&self) -> Result<Vec<(String, CollectionDescriptor)>> {
let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let catalog = self.catalog.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
catalog.list_collections(&mut pager)
}
fn stat_one_collection(
&self,
name: &str,
descriptor: &CollectionDescriptor,
) -> Result<CollectionStat> {
let (doc_count, total_payload_bytes) = self.walk_primary_for_stat(descriptor)?;
let active_index_count = descriptor
.indexes
.iter()
.filter(|d| d.status == obj_core::IndexStatus::Active)
.count();
let dropped_index_count = descriptor
.indexes
.iter()
.filter(|d| d.status == obj_core::IndexStatus::DroppedPending)
.count();
Ok(CollectionStat {
name: name.to_owned(),
collection_id: descriptor.collection_id,
type_version: descriptor.type_version,
doc_count,
total_payload_bytes,
active_index_count,
dropped_index_count,
})
}
fn walk_primary_for_stat(&self, descriptor: &CollectionDescriptor) -> Result<(u64, u64)> {
let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let root_pid = PageId::new(descriptor.primary_root)
.ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
let iter = tree.range(&mut pager, ..)?;
let mut doc_count: u64 = 0;
let mut total: u64 = 0;
for step in iter {
let (_key, value) = step?;
let header = DocumentHeader::read_from(&value)?;
doc_count = doc_count
.checked_add(1)
.ok_or(Error::BTreeInvariantViolated {
reason: "stat doc-count overflow",
})?;
total = total.checked_add(u64::from(header.payload_len)).ok_or(
Error::BTreeInvariantViolated {
reason: "stat payload-byte sum overflow",
},
)?;
}
Ok((doc_count, total))
}
}
/// Batched iterator over the raw documents of one collection, created by
/// `Db::dump_raw`.
pub struct DumpIter<'db> {
/// Read transaction opened by `Db::dump_raw` and held for as long as the
/// iterator lives.
txn: ReadTxn<'db>,
/// Catalog descriptor of the collection being dumped.
descriptor: CollectionDescriptor,
/// Items produced by `refill` that have not been handed out yet.
buffer: VecDeque<Result<DumpRecord>>,
/// Resume cursor: the primary-tree key recorded by the most recent refill.
/// The next refill starts strictly after it; `None` until a key is recorded.
last_emitted_key: Option<Vec<u8>>,
/// Set once a refill reached the end of the tree or failed; no further
/// refills are attempted afterwards.
finished: bool,
/// Maximum number of items to yield; `0` means no limit.
limit: usize,
/// Number of buffered items handed out so far (both `Ok` and `Err`).
emitted: usize,
}
/// Compact debug view: identifies the collection by id and reports progress
/// counters instead of printing the transaction or the buffered documents.
impl std::fmt::Debug for DumpIter<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            descriptor,
            buffer,
            finished,
            limit,
            emitted,
            ..
        } = self;
        let mut out = f.debug_struct("DumpIter");
        out.field("collection_id", &descriptor.collection_id);
        out.field("buffer_len", &buffer.len());
        out.field("finished", finished);
        out.field("limit", limit);
        out.field("emitted", emitted);
        // The omitted fields are signalled with a trailing `..`.
        out.finish_non_exhaustive()
    }
}
impl DumpIter<'_> {
    /// Reads the next batch of entries from the collection's primary B-tree
    /// into `self.buffer`.
    ///
    /// The scan resumes strictly after `last_emitted_key`, or from the start
    /// of the tree when no key has been recorded yet. The pager mutex is held
    /// only while the batch is being read. Per-entry failures are buffered as
    /// `Err` items rather than aborting the batch.
    ///
    /// A batch holds at most `DUMP_BATCH` entries. When a non-zero `limit` is
    /// set, the batch is further capped at the number of items the caller can
    /// still receive, so a small limit does not decode and copy a full batch
    /// of documents under the lock.
    ///
    /// `finished` is set when the tree yields fewer entries than requested.
    ///
    /// # Errors
    ///
    /// Returns `Error::Busy` with `LockKind::WriterInProcess` when the pager
    /// mutex is poisoned, `Error::InvalidArgument` when the descriptor's
    /// `primary_root` is zero, and propagates errors from opening the tree or
    /// starting the range scan.
    fn refill(&mut self) -> Result<()> {
        // How many entries are worth fetching: a full batch when unlimited,
        // otherwise no more than what the limit still allows.
        let want = if self.limit == 0 {
            DUMP_BATCH
        } else {
            self.limit
                .saturating_sub(self.emitted)
                .saturating_sub(self.buffer.len())
                .min(DUMP_BATCH)
        };
        if want == 0 {
            // The limit is already covered by what has been emitted/buffered.
            return Ok(());
        }

        let pager_arc: Arc<std::sync::Mutex<Pager<FileHandle>>> =
            Arc::clone(self.txn.inner.env().pager());
        let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let root_pid = PageId::new(self.descriptor.primary_root)
            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
        let start = match &self.last_emitted_key {
            Some(k) => Bound::Excluded(k.clone()),
            None => Bound::Unbounded,
        };
        let iter = tree.range(&mut pager, (start, Bound::Unbounded))?;

        let mut yielded: usize = 0;
        let mut last_key: Option<Vec<u8>> = None;
        let mut batch: VecDeque<Result<DumpRecord>> = VecDeque::with_capacity(want);
        for step in iter {
            if yielded >= want {
                break;
            }
            yielded = yielded
                .checked_add(1)
                .ok_or(Error::BTreeInvariantViolated {
                    reason: "dump_raw batch counter overflow",
                })?;
            buffer_one_dump_entry(&mut batch, &mut last_key, step);
        }
        // Running out of entries before the batch filled means the scan hit
        // the end of the tree.
        if yielded < want {
            self.finished = true;
        }
        // Release the pager before touching iterator state.
        drop(pager);

        self.buffer.extend(batch);
        if let Some(k) = last_key {
            self.last_emitted_key = Some(k);
        }
        Ok(())
    }
}
/// Decodes one step of a primary-tree range scan and appends exactly one item
/// to `batch`: a [`DumpRecord`] on success, otherwise the error that prevented
/// decoding.
///
/// `last_key` is set to the step's key whenever the scan produced one, even
/// if the key is not a valid `Id` or the document header cannot be decoded.
/// The caller resumes strictly after `last_key`, so an undecodable entry is
/// reported once and then skipped. Recording the key only after a successful
/// `Id` decode would make the next batch re-read such entries, and a batch
/// consisting solely of them would never advance.
///
/// When `step` itself is an error there is no key to record, so `last_key`
/// is left untouched.
fn buffer_one_dump_entry(
    batch: &mut VecDeque<Result<DumpRecord>>,
    last_key: &mut Option<Vec<u8>>,
    step: Result<(Vec<u8>, Vec<u8>)>,
) {
    let (key, value) = match step {
        Ok(kv) => kv,
        Err(e) => {
            batch.push_back(Err(e));
            return;
        }
    };

    // Decode first (borrows `key`), then hand the key over to the cursor
    // regardless of the outcome.
    let decoded_id = Id::from_be_bytes(&key);
    *last_key = Some(key);

    let Some(id) = decoded_id else {
        batch.push_back(Err(Error::InvalidArgument(
            "primary B-tree key is not an Id",
        )));
        return;
    };
    let header = match DocumentHeader::read_from(&value) {
        Ok(h) => h,
        Err(e) => {
            batch.push_back(Err(e));
            return;
        }
    };
    // Everything after the fixed-size header is payload; a value shorter
    // than the header yields an empty payload.
    let payload = value
        .get(DOC_HEADER_SIZE..)
        .map(<[u8]>::to_vec)
        .unwrap_or_default();
    batch.push_back(Ok(DumpRecord {
        id,
        header,
        payload,
    }));
}
/// Yields buffered dump items, refilling from the primary tree on demand.
impl Iterator for DumpIter<'_> {
    type Item = Result<DumpRecord>;

    /// Returns the next item, or `None` once the limit is reached or the
    /// tree is exhausted.
    ///
    /// A `limit` of `0` disables the cap. Every item taken from the buffer,
    /// `Ok` or `Err`, counts toward the limit. A failed refill is reported
    /// once as `Some(Err(_))` and ends the iteration.
    fn next(&mut self) -> Option<Self::Item> {
        let limit_reached = self.limit != 0 && self.emitted >= self.limit;
        if limit_reached {
            return None;
        }

        if self.buffer.is_empty() {
            if self.finished {
                return None;
            }
            if let Err(err) = self.refill() {
                // Do not retry after a failed refill.
                self.finished = true;
                return Some(Err(err));
            }
        }

        // An empty buffer after a successful refill means nothing is left.
        let record = self.buffer.pop_front()?;
        self.emitted = self.emitted.saturating_add(1);
        Some(record)
    }
}