use std::collections::{HashMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::ops::Bound;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use obj_core::btree::BTree;
use obj_core::pager::page::PageId;
use obj_core::pager::{lock_path_for, Pager};
use obj_core::platform::FileHandle;
use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result, TxnEnv};
use crate::config::Config;
use crate::txn::{AttachedDb, ReadTxn, WriteTxn};
/// Maximum number of primary-tree entries decoded per `IterAll` refill (256).
const ITER_ALL_BATCH: usize = 1 << 8;
/// Handle to an open database: its transaction environment, shared catalog, and the
/// registry of read-only databases attached under a namespace.
///
/// Fields are dropped in declaration order, so do not reorder them casually.
pub struct Db {
/// Transaction environment wrapping the pager (and optional cross-process lock file);
/// every read/write transaction and the backup/checkpoint guards begin on it.
pub(crate) env: Arc<TxnEnv<FileHandle>>,
/// In-memory catalog shared with write transactions; reloaded from the pager by
/// `refresh_catalog` after a rolled-back transaction.
pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
/// When true, `transaction` and `checkpoint` are rejected with `Error::ReadOnly`.
pub(crate) readonly: bool,
/// Timeout passed to every transaction `begin` call made by this handle.
pub(crate) busy_timeout: Duration,
/// Set shared with each `WriteTxn`; NOTE(review): presumably (collection name, id)
/// pairs already reconciled with the catalog — confirm in `crate::txn`.
pub(crate) reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
/// Attached read-only databases, keyed by namespace.
pub(crate) attached: Arc<Mutex<HashMap<String, AttachedDb>>>,
/// Mirror of `attached.len()`, updated under the `attached` lock and read without it
/// as a fast path when no databases are attached.
pub(crate) attached_len: Arc<std::sync::atomic::AtomicUsize>,
}
/// Debug output shows only the configuration flags. The environment, catalog and
/// attachment registry are elided (reported as `..`).
impl std::fmt::Debug for Db {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Db");
        out.field("readonly", &self.readonly);
        out.field("busy_timeout", &self.busy_timeout);
        out.finish_non_exhaustive()
    }
}
/// Hidden accessors that hand out shared handles to `Db` internals.
impl Db {
    /// Timeout this handle passes to transaction `begin` calls.
    #[doc(hidden)]
    #[must_use]
    pub fn busy_timeout(&self) -> Duration {
        let Self { busy_timeout, .. } = self;
        *busy_timeout
    }

    /// New strong reference to the transaction environment.
    #[doc(hidden)]
    #[must_use]
    pub fn env_arc(&self) -> Arc<TxnEnv<FileHandle>> {
        let Self { env, .. } = self;
        Arc::clone(env)
    }

    /// New strong reference to the shared catalog mutex.
    #[doc(hidden)]
    #[must_use]
    pub fn catalog_arc(&self) -> Arc<Mutex<Catalog<FileHandle>>> {
        let Self { catalog, .. } = self;
        Arc::clone(catalog)
    }

    /// New strong reference to the shared `reconciled` set.
    #[doc(hidden)]
    #[must_use]
    pub fn reconciled_arc(&self) -> Arc<Mutex<HashSet<(String, u32)>>> {
        let Self { reconciled, .. } = self;
        Arc::clone(reconciled)
    }
}
impl Db {
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "db.open",
level = "info",
skip_all,
fields(path = %path.as_ref().display()),
)
)]
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
Self::open_with(path, Config::default())
}
#[allow(clippy::needless_pass_by_value)]
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "db.open",
level = "info",
skip_all,
fields(path = %path.as_ref().display()),
)
)]
pub fn open_with<P: AsRef<Path>>(path: P, mut config: Config) -> Result<Self> {
let path_buf = path.as_ref().to_path_buf();
let pager_config = std::mem::take(&mut config.pager);
let pager = Pager::open(&path_buf, pager_config)?;
let lock_file = if config.cross_process_lock {
let lock_path = lock_path_for(&path_buf);
let handle = FileHandle::open_or_create(&lock_path)?;
handle.set_len(128)?;
Some(Arc::new(handle))
} else {
None
};
Self::from_parts(pager, lock_file, &config)
}
pub fn memory() -> Result<Self> {
Self::memory_with(Config::default())
}
#[allow(clippy::needless_pass_by_value)]
pub fn memory_with(mut config: Config) -> Result<Self> {
let pager_config = std::mem::take(&mut config.pager);
let pager = Pager::memory(pager_config)?;
Self::from_parts(pager, None, &config)
}
pub fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
let config = Config {
readonly: true,
..Config::default()
};
Self::open_with(path, config)
}
fn from_parts(
mut pager: Pager<FileHandle>,
lock_file: Option<Arc<FileHandle>>,
config: &Config,
) -> Result<Self> {
pager.begin_txn();
let init = Catalog::open_or_init(&mut pager);
let catalog = match init {
Ok(c) => {
let commit_result = pager.commit();
pager.end_txn();
commit_result?;
c
}
Err(e) => {
pager.end_txn();
return Err(e);
}
};
if !config.skip_open_check {
let report = obj_core::integrity::quick_check(&mut pager)?;
if let Some(err) = first_failure_as_error(&report) {
return Err(err);
}
}
Ok(Self {
env: Arc::new(TxnEnv::new(pager, lock_file)),
catalog: Arc::new(Mutex::new(catalog)),
readonly: config.readonly,
busy_timeout: config.busy_timeout,
reconciled: Arc::new(Mutex::new(HashSet::new())),
attached: Arc::new(Mutex::new(HashMap::new())),
attached_len: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
})
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "db.transaction", level = "info", skip_all)
)]
pub fn transaction<R, F>(&self, body: F) -> Result<R>
where
F: FnOnce(&mut WriteTxn<'_>) -> Result<R>,
{
if self.readonly {
return Err(Error::ReadOnly {
operation: "transaction",
});
}
#[cfg(feature = "tracing")]
tracing::debug!("begin");
let inner = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
let mut tx = WriteTxn::new(
inner,
Arc::clone(&self.catalog),
Arc::clone(&self.reconciled),
);
match body(&mut tx) {
Ok(value) => {
tx.commit()?;
#[cfg(feature = "tracing")]
tracing::debug!("commit");
Ok(value)
}
Err(e) => {
let _ = tx.rollback();
let _ = self.refresh_catalog();
#[cfg(feature = "tracing")]
tracing::debug!("rollback");
Err(e)
}
}
}
fn refresh_catalog(&self) -> Result<()> {
let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let fresh = obj_core::Catalog::open_or_init(&mut pager)?;
let mut existing = self.catalog.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
*existing = fresh;
Ok(())
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "db.read_transaction", level = "info", skip_all)
)]
pub fn read_transaction<R, F>(&self, body: F) -> Result<R>
where
F: FnOnce(&ReadTxn<'_>) -> Result<R>,
{
#[cfg(feature = "tracing")]
tracing::debug!("begin");
let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
let attached_contexts = self.pin_attached_snapshots()?;
let tx = ReadTxn::with_attached(inner, attached_contexts);
let result = body(&tx);
#[cfg(feature = "tracing")]
tracing::debug!("commit");
result
}
fn pin_attached_snapshots(
&self,
) -> Result<std::collections::HashMap<String, crate::txn::AttachedReadCtx>> {
if self.attached_len.load(std::sync::atomic::Ordering::Relaxed) == 0 {
return Ok(std::collections::HashMap::new());
}
let registry = self.attached.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let mut out: std::collections::HashMap<String, crate::txn::AttachedReadCtx> =
std::collections::HashMap::with_capacity(registry.len());
for (namespace, attached) in registry.iter() {
let env = Arc::clone(&attached.env);
let snapshot = {
let mut pager = env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
pager.reader_snapshot()?
};
out.insert(
namespace.clone(),
crate::txn::AttachedReadCtx { env, snapshot },
);
}
Ok(out)
}
pub(crate) fn pin_attached_ctx(&self, namespace: &str) -> Result<crate::txn::AttachedReadCtx> {
let env = {
let registry = self.attached.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let attached =
registry
.get(namespace)
.ok_or_else(|| Error::CollectionNamespaceUnknown {
namespace: namespace.to_owned(),
})?;
Arc::clone(&attached.env)
};
let snapshot = {
let mut pager = env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
pager.reader_snapshot()?
};
Ok(crate::txn::AttachedReadCtx { env, snapshot })
}
pub fn attach<P: AsRef<std::path::Path>>(
&mut self,
path: P,
namespace: impl Into<String>,
) -> Result<()> {
self.attach_inner(path.as_ref(), namespace.into())
}
pub fn attach_shared<P: AsRef<std::path::Path>>(
&self,
path: P,
namespace: impl Into<String>,
) -> Result<()> {
self.attach_inner(path.as_ref(), namespace.into())
}
fn attach_inner(&self, path: &std::path::Path, namespace: String) -> Result<()> {
let path_buf = path.to_path_buf();
{
let registry = self.attached.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
if registry.contains_key(&namespace) {
return Err(Error::AttachmentAlreadyExists { namespace });
}
}
let attached_db =
Db::open_readonly(&path_buf).map_err(|source| Error::AttachmentNotReadable {
path: path_buf.clone(),
source: Box::new(source),
})?;
let env = Arc::clone(&attached_db.env);
let mut registry = self.attached.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
if registry.contains_key(&namespace) {
return Err(Error::AttachmentAlreadyExists { namespace });
}
registry.insert(
namespace,
AttachedDb {
env,
_db: attached_db,
},
);
self.attached_len
.store(registry.len(), std::sync::atomic::Ordering::Relaxed);
Ok(())
}
pub fn detach(&mut self, namespace: &str) -> Result<()> {
self.detach_inner(namespace)
}
pub fn detach_shared(&self, namespace: &str) -> Result<()> {
self.detach_inner(namespace)
}
fn detach_inner(&self, namespace: &str) -> Result<()> {
let mut registry = self.attached.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
if registry.remove(namespace).is_none() {
return Err(Error::CollectionNamespaceUnknown {
namespace: namespace.to_owned(),
});
}
self.attached_len
.store(registry.len(), std::sync::atomic::Ordering::Relaxed);
Ok(())
}
pub fn insert<T: Document>(&self, doc: T) -> Result<Id> {
self.transaction(|tx| tx.collection::<T>()?.insert(doc))
}
pub fn get<T: Document>(&self, id: Id) -> Result<Option<T>> {
self.read_transaction(|tx| crate::collection::fused_point_get::<T>(tx, id))
}
pub fn update<T, F>(&self, id: Id, f: F) -> Result<()>
where
T: Document,
F: FnOnce(&mut T),
{
self.transaction(|tx| tx.collection::<T>()?.update(id, f))
}
pub fn delete<T: Document>(&self, id: Id) -> Result<bool> {
self.transaction(|tx| tx.collection::<T>()?.delete(id))
}
pub fn upsert<T: Document>(&self, id: Id, doc: T) -> Result<()> {
self.transaction(|tx| tx.collection::<T>()?.upsert(id, doc))
}
pub fn find_unique<T: Document>(
&self,
index_name: &str,
key: impl Into<obj_core::codec::Dynamic>,
) -> Result<Option<T>> {
self.read_transaction(|tx| tx.collection::<T>()?.find_unique(index_name, key))
}
#[must_use]
pub fn query<T: Document + Send + 'static>(&self) -> crate::Query<'_, T> {
crate::Query::new(self)
}
#[must_use]
pub fn collection<T: Document + Send + 'static>(
&self,
name: impl Into<String>,
) -> crate::Collection<'_, T> {
crate::Collection::<T>::lazy(self, name.into())
}
pub fn all<T: Document + Send + 'static>(&self) -> Result<Vec<T>> {
self.iter_all::<T>()?
.map(|step| step.map(|(_id, doc)| doc))
.collect()
}
pub fn backup_to<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
let result = self.run_backup_under_guard(dest);
let unlock = guard.rollback();
result.and(unlock)
}
fn run_backup_under_guard<P: AsRef<std::path::Path>>(&self, dest: P) -> Result<()> {
let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
let snapshot = pager.reader_snapshot()?;
obj_core::backup::backup_pager_to_path(&pager, &snapshot, dest)?;
drop(snapshot);
drop(pager);
Ok(())
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "db.checkpoint", level = "info", skip_all)
)]
pub fn checkpoint(&self) -> Result<()> {
if self.readonly {
return Err(Error::ReadOnly {
operation: "checkpoint",
});
}
let guard = obj_core::WriteTxn::begin(&self.env, self.busy_timeout)?;
let result = self.run_checkpoint_under_guard();
let unlock = guard.rollback();
result.and(unlock)
}
fn run_checkpoint_under_guard(&self) -> Result<()> {
let mut pager = self.env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
pager.checkpoint()?;
drop(pager);
Ok(())
}
pub fn iter_all<T: Document + Send + 'static>(&self) -> Result<IterAll<'_, T>> {
let inner = obj_core::ReadTxn::begin_with_timeout(&self.env, self.busy_timeout)?;
let txn = ReadTxn::new(inner);
let descriptor = {
let coll = txn.collection::<T>()?;
coll.descriptor().clone()
};
Ok(IterAll {
txn,
descriptor,
buffer: VecDeque::new(),
last_emitted_key: None,
finished: false,
_phantom: PhantomData,
})
}
}
/// Lazy iterator over every `(Id, T)` in a collection, created by `Db::iter_all`.
///
/// It reads the primary B-tree in batches. Fields are dropped in declaration order,
/// so the read transaction is released first.
pub struct IterAll<'db, T> {
/// Read transaction held for the iterator's lifetime; `refill` reaches the pager
/// through its environment.
txn: ReadTxn<'db>,
/// Descriptor cloned at creation; supplies `primary_root` and `collection_id`.
descriptor: CollectionDescriptor,
/// Decoded entries (or per-entry errors) not yet returned by `next`.
buffer: VecDeque<Result<(Id, T)>>,
/// Resume cursor: the next refill scans keys strictly greater than this.
last_emitted_key: Option<Vec<u8>>,
/// Once set, `next` stops refilling and only drains `buffer`.
finished: bool,
/// Type marker for `T`.
_phantom: PhantomData<fn() -> T>,
}
/// Debug output shows the collection id, the number of buffered items, and whether
/// the scan is finished; other state is elided.
impl<T> std::fmt::Debug for IterAll<'_, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pending = self.buffer.len();
        let mut out = f.debug_struct("IterAll");
        out.field("collection_id", &self.descriptor.collection_id);
        out.field("buffer_len", &pending);
        out.field("finished", &self.finished);
        out.finish_non_exhaustive()
    }
}
impl<T: Document + Send + 'static> IterAll<'_, T> {
    /// Decodes up to `ITER_ALL_BATCH` more primary-tree entries into `self.buffer`.
    ///
    /// The scan resumes strictly after `last_emitted_key`, or from the start on the
    /// first call. `finished` is set in two cases:
    ///
    /// * The tree yields fewer than a full batch.
    /// * The range iterator reports an error. A failed step carries no key, so the
    ///   resume cursor cannot move past it, and another refill would only re-read the
    ///   same region. This matches `next`, which also ends iteration after a failed
    ///   refill.
    ///
    /// The pager mutex is held while the batch is read and decoded, and released before
    /// the batch is appended to `self.buffer`.
    ///
    /// # Errors
    ///
    /// * `Error::Busy` if the pager mutex is poisoned.
    /// * `Error::InvalidArgument` if the collection's `primary_root` is zero.
    /// * Any error from opening the B-tree or starting the range scan.
    ///
    /// Per-entry failures are queued in the buffer rather than returned.
    fn refill(&mut self) -> Result<()> {
        let pager_arc: Arc<Mutex<Pager<FileHandle>>> = Arc::clone(self.txn.inner.env().pager());
        let mut pager = pager_arc.lock().map_err(|_| Error::Busy {
            kind: obj_core::LockKind::WriterInProcess,
        })?;
        let root_pid = PageId::new(self.descriptor.primary_root)
            .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
        let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
        let start = match &self.last_emitted_key {
            Some(k) => Bound::Excluded(k.clone()),
            None => Bound::Unbounded,
        };
        let collection_id = self.descriptor.collection_id;
        let iter = tree.range(&mut pager, (start, Bound::Unbounded))?;
        let mut yielded: usize = 0;
        let mut stopped_on_error = false;
        let mut last_key: Option<Vec<u8>> = None;
        let mut batch: VecDeque<Result<(Id, T)>> = VecDeque::with_capacity(ITER_ALL_BATCH);
        for step in iter {
            if yielded >= ITER_ALL_BATCH {
                break;
            }
            yielded = yielded
                .checked_add(1)
                .ok_or(Error::BTreeInvariantViolated {
                    reason: "iter_all batch counter overflow",
                })?;
            let step_failed = step.is_err();
            buffer_one_entry::<T>(&mut batch, &mut last_key, collection_id, step);
            if step_failed {
                // Queue the error, then stop: without the failing key there is no way
                // to resume past it.
                stopped_on_error = true;
                break;
            }
        }
        if stopped_on_error || yielded < ITER_ALL_BATCH {
            self.finished = true;
        }
        drop(pager);
        self.buffer.extend(batch);
        if let Some(k) = last_key {
            self.last_emitted_key = Some(k);
        }
        Ok(())
    }
}
/// Converts one raw primary-tree step into a queued `(Id, T)` result.
///
/// * A storage error is queued as-is. The cursor stays put, because no key is known.
/// * A key that does not parse as an `Id` queues `Error::InvalidArgument`. The cursor
///   still advances past it, so the next refill does not read and report the same
///   entry again.
/// * Otherwise the value is decoded for `collection_id`; the decode result is queued
///   and the cursor advances.
///
/// Exactly one item is pushed onto `batch` per call.
fn buffer_one_entry<T: Document>(
    batch: &mut VecDeque<Result<(Id, T)>>,
    last_key: &mut Option<Vec<u8>>,
    collection_id: u32,
    step: Result<(Vec<u8>, Vec<u8>)>,
) {
    let (key, value) = match step {
        Ok(kv) => kv,
        Err(e) => {
            batch.push_back(Err(e));
            return;
        }
    };
    let parsed = Id::from_be_bytes(&key);
    // Advance before validating: a malformed key has still been consumed.
    *last_key = Some(key);
    let Some(id) = parsed else {
        batch.push_back(Err(Error::InvalidArgument(
            "primary B-tree key is not an Id",
        )));
        return;
    };
    let decoded = obj_core::codec::decode::<T>(&value, collection_id);
    batch.push_back(decoded.map(|doc| (id, doc)));
}
/// Yields buffered entries first, refilling only when the buffer is empty and the
/// scan is not finished.
///
/// A refill failure is yielded once, and iteration then ends.
impl<T: Document + Send + 'static> Iterator for IterAll<'_, T> {
    type Item = Result<(Id, T)>;

    fn next(&mut self) -> Option<Self::Item> {
        let needs_refill = self.buffer.is_empty() && !self.finished;
        if needs_refill {
            if let Err(refill_err) = self.refill() {
                self.finished = true;
                return Some(Err(refill_err));
            }
        }
        self.buffer.pop_front()
    }
}
/// Splits `name` at its first `.` into an optional namespace prefix and the remainder.
///
/// For example, `"main.users"` becomes `(Some("main"), "users")`. A name without a dot
/// becomes `(None, name)`. Only the first dot is significant, so `"a.b.c"` becomes
/// `(Some("a"), "b.c")`.
#[must_use]
pub(crate) fn split_namespace(name: &str) -> (Option<&str>, &str) {
    name.split_once('.')
        .map_or((None, name), |(namespace, rest)| (Some(namespace), rest))
}
// Compile-time check that `Db` is `Send + Sync`. The build breaks if any field stops
// satisfying either bound.
const _: () = {
    const fn require_send_sync<T: Send + Sync>() {}
    require_send_sync::<Db>();
};
fn first_failure_as_error(report: &obj_core::IntegrityReport) -> Option<Error> {
let first = report.failures.first()?;
let err = match first {
obj_core::IntegrityFailure::ChecksumMismatch { page_id }
| obj_core::IntegrityFailure::OrphanPage { page_id }
| obj_core::IntegrityFailure::BTreeSortViolation { page_id }
| obj_core::IntegrityFailure::FreelistChainBroken { page_id }
| obj_core::IntegrityFailure::BTreeSiblingChainBroken { page_id, .. }
| obj_core::IntegrityFailure::BTreeLevelInvariantViolated { page_id, .. }
| obj_core::IntegrityFailure::DanglingCatalogPointer { page_id, .. } => {
Error::Corruption { page_id: *page_id }
}
obj_core::IntegrityFailure::BTreeDepthExceeded { limit, .. } => {
Error::BTreeDepthExceeded { limit: *limit }
}
_ => Error::Corruption { page_id: 0 },
};
Some(err)
}