use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use obj_core::btree::BTree;
use obj_core::codec::{DocumentHeader, DOC_HEADER_SIZE, MAX_INLINE_DOC};
use obj_core::index::EncodedIndexKey;
use obj_core::pager::checksum::crc32c;
use obj_core::pager::page::PageId;
use obj_core::pager::Pager;
use obj_core::platform::FileHandle;
use obj_core::{
Catalog, CollectionDescriptor, Document, Error, Id, IndexStatus, ReaderSnapshot, Result, TxnEnv,
};
use crate::collection::Collection;
/// Type version used by the `*_raw_bytes` write helpers, which forward to the
/// `*_with_version` variants with this value; `ensure_collection_raw` also
/// passes it when building the descriptor for a newly created collection.
pub(crate) const RAW_BYTES_TYPE_VERSION: u32 = 1;
/// Write transaction wrapper pairing the core `obj_core::WriteTxn` with the
/// shared catalog, the index-reconciliation bookkeeping, and a descriptor cache.
pub struct WriteTxn<'db> {
/// Underlying core write transaction; `commit` and `rollback` delegate to it.
pub(crate) inner: obj_core::WriteTxn<'db, FileHandle>,
/// Catalog shared behind a mutex; acquired through `lock_catalog`.
pub(crate) catalog: Arc<Mutex<Catalog<FileHandle>>>,
/// Shared set that `commit` extends with `reconciled_staged` once the inner
/// commit has succeeded. NOTE(review): the pairs look like
/// (collection name, type version) — confirm against `reconcile_specs_once`.
pub(crate) reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
/// Pairs staged by this transaction only; `rollback` never promotes them.
pub(crate) reconciled_staged: HashSet<(String, u32)>,
/// Descriptor cache for this transaction; `commit` writes every cached
/// descriptor back to the catalog before committing the inner transaction.
pub(crate) descriptors: crate::collection::DescriptorCache,
}
impl<'db> WriteTxn<'db> {
#[doc(hidden)]
#[must_use]
pub fn from_parts(
inner: obj_core::WriteTxn<'db, FileHandle>,
catalog: Arc<Mutex<Catalog<FileHandle>>>,
reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
) -> Self {
Self {
inner,
catalog,
reconciled,
reconciled_staged: HashSet::new(),
descriptors: crate::collection::new_descriptor_cache(),
}
}
pub(crate) fn new(
inner: obj_core::WriteTxn<'db, FileHandle>,
catalog: Arc<Mutex<Catalog<FileHandle>>>,
reconciled: Arc<Mutex<HashSet<(String, u32)>>>,
) -> Self {
Self::from_parts(inner, catalog, reconciled)
}
pub fn collection<T: Document>(&mut self) -> Result<Collection<'_, T>> {
if let (Some(namespace), tail) = crate::db::split_namespace(T::COLLECTION) {
return Err(Error::AttachedDatabaseIsReadOnly {
namespace: namespace.to_owned(),
collection: tail.to_owned(),
});
}
Collection::open_or_create(self)
}
pub fn commit(self) -> Result<()> {
self.flush_descriptors()?;
let Self {
inner,
reconciled,
reconciled_staged,
..
} = self;
inner.commit()?;
promote_reconciled(&reconciled, reconciled_staged)
}
/// Writes every descriptor held in the transaction's cache back to the
/// catalog via `Catalog::update`. Does nothing when the cache is empty.
fn flush_descriptors(&self) -> Result<()> {
    // Copy the entries out inside a short scope so the cache guard is released
    // before the pager and catalog locks are taken.
    let pending: Vec<(String, CollectionDescriptor)> = {
        let cache = crate::collection::lock_descriptors(&self.descriptors)?;
        let mut copied = Vec::new();
        for (name, descriptor) in cache.iter() {
            copied.push((name.clone(), descriptor.clone()));
        }
        copied
    };
    if pending.is_empty() {
        return Ok(());
    }
    let mut pager = lock_pager(self.inner.env())?;
    let mut catalog = lock_catalog(&self.catalog)?;
    for (name, descriptor) in pending.iter() {
        catalog.update(&mut pager, name, descriptor)?;
    }
    Ok(())
}
pub fn rollback(self) -> Result<()> {
self.inner.rollback()
}
/// Inserts `payload` into `collection` with `RAW_BYTES_TYPE_VERSION` as the
/// type version and returns the id assigned by `insert_with_version`.
#[doc(hidden)]
pub fn insert_raw_bytes(&mut self, collection: &str, payload: &[u8]) -> Result<Id> {
    let type_version = RAW_BYTES_TYPE_VERSION;
    self.insert_with_version(collection, payload, type_version)
}
/// Inserts `payload` into `collection` under a freshly allocated id, creating
/// the collection first if the catalog does not know it.
///
/// The stored value is the payload prefixed with a document header carrying
/// the collection id, `type_version`, payload length and CRC.
///
/// # Errors
///
/// Returns `Error::AttachedDatabaseIsReadOnly` for namespaced names,
/// `Error::DocumentTooLarge` when the encoded document exceeds the inline
/// limit, and any error from id allocation, locking, or the tree insert.
#[doc(hidden)]
pub fn insert_with_version(
    &mut self,
    collection: &str,
    payload: &[u8],
    type_version: u32,
) -> Result<Id> {
    reject_namespaced_write(collection)?;
    let _ = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
    let mut pager = lock_pager(self.inner.env())?;
    let catalog = lock_catalog(&self.catalog)?;
    let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
    let descriptor =
        crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
    // Encode (and thereby validate) the payload before touching `next_id`, so
    // a rejected payload does not consume an id in the cached descriptor.
    let bytes = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
    let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || collection.to_owned())?;
    let key = id.to_be_bytes();
    let mut tree = btree_handle(&pager, descriptor.primary_root)?;
    tree.insert(&mut pager, &key, &bytes)?;
    // Record the root reported by the tree after the insert in the cached
    // descriptor.
    descriptor.primary_root = tree.root().get();
    Ok(id)
}
/// Fetches the payload stored under `id`, discarding the type version that
/// `get_with_version` returns alongside it.
#[doc(hidden)]
pub fn get_raw_bytes(&mut self, collection: &str, id: Id) -> Result<Option<Vec<u8>>> {
    let found = self.get_with_version(collection, id)?;
    Ok(found.map(|(payload, _)| payload))
}
/// Looks up `id` in the collection's primary tree and returns the payload
/// together with the type version from its document header.
///
/// Returns `Ok(None)` when the key is absent. The descriptor comes from the
/// transaction's cache when present, otherwise from the catalog.
#[doc(hidden)]
pub fn get_with_version(&mut self, collection: &str, id: Id) -> Result<Option<(Vec<u8>, u32)>> {
    let descriptor = self.live_descriptor_required(collection)?;
    let mut pager = lock_pager(self.inner.env())?;
    let primary = btree_handle(&pager, descriptor.primary_root)?;
    let key = id.to_be_bytes();
    let stored = primary.get(&mut pager, &key)?;
    stored
        .map(|raw| strip_raw_payload_with_version(&raw, descriptor.collection_id))
        .transpose()
}
/// Returns a clone of the descriptor for `collection`, preferring the
/// transaction's cache and falling back to `catalog_get_required`, which
/// fails with `Error::CollectionNotFound` when the catalog has no entry.
fn live_descriptor_required(&self, collection: &str) -> Result<CollectionDescriptor> {
    // The cache guard lives only inside this block, so it is released before
    // the catalog fallback takes its own locks.
    let cached = {
        let cache = crate::collection::lock_descriptors(&self.descriptors)?;
        cache.get(collection).cloned()
    };
    match cached {
        Some(descriptor) => Ok(descriptor),
        None => catalog_get_required(&self.inner, &self.catalog, collection),
    }
}
/// Replaces the payload stored under `id`, stamping it with
/// `RAW_BYTES_TYPE_VERSION`; see `update_with_version` for the error cases.
#[doc(hidden)]
pub fn update_raw_bytes(&mut self, collection: &str, id: Id, payload: &[u8]) -> Result<()> {
    let type_version = RAW_BYTES_TYPE_VERSION;
    self.update_with_version(collection, id, payload, type_version)
}
/// Replaces the document stored under `id` with `payload`, re-encoded with
/// `type_version`.
///
/// # Errors
///
/// * `Error::AttachedDatabaseIsReadOnly` for namespaced collection names.
/// * `Error::CollectionNotFound { name: collection }` when the collection is
///   unknown to both the cache and the catalog.
/// * `Error::CollectionNotFound` with the name `"<collection>#<id>"` when the
///   collection exists but holds no document under `id`.
/// * `Error::DocumentTooLarge` when the encoded document exceeds the inline
///   limit; this is checked before the old value is removed.
#[doc(hidden)]
pub fn update_with_version(
    &mut self,
    collection: &str,
    id: Id,
    payload: &[u8],
    type_version: u32,
) -> Result<()> {
    reject_namespaced_write(collection)?;
    if !self.collection_exists(collection)? {
        return Err(Error::CollectionNotFound {
            name: collection.to_owned(),
        });
    }
    let mut pager = lock_pager(self.inner.env())?;
    let catalog = lock_catalog(&self.catalog)?;
    let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
    let descriptor =
        crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
    let key = id.to_be_bytes();
    let mut primary = btree_handle(&pager, descriptor.primary_root)?;
    let current = primary.get(&mut pager, &key)?;
    if current.is_none() {
        return Err(Error::CollectionNotFound {
            name: format!("{collection}#{}", id.get()),
        });
    }
    let encoded = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
    // Replace = delete the old entry, then insert the new one under the same key.
    primary.delete(&mut pager, &key)?;
    primary.insert(&mut pager, &key, &encoded)?;
    descriptor.primary_root = primary.root().get();
    Ok(())
}
/// Deletes the entry stored under `id` from the collection's primary tree and
/// returns the boolean reported by the tree's `delete` call.
///
/// # Errors
///
/// `Error::AttachedDatabaseIsReadOnly` for namespaced names and
/// `Error::CollectionNotFound` when the collection does not exist.
#[doc(hidden)]
pub fn delete_raw_bytes(&mut self, collection: &str, id: Id) -> Result<bool> {
    reject_namespaced_write(collection)?;
    let known = self.collection_exists(collection)?;
    if !known {
        return Err(Error::CollectionNotFound {
            name: collection.to_owned(),
        });
    }
    let mut pager = lock_pager(self.inner.env())?;
    let catalog = lock_catalog(&self.catalog)?;
    let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
    let descriptor =
        crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
    let mut primary = btree_handle(&pager, descriptor.primary_root)?;
    let key = id.to_be_bytes();
    let removed = primary.delete(&mut pager, &key)?;
    descriptor.primary_root = primary.root().get();
    Ok(removed)
}
#[doc(hidden)]
pub fn count_all_raw(&mut self, collection: &str) -> Result<u64> {
let descriptor = self.live_descriptor_required(collection)?;
let mut pager = lock_pager(self.inner.env())?;
let tree = btree_handle(&pager, descriptor.primary_root)?;
let mut n: u64 = 0;
for step in tree.range(&mut pager, ..)? {
let _ = step?;
n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
reason: "primary tree entry count exceeds u64",
})?;
}
Ok(n)
}
/// Stores `payload` under `id`, replacing any existing entry, with
/// `RAW_BYTES_TYPE_VERSION` as the type version.
#[doc(hidden)]
pub fn upsert_raw_bytes(&mut self, collection: &str, id: Id, payload: &[u8]) -> Result<()> {
    let type_version = RAW_BYTES_TYPE_VERSION;
    self.upsert_with_version(collection, id, payload, type_version)
}
/// Stores `payload` under the caller-chosen `id`, creating the collection if
/// needed and replacing any entry already present under that key.
///
/// Unlike `insert_with_version`, this does not touch the descriptor's
/// `next_id`.
///
/// # Errors
///
/// `Error::AttachedDatabaseIsReadOnly` for namespaced names,
/// `Error::DocumentTooLarge` for oversized payloads, plus locking and tree
/// errors.
#[doc(hidden)]
pub fn upsert_with_version(
    &mut self,
    collection: &str,
    id: Id,
    payload: &[u8],
    type_version: u32,
) -> Result<()> {
    reject_namespaced_write(collection)?;
    ensure_collection_raw(&self.inner, &self.catalog, collection)?;
    let mut pager = lock_pager(self.inner.env())?;
    let catalog = lock_catalog(&self.catalog)?;
    let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
    let descriptor =
        crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
    let encoded = wrap_raw_payload_with_version(descriptor.collection_id, payload, type_version)?;
    let key = id.to_be_bytes();
    let mut primary = btree_handle(&pager, descriptor.primary_root)?;
    // The delete result is deliberately ignored: absent keys are fine here.
    primary.delete(&mut pager, &key)?;
    primary.insert(&mut pager, &key, &encoded)?;
    descriptor.primary_root = primary.root().get();
    Ok(())
}
/// Inserts `payload` and then adds `entries` — `(index name, encoded key)`
/// pairs — to the named indexes for the newly assigned id.
///
/// The primary insert happens first; if index maintenance fails afterwards the
/// error is returned with the primary write already applied inside this
/// transaction.
pub fn insert_raw_indexed(
    &mut self,
    collection: &str,
    payload: &[u8],
    type_version: u32,
    entries: &[(String, Vec<u8>)],
) -> Result<Id> {
    let nothing_to_remove: &[(String, Vec<u8>)] = &[];
    let id = self.insert_with_version(collection, payload, type_version)?;
    self.maintain_raw_indexes(collection, id, nothing_to_remove, entries)?;
    Ok(id)
}
/// Replaces the document under `id` and then applies the index delta:
/// `remove` holds the `(index name, encoded key)` pairs to drop and `add` the
/// pairs to insert.
///
/// The primary update runs first; index maintenance errors are returned with
/// the primary write already applied inside this transaction.
pub fn update_raw_indexed(
    &mut self,
    collection: &str,
    id: Id,
    payload: &[u8],
    type_version: u32,
    remove: &[(String, Vec<u8>)],
    add: &[(String, Vec<u8>)],
) -> Result<()> {
    self.update_with_version(collection, id, payload, type_version)?;
    self.maintain_raw_indexes(collection, id, remove, add)
}
/// Deletes the document under `id` and then removes the `remove`
/// `(index name, encoded key)` pairs from the named indexes.
///
/// Returns the boolean reported by the primary delete. Index maintenance runs
/// regardless of that boolean.
pub fn delete_raw_indexed(
    &mut self,
    collection: &str,
    id: Id,
    remove: &[(String, Vec<u8>)],
) -> Result<bool> {
    let nothing_to_add: &[(String, Vec<u8>)] = &[];
    let removed = self.delete_raw_bytes(collection, id)?;
    self.maintain_raw_indexes(collection, id, remove, nothing_to_add)?;
    Ok(removed)
}
pub fn reconcile_indexes_raw(
&mut self,
collection: &str,
version: u32,
specs: &[obj_core::IndexSpec],
) -> Result<()> {
let _descriptor = ensure_collection_raw(&self.inner, &self.catalog, collection)?;
crate::collection::reconcile_specs_once(
&self.inner,
&self.catalog,
&self.reconciled,
&mut self.reconciled_staged,
collection,
version,
specs,
)
}
/// Applies an index delta for document `id`: `old` pairs are the entries to
/// remove and `new` pairs the entries to add, each tagged with its index name.
///
/// Returns immediately, without taking any lock, when both slices are empty.
/// Otherwise every index named in either slice is maintained once, in
/// first-seen order.
fn maintain_raw_indexes(
    &mut self,
    collection: &str,
    id: Id,
    old: &[(String, Vec<u8>)],
    new: &[(String, Vec<u8>)],
) -> Result<()> {
    let nothing_to_do = old.is_empty() && new.is_empty();
    if nothing_to_do {
        return Ok(());
    }
    let mut pager = lock_pager(self.inner.env())?;
    let catalog = lock_catalog(&self.catalog)?;
    let mut cache = crate::collection::lock_descriptors(&self.descriptors)?;
    let descriptor =
        crate::collection::cached_descriptor_mut(&mut cache, &mut pager, &catalog, collection)?;
    let index_names = touched_index_names(old, new);
    for name in index_names.iter() {
        maintain_one_raw_index(&mut pager, descriptor, collection, name, old, new, id)?;
    }
    Ok(())
}
/// Reports whether `collection` is known, checking the transaction's
/// descriptor cache first and the catalog second.
fn collection_exists(&self, collection: &str) -> Result<bool> {
    // The cache guard is a temporary of this statement, so it is released
    // before the pager and catalog locks below are taken.
    let cached = crate::collection::lock_descriptors(&self.descriptors)?.contains_key(collection);
    if cached {
        return Ok(true);
    }
    let mut pager = lock_pager(self.inner.env())?;
    let catalog = lock_catalog(&self.catalog)?;
    let found = catalog.get(&mut pager, collection)?;
    Ok(found.is_some())
}
}
/// Read transaction wrapper: the core read transaction plus the read contexts
/// of attached databases, keyed by the namespace used to address them.
pub struct ReadTxn<'db> {
/// Underlying core read transaction; supplies the env and snapshot used for
/// collection names without a namespace.
pub(crate) inner: obj_core::ReadTxn<'db, FileHandle>,
/// Attached read contexts, looked up by namespace in `resolve_read_target`.
pub(crate) attached: HashMap<String, AttachedReadCtx>,
}
impl<'db> ReadTxn<'db> {
#[doc(hidden)]
#[must_use]
pub fn from_parts(inner: obj_core::ReadTxn<'db, FileHandle>) -> Self {
Self {
inner,
attached: HashMap::new(),
}
}
pub(crate) fn new(inner: obj_core::ReadTxn<'db, FileHandle>) -> Self {
Self::from_parts(inner)
}
pub(crate) fn with_attached(
inner: obj_core::ReadTxn<'db, FileHandle>,
attached: HashMap<String, AttachedReadCtx>,
) -> Self {
Self { inner, attached }
}
fn resolve_read_target<'a>(&'a self, collection: &'a str) -> Result<ReadTarget<'a>> {
let (namespace, tail) = crate::db::split_namespace(collection);
match namespace {
None => Ok(ReadTarget {
env: self.inner.env(),
snapshot: self.inner.snapshot(),
lookup_name: collection,
}),
Some(ns) => {
let ctx =
self.attached
.get(ns)
.ok_or_else(|| Error::CollectionNamespaceUnknown {
namespace: ns.to_owned(),
})?;
Ok(ReadTarget {
env: ctx.env.as_ref(),
snapshot: &ctx.snapshot,
lookup_name: tail,
})
}
}
}
/// Fetches the payload stored under `id`, discarding the type version that
/// `get_with_version` returns alongside it.
#[doc(hidden)]
pub fn get_raw_bytes(&self, collection: &str, id: Id) -> Result<Option<Vec<u8>>> {
    let found = self.get_with_version(collection, id)?;
    Ok(found.map(|(payload, _)| payload))
}
#[doc(hidden)]
pub fn get_with_version(&self, collection: &str, id: Id) -> Result<Option<(Vec<u8>, u32)>> {
let target = self.resolve_read_target(collection)?;
let descriptor = target.collection_descriptor(collection)?;
let pager = lock_pager(target.env)?;
let root = PageId::new(descriptor.primary_root)
.ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
let key = id.to_be_bytes();
let bytes = obj_core::btree::BTree::<FileHandle>::get_via_snapshot(
&pager,
target.snapshot,
root,
&key,
)?;
match bytes {
Some(b) => Ok(Some(strip_raw_payload_with_version(
&b,
descriptor.collection_id,
)?)),
None => Ok(None),
}
}
/// Looks up the descriptor for `collection` in this transaction's own
/// snapshot, returning `Ok(None)` when it is absent.
///
/// The name is passed to the lookup as given; it is not routed through
/// `resolve_read_target`, so attached contexts are never consulted here.
#[doc(hidden)]
pub fn snapshot_descriptor(&self, collection: &str) -> Result<Option<CollectionDescriptor>> {
    let env = self.inner.env();
    let snapshot = self.inner.snapshot();
    read_descriptor_via_snapshot(env, snapshot, collection)
}
/// Borrows the wrapped core read transaction.
#[doc(hidden)]
#[must_use]
pub fn inner(&self) -> &obj_core::ReadTxn<'db, FileHandle> {
&self.inner
}
#[doc(hidden)]
pub fn snapshot_index_descriptor(
&self,
collection: &str,
index: &str,
) -> Result<obj_core::IndexDescriptor> {
let descriptor =
self.snapshot_descriptor(collection)?
.ok_or_else(|| Error::CollectionNotFound {
name: collection.to_owned(),
})?;
let entry = descriptor.indexes.iter().find(|d| d.name == index);
match entry {
Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d.clone()),
_ => Err(Error::IndexNotFound {
collection: collection.to_owned(),
name: index.to_owned(),
}),
}
}
/// Counts the entries of the collection's primary tree through the snapshot
/// selected by `resolve_read_target`.
#[doc(hidden)]
pub fn count_all_raw(&self, collection: &str) -> Result<u64> {
    let target = self.resolve_read_target(collection)?;
    let primary_root = target.collection_descriptor(collection)?.primary_root;
    count_via_btree_range_full(target.env, target.snapshot, primary_root)
}
/// Scans `index` between `lower` and `upper` and returns `(id, payload)` for
/// each document the matching entries point at.
///
/// The bounds are first adjusted for the index kind by
/// `crate::index_bound::widen_bounds_for_kind`. The matching index entries are
/// then collected and resolved against the primary tree.
#[doc(hidden)]
pub fn index_range_raw(
    &self,
    collection: &str,
    index: &str,
    lower: std::ops::Bound<Vec<u8>>,
    upper: std::ops::Bound<Vec<u8>>,
) -> Result<Vec<(Id, Vec<u8>)>> {
    let target = self.resolve_read_target(collection)?;
    let index_meta = target.index_descriptor(collection, index)?;
    let collection_meta = target.collection_descriptor(collection)?;
    let (scan_start, scan_end) =
        crate::index_bound::widen_bounds_for_kind(lower, upper, index_meta.kind);
    let hits = collect_index_range_entries(
        target.env,
        target.snapshot,
        index_meta.root_page_id,
        scan_start,
        scan_end,
    )?;
    materialize_id_payload_pairs(
        target.env,
        target.snapshot,
        &collection_meta,
        &index_meta,
        hits,
    )
}
/// Same scan as `index_range_raw`, but each row also carries the type version
/// read from the document header: `(id, type_version, payload)`.
#[doc(hidden)]
pub fn index_range_raw_with_version(
    &self,
    collection: &str,
    index: &str,
    lower: std::ops::Bound<Vec<u8>>,
    upper: std::ops::Bound<Vec<u8>>,
) -> Result<Vec<(Id, u32, Vec<u8>)>> {
    let target = self.resolve_read_target(collection)?;
    let index_meta = target.index_descriptor(collection, index)?;
    let collection_meta = target.collection_descriptor(collection)?;
    let (scan_start, scan_end) =
        crate::index_bound::widen_bounds_for_kind(lower, upper, index_meta.kind);
    let hits = collect_index_range_entries(
        target.env,
        target.snapshot,
        index_meta.root_page_id,
        scan_start,
        scan_end,
    )?;
    materialize_id_version_payload_rows(
        target.env,
        target.snapshot,
        &collection_meta,
        &index_meta,
        hits,
    )
}
#[doc(hidden)]
pub fn count_index_range_raw(
&self,
collection: &str,
index: &str,
lower: std::ops::Bound<Vec<u8>>,
upper: std::ops::Bound<Vec<u8>>,
) -> Result<u64> {
let target = self.resolve_read_target(collection)?;
let index_descriptor = target.index_descriptor(collection, index)?;
let (start, end) =
crate::index_bound::widen_bounds_for_kind(lower, upper, index_descriptor.kind);
let entries = collect_index_range_entries(
target.env,
target.snapshot,
index_descriptor.root_page_id,
start,
end,
)?;
u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
reason: "index range entry count exceeds u64",
})
}
/// Looks up `key_bytes` in a unique index and returns `(id, payload)` for the
/// matching document, dropping the type version that
/// `find_unique_with_version` reports.
#[doc(hidden)]
pub fn find_unique_raw(
    &self,
    collection: &str,
    index: &str,
    key_bytes: &[u8],
) -> Result<Option<(Id, Vec<u8>)>> {
    let hit = self.find_unique_with_version(collection, index, key_bytes)?;
    Ok(hit.map(|(id, payload, _)| (id, payload)))
}
#[doc(hidden)]
pub fn find_unique_with_version(
&self,
collection: &str,
index: &str,
key_bytes: &[u8],
) -> Result<Option<(Id, Vec<u8>, u32)>> {
let target = self.resolve_read_target(collection)?;
let index_descriptor = target.index_descriptor(collection, index)?;
if index_descriptor.kind != obj_core::IndexKind::Unique {
return Err(Error::IndexNotUnique {
collection: collection.to_owned(),
name: index.to_owned(),
});
}
let id_bytes = {
let pager = lock_pager(target.env)?;
let root = PageId::new(index_descriptor.root_page_id)
.ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
BTree::<FileHandle>::get_via_snapshot(&pager, target.snapshot, root, key_bytes)?
};
match id_bytes {
Some(bytes) => {
let id = Id::from_be_bytes(&bytes).ok_or(Error::Corruption { page_id: 0 })?;
match self.get_with_version(collection, id)? {
Some((payload, version)) => Ok(Some((id, payload, version))),
None => Err(Error::Corruption { page_id: 0 }),
}
}
None => Ok(None),
}
}
/// Returns a typed collection handle for `T` by delegating to
/// `Collection::open_readonly` with this read transaction.
pub fn collection<T: Document>(&self) -> Result<Collection<'_, T>> {
Collection::open_readonly(self)
}
}
/// Resolved location of a read, as produced by `ReadTxn::resolve_read_target`.
struct ReadTarget<'a> {
/// Env whose pager is locked for the read.
env: &'a TxnEnv<FileHandle>,
/// Snapshot through which the catalog and trees are read.
snapshot: &'a ReaderSnapshot<FileHandle>,
/// Name used for the catalog lookup: the full collection name for
/// un-namespaced reads, the part after the namespace otherwise.
lookup_name: &'a str,
}
impl ReadTarget<'_> {
fn collection_descriptor(&self, original: &str) -> Result<CollectionDescriptor> {
read_descriptor_via_snapshot(self.env, self.snapshot, self.lookup_name)?.ok_or_else(|| {
Error::CollectionNotFound {
name: original.to_owned(),
}
})
}
fn index_descriptor(&self, original: &str, index: &str) -> Result<obj_core::IndexDescriptor> {
let descriptor = self.collection_descriptor(original)?;
let entry = descriptor.indexes.iter().find(|d| d.name == index);
match entry {
Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d.clone()),
_ => Err(Error::IndexNotFound {
collection: original.to_owned(),
name: index.to_owned(),
}),
}
}
}
/// Read context of one attached database, stored in `ReadTxn::attached`.
pub(crate) struct AttachedReadCtx {
/// Env of the attached database; its pager is locked for namespaced reads.
pub(crate) env: Arc<TxnEnv<FileHandle>>,
/// Snapshot through which namespaced reads see the attached database.
pub(crate) snapshot: ReaderSnapshot<FileHandle>,
}
/// An attached database: its shared env plus the owning `Db` handle.
pub(crate) struct AttachedDb {
/// Shared env of the attached database.
pub(crate) env: Arc<TxnEnv<FileHandle>>,
/// NOTE(review): not read anywhere in this file; presumably held only to keep
/// the attached database open for as long as it is attached — confirm.
pub(crate) _db: crate::Db,
}
fn promote_reconciled(
reconciled: &Mutex<HashSet<(String, u32)>>,
staged: HashSet<(String, u32)>,
) -> Result<()> {
if staged.is_empty() {
return Ok(());
}
let mut shared = reconciled.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})?;
shared.extend(staged);
Ok(())
}
pub(crate) fn lock_catalog(
catalog: &Mutex<Catalog<FileHandle>>,
) -> Result<std::sync::MutexGuard<'_, Catalog<FileHandle>>> {
catalog.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})
}
fn reject_namespaced_write(collection: &str) -> Result<()> {
if let (Some(namespace), tail) = crate::db::split_namespace(collection) {
return Err(Error::AttachedDatabaseIsReadOnly {
namespace: namespace.to_owned(),
collection: tail.to_owned(),
});
}
Ok(())
}
fn lock_pager(env: &TxnEnv<FileHandle>) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})
}
/// Lists every index name that appears in `old` or `new`, each exactly once,
/// in the order of first appearance (`old` entries before `new` entries).
fn touched_index_names(old: &[(String, Vec<u8>)], new: &[(String, Vec<u8>)]) -> Vec<String> {
    let mut names: Vec<String> = Vec::new();
    old.iter()
        .chain(new)
        .map(|(name, _)| name)
        .for_each(|name| {
            if !names.contains(name) {
                names.push(name.clone());
            }
        });
    names
}
/// Collects, in input order, the encoded keys of all `entries` whose index
/// name equals `index_name`.
fn keys_for_index(entries: &[(String, Vec<u8>)], index_name: &str) -> Vec<EncodedIndexKey> {
    let mut keys = Vec::new();
    for (name, key) in entries {
        if name == index_name {
            keys.push(EncodedIndexKey::from_bytes(key.clone()));
        }
    }
    keys
}
fn maintain_one_raw_index(
pager: &mut Pager<FileHandle>,
descriptor: &mut CollectionDescriptor,
collection: &str,
index_name: &str,
old: &[(String, Vec<u8>)],
new: &[(String, Vec<u8>)],
id: Id,
) -> Result<()> {
let idx = descriptor
.indexes
.iter()
.position(|d| d.name == index_name && d.status == IndexStatus::Active)
.ok_or_else(|| Error::IndexNotFound {
collection: collection.to_owned(),
name: index_name.to_owned(),
})?;
let spec = crate::index_maint::descriptor_to_spec(&descriptor.indexes[idx])?;
let old_keys = keys_for_index(old, index_name);
let new_keys = keys_for_index(new, index_name);
crate::index_maint::maintain_index_from_keys(
pager, descriptor, idx, collection, &spec, &old_keys, &new_keys, id,
)
}
fn ensure_collection_raw(
inner: &obj_core::WriteTxn<'_, FileHandle>,
catalog: &Arc<Mutex<Catalog<FileHandle>>>,
name: &str,
) -> Result<CollectionDescriptor> {
let mut pager = inner.lock_pager()?;
let mut catalog_guard = lock_catalog(catalog)?;
if let Some(d) = catalog_guard.get(&mut pager, name)? {
return Ok(d);
}
let tree = BTree::<FileHandle>::empty(&mut pager)?;
let descriptor = CollectionDescriptor::new(0, tree.root().get(), RAW_BYTES_TYPE_VERSION);
let _id = catalog_guard.insert(&mut pager, name, descriptor)?;
catalog_guard
.get(&mut pager, name)?
.ok_or(Error::Corruption { page_id: 0 })
}
fn catalog_get_required(
inner: &obj_core::WriteTxn<'_, FileHandle>,
catalog: &Arc<Mutex<Catalog<FileHandle>>>,
name: &str,
) -> Result<CollectionDescriptor> {
let mut pager = inner.lock_pager()?;
let catalog_guard = lock_catalog(catalog)?;
catalog_guard
.get(&mut pager, name)?
.ok_or_else(|| Error::CollectionNotFound {
name: name.to_owned(),
})
}
/// Looks up the descriptor for `name` through `snapshot`, holding the env's
/// pager lock for the duration of the lookup. Returns `Ok(None)` when the
/// lookup finds no entry.
fn read_descriptor_via_snapshot(
    env: &TxnEnv<FileHandle>,
    snapshot: &ReaderSnapshot<FileHandle>,
    name: &str,
) -> Result<Option<CollectionDescriptor>> {
    let guard = lock_pager(env)?;
    let found = Catalog::<FileHandle>::lookup_via_snapshot(&guard, snapshot, name);
    found
}
fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
let root_pid =
PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
BTree::<FileHandle>::open(pager, root_pid)
}
fn wrap_raw_payload_with_version(
collection_id: u32,
payload: &[u8],
type_version: u32,
) -> Result<Vec<u8>> {
let payload_len = u32::try_from(payload.len()).map_err(|_| Error::DocumentTooLarge {
len: payload.len(),
max: MAX_INLINE_DOC,
})?;
let total = DOC_HEADER_SIZE
.checked_add(payload.len())
.ok_or(Error::DocumentTooLarge {
len: usize::MAX,
max: MAX_INLINE_DOC,
})?;
if total > MAX_INLINE_DOC {
return Err(Error::DocumentTooLarge {
len: total,
max: MAX_INLINE_DOC,
});
}
let header = DocumentHeader {
collection_id,
type_version,
payload_len,
payload_crc32c: crc32c(payload),
};
let mut out = Vec::with_capacity(total);
header.write_to(&mut out);
out.extend_from_slice(payload);
Ok(out)
}
fn strip_raw_payload_with_version(
bytes: &[u8],
expected_collection_id: u32,
) -> Result<(Vec<u8>, u32)> {
let header = DocumentHeader::read_from(bytes)?;
if header.collection_id != expected_collection_id {
return Err(Error::CollectionIdMismatch {
expected: expected_collection_id,
found: header.collection_id,
});
}
let payload_len =
usize::try_from(header.payload_len).map_err(|_| Error::Corruption { page_id: 0 })?;
let total = DOC_HEADER_SIZE
.checked_add(payload_len)
.ok_or(Error::Corruption { page_id: 0 })?;
if bytes.len() != total {
return Err(Error::Corruption { page_id: 0 });
}
let payload = &bytes[DOC_HEADER_SIZE..total];
if crc32c(payload) != header.payload_crc32c {
return Err(Error::Corruption { page_id: 0 });
}
Ok((payload.to_vec(), header.type_version))
}
fn count_via_btree_range_full(
env: &TxnEnv<FileHandle>,
snapshot: &ReaderSnapshot<FileHandle>,
primary_root: u64,
) -> Result<u64> {
let pager = lock_pager(env)?;
let root = PageId::new(primary_root)
.ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
let iter = BTree::<FileHandle>::range_via_snapshot(&pager, snapshot, root, ..)?;
let mut n: u64 = 0;
for step in iter {
let _ = step?;
n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
reason: "primary tree entry count exceeds u64",
})?;
}
Ok(n)
}
fn collect_index_range_entries(
env: &TxnEnv<FileHandle>,
snapshot: &ReaderSnapshot<FileHandle>,
index_root: u64,
start: std::ops::Bound<Vec<u8>>,
end: std::ops::Bound<Vec<u8>>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let pager = lock_pager(env)?;
let root =
PageId::new(index_root).ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
let iter = BTree::<FileHandle>::range_via_snapshot(&pager, snapshot, root, (start, end))?;
let mut out: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
for step in iter {
out.push(step?);
}
Ok(out)
}
/// Resolves index `entries` to `(id, payload)` pairs by running
/// `materialize_id_version_payload_rows` and dropping each row's type version.
fn materialize_id_payload_pairs(
    env: &TxnEnv<FileHandle>,
    snapshot: &ReaderSnapshot<FileHandle>,
    collection: &CollectionDescriptor,
    index: &obj_core::IndexDescriptor,
    entries: Vec<(Vec<u8>, Vec<u8>)>,
) -> Result<Vec<(Id, Vec<u8>)>> {
    let rows = materialize_id_version_payload_rows(env, snapshot, collection, index, entries)?;
    let mut pairs = Vec::with_capacity(rows.len());
    for (id, _version, payload) in rows {
        pairs.push((id, payload));
    }
    Ok(pairs)
}
/// Resolves index `entries` to `(id, type_version, payload)` rows by looking
/// each referenced id up in the collection's primary tree through `snapshot`.
///
/// Rows follow the order of `entries`. For `Each` indexes an id that several
/// entries point at is emitted once, at its first occurrence. Other index
/// kinds emit one row per entry.
///
/// # Errors
///
/// `Error::InvalidArgument` for a zero primary root. `Error::Corruption` when
/// an id cannot be decoded or is absent from the primary tree. Decoding
/// errors from `strip_raw_payload_with_version` are also propagated.
fn materialize_id_version_payload_rows(
    env: &TxnEnv<FileHandle>,
    snapshot: &ReaderSnapshot<FileHandle>,
    collection: &CollectionDescriptor,
    index: &obj_core::IndexDescriptor,
    entries: Vec<(Vec<u8>, Vec<u8>)>,
) -> Result<Vec<(Id, u32, Vec<u8>)>> {
    let mut out: Vec<(Id, u32, Vec<u8>)> = Vec::with_capacity(entries.len());
    // Only `Each` indexes need de-duplication, so the set stays empty (and
    // allocation-free) for every other kind.
    let dedupe = index.kind == obj_core::IndexKind::Each;
    let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
    let primary_root = PageId::new(collection.primary_root)
        .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
    let pager = lock_pager(env)?;
    for (full_key, value) in entries {
        let id_u64 = index_entry_id(index.kind, &full_key, &value)?;
        if dedupe && !emitted.insert(id_u64) {
            continue;
        }
        let id = Id::try_new(id_u64).ok_or(Error::Corruption { page_id: 0 })?;
        // An index entry without a primary row is reported as corruption.
        let primary_bytes = BTree::<FileHandle>::get_via_snapshot(
            &pager,
            snapshot,
            primary_root,
            &id.to_be_bytes(),
        )?
        .ok_or(Error::Corruption { page_id: 0 })?;
        let (payload, version) =
            strip_raw_payload_with_version(&primary_bytes, collection.collection_id)?;
        out.push((id, version, payload));
    }
    Ok(out)
}
fn index_entry_id(kind: obj_core::IndexKind, full_key: &[u8], value: &[u8]) -> Result<u64> {
if kind == obj_core::IndexKind::Unique {
return Ok(Id::from_be_bytes(value)
.ok_or(Error::Corruption { page_id: 0 })?
.get());
}
if full_key.len() < 8 {
return Err(Error::Corruption { page_id: 0 });
}
Ok(Id::from_be_bytes(&full_key[full_key.len() - 8..])
.ok_or(Error::Corruption { page_id: 0 })?
.get())
}