use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::{Arc, Mutex, MutexGuard};
use obj_core::btree::BTree;
use obj_core::codec::{decode, encode};
use obj_core::pager::page::PageId;
use obj_core::pager::Pager;
use obj_core::platform::FileHandle;
use obj_core::{Catalog, CollectionDescriptor, Document, Error, Id, Result};
/// Boxed, `Send` iterator of fallible items returned by the index query APIs
/// (`lookup`, `index_range`, ...).
pub type IndexIter<'a, Item> = Box<dyn Iterator<Item = Result<Item>> + Send + 'a>;
/// Maximum number of index entries consumed from the B-tree per `IterIndexRange` refill.
const ITER_INDEX_RANGE_BATCH: usize = 256;
/// Upper bound on distinct document ids tracked by `count_distinct_ids_in_range`;
/// exceeding it yields `Error::DistinctCountExceeded`.
pub const MAX_DISTINCT_IDS: usize = 100_000;
use crate::txn::{lock_catalog, ReadTxn, WriteTxn};
pub(crate) type DescriptorCache = Arc<Mutex<HashMap<String, CollectionDescriptor>>>;
#[must_use]
pub(crate) fn new_descriptor_cache() -> DescriptorCache {
Arc::new(Mutex::new(HashMap::new()))
}
pub(crate) fn lock_descriptors(
cache: &Mutex<HashMap<String, CollectionDescriptor>>,
) -> Result<MutexGuard<'_, HashMap<String, CollectionDescriptor>>> {
cache.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})
}
pub(crate) fn cached_descriptor_mut<'g>(
cache: &'g mut HashMap<String, CollectionDescriptor>,
pager: &mut Pager<FileHandle>,
catalog: &Catalog<FileHandle>,
name: &str,
) -> Result<&'g mut CollectionDescriptor> {
if !cache.contains_key(name) {
let descriptor = catalog_get_required(pager, catalog, name)?;
cache.insert(name.to_owned(), descriptor);
}
cache.get_mut(name).ok_or(Error::Corruption { page_id: 0 })
}
/// Typed handle to one collection. It is backed by a write transaction, a reader
/// snapshot, or (lazily) a `Db` on which each read opens its own read transaction.
pub struct Collection<'tx, T: Document> {
/// Which backing the handle reads and writes through.
mode: CollectionMode<'tx>,
/// Catalog name of the collection; in read mode it may carry a namespace prefix.
// The field name repeats the type name, hence the clippy allow.
#[allow(clippy::struct_field_names)]
collection_name: Cow<'static, str>,
/// Descriptor captured when the handle was opened (a placeholder in lazy mode).
descriptor: CollectionDescriptor,
/// Binds `T` to the handle without storing a `T`.
_phantom: PhantomData<fn() -> T>,
}
/// Backing for a `Collection` handle.
enum CollectionMode<'tx> {
/// Reads and writes go through the write transaction's pager and descriptor cache.
Write(WriteRef<'tx>),
/// Reads go through a reader snapshot; mutating calls return `Error::ReadOnly`.
Read(ReadRef<'tx>),
/// Each read opens a fresh read transaction on the database (see `dispatch_lazy`);
/// mutating calls return `Error::ReadOnly`.
Lazy(LazyRef<'tx>),
}
/// State for `CollectionMode::Lazy`: only the database to open transactions on.
struct LazyRef<'db> {
db: &'db crate::Db,
}
/// State for `CollectionMode::Write`: the transaction environment plus the shared
/// catalog and the transaction's descriptor cache.
struct WriteRef<'tx> {
env: &'tx obj_core::TxnEnv<FileHandle>,
catalog: Arc<Mutex<Catalog<FileHandle>>>,
descriptors: DescriptorCache,
}
/// State for `CollectionMode::Read`: the snapshot to read through and its environment.
struct ReadRef<'tx> {
snapshot: &'tx obj_core::ReaderSnapshot<FileHandle>,
env: &'tx obj_core::TxnEnv<FileHandle>,
}
impl<'tx, T: Document> Collection<'tx, T> {
/// Opens the collection for `T` inside a write transaction, creating it if necessary.
///
/// The function first ensures the catalog has an entry for `T::COLLECTION`. It then
/// reconciles `T::indexes()` against that entry, unless this was already done for
/// `(T::COLLECTION, T::VERSION)`. Finally it re-reads the descriptor so the handle
/// sees the reconciled index list.
///
/// # Errors
/// Propagates pager, catalog, index-spec validation and reconciliation errors.
pub(crate) fn open_or_create(tx: &'tx mut WriteTxn<'_>) -> Result<Self> {
// The descriptor returned here predates index reconciliation, so it is discarded.
let _initial = ensure_collection::<T>(&tx.inner, &tx.catalog)?;
reconcile_indexes_once::<T>(
&tx.inner,
&tx.catalog,
&tx.reconciled,
&mut tx.reconciled_staged,
)?;
let descriptor = reread_descriptor::<T>(&tx.inner, &tx.catalog)?;
// Building the WriteRef last: `tx.inner.env()` borrows for the full `'tx`.
Ok(Self {
mode: CollectionMode::Write(WriteRef {
env: tx.inner.env(),
catalog: Arc::clone(&tx.catalog),
descriptors: Arc::clone(&tx.descriptors),
}),
collection_name: Cow::Borrowed(T::COLLECTION),
descriptor,
_phantom: PhantomData,
})
}
/// Opens `T`'s own collection (`T::COLLECTION`) read-only through `tx`.
///
/// # Errors
/// Same as [`Self::open_readonly_named`].
pub(crate) fn open_readonly(tx: &'tx ReadTxn<'_>) -> Result<Self> {
    let default_name: Cow<'static, str> = Cow::Borrowed(T::COLLECTION);
    Self::open_readonly_named(tx, default_name)
}
/// Opens collection `name` read-only through a reader snapshot.
///
/// `name` is split with `crate::db::split_namespace`. Without a namespace, the
/// lookup uses `tx`'s own environment and snapshot. With one, it uses the context
/// registered under that namespace in `tx.attached`, and looks up only the tail of
/// the name there.
///
/// # Errors
/// - `Error::CollectionNamespaceUnknown` if the namespace is not in `tx.attached`.
/// - `Error::CollectionNotFound` (carrying the full `name`) if the snapshot's catalog
///   has no such collection.
/// - Pager and catalog errors.
pub(crate) fn open_readonly_named(
tx: &'tx ReadTxn<'_>,
name: Cow<'static, str>,
) -> Result<Self> {
let (namespace, tail) = crate::db::split_namespace(&name);
// Choose the environment/snapshot to read from and the name to look up in it.
let (env, snapshot, lookup_name): (
&'tx obj_core::TxnEnv<FileHandle>,
&'tx obj_core::ReaderSnapshot<FileHandle>,
&str,
) = match namespace {
None => (tx.inner.env(), tx.inner.snapshot(), &name),
Some(ns) => {
let ctx = tx
.attached
.get(ns)
.ok_or_else(|| Error::CollectionNamespaceUnknown {
namespace: ns.to_owned(),
})?;
(ctx.env.as_ref(), &ctx.snapshot, tail)
}
};
let Some(descriptor) = read_descriptor_via_snapshot_named(env, snapshot, lookup_name)?
else {
return Err(Error::CollectionNotFound {
name: name.into_owned(),
});
};
// The handle keeps the full (possibly namespaced) name, e.g. for error messages.
Ok(Self {
mode: CollectionMode::Read(ReadRef { snapshot, env }),
collection_name: name,
descriptor,
_phantom: PhantomData,
})
}
/// Builds a lazy, read-only handle for collection `name`.
///
/// Nothing is read here. Each read call later opens its own read transaction on
/// `db` and resolves the real descriptor there.
pub(crate) fn lazy(db: &'tx crate::Db, name: String) -> Self {
    // Placeholder descriptor; lazy reads resolve the real one per transaction.
    let placeholder = CollectionDescriptor::new(0, 0, 0);
    Self {
        mode: CollectionMode::Lazy(LazyRef { db }),
        collection_name: Cow::Owned(name),
        descriptor: placeholder,
        _phantom: PhantomData,
    }
}
/// Returns the descriptor captured when this handle was opened.
///
/// Write-mode mutations update the transaction's descriptor cache, not this copy,
/// so fields such as `primary_root` or `next_id` may be stale here. In lazy mode
/// this is the placeholder built by `CollectionDescriptor::new(0, 0, 0)`.
#[must_use]
pub fn descriptor(&self) -> &CollectionDescriptor {
&self.descriptor
}
/// Current primary-tree root for a write-mode handle.
///
/// Uses the transaction cache's descriptor when the collection has been loaded
/// there; otherwise falls back to the root captured at open time.
fn write_primary_root(&self, write: &WriteRef<'tx>) -> Result<u64> {
    let cache = lock_descriptors(&write.descriptors)?;
    let root = match cache.get(self.collection_name.as_ref()) {
        Some(cached) => cached.primary_root,
        None => self.descriptor.primary_root,
    };
    Ok(root)
}
/// Current root page of index `index_name` for a write-mode handle.
///
/// Uses the transaction cache's descriptor when it is present and lists the index
/// as `Active`; otherwise returns `fallback`.
fn write_index_root(
    &self,
    write: &WriteRef<'tx>,
    index_name: &str,
    fallback: u64,
) -> Result<u64> {
    let cache = lock_descriptors(&write.descriptors)?;
    let root = cache
        .get(self.collection_name.as_ref())
        .and_then(|cached| {
            cached.indexes.iter().find(|idx| {
                idx.name == index_name && idx.status == obj_core::IndexStatus::Active
            })
        })
        .map_or(fallback, |idx| idx.root_page_id);
    Ok(root)
}
/// Inserts `doc` under a freshly allocated id and updates the collection's indexes.
///
/// # Errors
/// `Error::ReadOnly` if the handle is not in write mode. Otherwise propagates
/// id-allocation, encoding, B-tree and index-maintenance errors.
// `doc` is taken by value as part of the public API even though only a borrow is used.
#[allow(clippy::needless_pass_by_value)]
pub fn insert(&self, doc: T) -> Result<Id> {
let write = self.write_or_err("insert")?;
let name: &str = self.collection_name.as_ref();
// Lock order shared by all write paths in this file: pager, catalog, descriptor cache.
let mut pager = lock_pager(write.env)?;
let catalog = lock_catalog(&write.catalog)?;
let mut cache = lock_descriptors(&write.descriptors)?;
let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
// NOTE(review): the id counter on the cached descriptor is bumped before encoding,
// so a later failure in this call still consumes the id within the transaction.
let id = obj_core::id::bump_next_id(&mut descriptor.next_id, || name.to_owned())?;
let bytes = encode(&doc, descriptor.collection_id)?;
let key = id.to_be_bytes();
let mut tree = btree_handle(&pager, descriptor.primary_root)?;
tree.insert(&mut pager, &key, &bytes)?;
// The insert may move the tree root; persist the current one in the cache.
descriptor.primary_root = tree.root().get();
crate::index_maint::apply_doc_change::<T>(&mut pager, descriptor, None, Some(&doc), id)?;
Ok(id)
}
/// Fetches the document stored under `id`, or `None` if there is none.
///
/// # Errors
/// Propagates storage and decode errors (and, in lazy mode, transaction errors).
pub fn get(&self, id: Id) -> Result<Option<T>> {
    if let Some(lazy_result) = self.dispatch_lazy(|coll| coll.get(id)) {
        return lazy_result;
    }
    let key = id.to_be_bytes();
    match self.read_or_write_env_for_get(&key)? {
        Some(bytes) => Ok(Some(decode::<T>(&bytes, self.descriptor.collection_id)?)),
        None => Ok(None),
    }
}
/// Reads the raw primary-tree value for `key` through this handle's backing.
///
/// Lazy handles are expected to go through `dispatch_lazy` first. Reaching this
/// point in lazy mode is reported as `Error::ReadOnly`.
fn read_or_write_env_for_get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
    match &self.mode {
        CollectionMode::Read(read) => {
            let root = self.descriptor.primary_root;
            snapshot_get_via_btree(read.snapshot, read.env, root, key)
        }
        CollectionMode::Write(write) => {
            let root = self.write_primary_root(write)?;
            let mut pager = lock_pager(write.env)?;
            let primary = btree_handle(&pager, root)?;
            primary.get(&mut pager, key)
        }
        CollectionMode::Lazy(_) => Err(Error::ReadOnly {
            operation: "internal: lazy-mode primary read",
        }),
    }
}
/// Loads the document stored under `id` and applies `f` to it. The result is
/// written back, and index entries are moved from the old value to the new one.
///
/// # Errors
/// - `Error::ReadOnly` if the handle is not in write mode.
/// - `Error::DocumentNotFound` if `id` has no document.
/// - Decode/encode, B-tree and index-maintenance errors.
pub fn update<F>(&self, id: Id, f: F) -> Result<()>
where
F: FnOnce(&mut T),
{
let write = self.write_or_err("update")?;
let name: &str = self.collection_name.as_ref();
// Same lock order as the other write paths: pager, catalog, descriptor cache.
// NOTE(review): `f` runs while all three locks are held; calling back into this
// handle from `f` would try to re-lock them.
let mut pager = lock_pager(write.env)?;
let catalog = lock_catalog(&write.catalog)?;
let mut cache = lock_descriptors(&write.descriptors)?;
let descriptor = cached_descriptor_mut(&mut cache, &mut pager, &catalog, name)?;
let key = id.to_be_bytes();
let mut tree = btree_handle(&pager, descriptor.primary_root)?;
let existing = tree.get(&mut pager, &key)?.ok_or(Error::DocumentNotFound {
collection: T::COLLECTION,
id: id.get(),
})?;
// The document is decoded twice: one copy is the pre-image for index maintenance,
// the other is mutated by `f` (presumably so `T: Clone` is not required).
let old_value = decode::<T>(&existing, descriptor.collection_id)?;
let mut new_value = decode::<T>(&existing, descriptor.collection_id)?;
f(&mut new_value);
let bytes = encode(&new_value, descriptor.collection_id)?;
// Replace the stored value via delete + insert, then record the current root.
tree.delete(&mut pager, &key)?;
tree.insert(&mut pager, &key, &bytes)?;
descriptor.primary_root = tree.root().get();
crate::index_maint::apply_doc_change::<T>(
&mut pager,
descriptor,
Some(&old_value),
Some(&new_value),
id,
)?;
Ok(())
}
/// Removes the document stored under `id` together with its index entries.
///
/// Returns the primary B-tree's delete result (`true` when a key was removed).
///
/// # Errors
/// `Error::ReadOnly` outside write mode; otherwise storage, decode and
/// index-maintenance errors.
pub fn delete(&self, id: Id) -> Result<bool> {
    let write = self.write_or_err("delete")?;
    let mut pager = lock_pager(write.env)?;
    let catalog = lock_catalog(&write.catalog)?;
    let mut cache = lock_descriptors(&write.descriptors)?;
    let descriptor = cached_descriptor_mut(
        &mut cache,
        &mut pager,
        &catalog,
        self.collection_name.as_ref(),
    )?;
    let collection_id = descriptor.collection_id;
    let key = id.to_be_bytes();
    let mut tree = btree_handle(&pager, descriptor.primary_root)?;
    // Decode the prior value so index maintenance can drop its entries.
    let previous: Option<T> = match tree.get(&mut pager, &key)? {
        None => None,
        Some(stored) => Some(decode::<T>(&stored, collection_id)?),
    };
    let was_removed = tree.delete(&mut pager, &key)?;
    descriptor.primary_root = tree.root().get();
    crate::index_maint::apply_doc_change::<T>(
        &mut pager,
        descriptor,
        previous.as_ref(),
        None,
        id,
    )?;
    Ok(was_removed)
}
/// Stores `doc` under `id`, replacing any existing document. Indexes are updated
/// against the previous value, if there was one.
///
/// # Errors
/// `Error::ReadOnly` outside write mode; otherwise encode/decode, B-tree and
/// index-maintenance errors.
// `doc` is taken by value as part of the public API even though only a borrow is used.
#[allow(clippy::needless_pass_by_value)]
pub fn upsert(&self, id: Id, doc: T) -> Result<()> {
    let write = self.write_or_err("upsert")?;
    let mut pager = lock_pager(write.env)?;
    let catalog = lock_catalog(&write.catalog)?;
    let mut cache = lock_descriptors(&write.descriptors)?;
    let descriptor = cached_descriptor_mut(
        &mut cache,
        &mut pager,
        &catalog,
        self.collection_name.as_ref(),
    )?;
    let collection_id = descriptor.collection_id;
    let encoded = encode(&doc, collection_id)?;
    let key = id.to_be_bytes();
    let mut tree = btree_handle(&pager, descriptor.primary_root)?;
    let previous: Option<T> = match tree.get(&mut pager, &key)? {
        None => None,
        Some(prior) => Some(decode::<T>(&prior, collection_id)?),
    };
    // Delete-then-insert replaces the value whether or not the key already existed.
    let _existed = tree.delete(&mut pager, &key)?;
    tree.insert(&mut pager, &key, &encoded)?;
    descriptor.primary_root = tree.root().get();
    crate::index_maint::apply_doc_change::<T>(
        &mut pager,
        descriptor,
        previous.as_ref(),
        Some(&doc),
        id,
    )?;
    Ok(())
}
pub fn find_unique(
&self,
index_name: &str,
key: impl Into<obj_core::codec::Dynamic>,
) -> Result<Option<T>> {
let key_dyn = key.into();
if let Some(r) = self.dispatch_lazy(|c| c.find_unique(index_name, key_dyn.clone())) {
return r;
}
let descriptor = self.active_index(index_name)?;
if descriptor.kind != obj_core::IndexKind::Unique {
return Err(Error::IndexNotUnique {
collection: self.collection_name.clone().into_owned(),
name: index_name.to_owned(),
});
}
let encoded = index_key_for_lookup(descriptor, &[key_dyn])?;
let id_bytes = self.index_get(descriptor, encoded.as_bytes())?;
match id_bytes {
Some(bytes) => match Id::from_be_bytes(&bytes) {
Some(id) => self.get(id),
None => Err(Error::Corruption { page_id: 0 }),
},
None => Ok(None),
}
}
/// Returns every document whose `index_name` entry equals `key`.
///
/// Unique indexes yield at most one document. Standard, each and composite indexes
/// yield each matching document once, with duplicate ids collapsed. Documents are
/// fetched eagerly, so failures surface from this call rather than from the iterator.
///
/// # Errors
/// - `Error::IndexNotFound` for a missing or inactive index.
/// - `Error::InvalidArgument` for an unsupported index kind.
/// - `Error::Corruption` for dangling ids.
/// - Storage and decode errors.
pub fn lookup(
    &self,
    index_name: &str,
    key: impl Into<obj_core::codec::Dynamic>,
) -> Result<IndexIter<'static, T>>
where
    T: Send + 'static,
{
    let wanted = key.into();
    if let Some(lazy_result) = self.dispatch_lazy(|coll| coll.lookup(index_name, wanted.clone()))
    {
        return lazy_result;
    }
    let index = self.active_index(index_name)?;
    let encoded = index_key_for_lookup(index, &[wanted])?;
    let probe = encoded.as_bytes();
    let ids = match index.kind {
        obj_core::IndexKind::Unique => self.collect_unique(index, probe)?,
        obj_core::IndexKind::Standard
        | obj_core::IndexKind::Each
        | obj_core::IndexKind::Composite => self.collect_nonunique_equal(index, probe)?,
        _ => return Err(Error::InvalidArgument("unsupported index kind")),
    };
    let docs = self.resolve_unique_ids(ids)?;
    Ok(Box::new(docs.into_iter().map(Ok)))
}
/// Returns `(user_key, document)` pairs for entries of `index_name` whose key falls
/// inside `range`.
///
/// # Errors
/// Bound-encoding errors plus those of `index_range_encoded`.
pub fn index_range<R>(
    &self,
    index_name: &str,
    range: R,
) -> Result<IndexIter<'static, (Vec<u8>, T)>>
where
    R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
    T: Send + 'static,
{
    let lower = encode_dynamic_bound(range.start_bound())?;
    let upper = encode_dynamic_bound(range.end_bound())?;
    self.index_range_encoded(index_name, lower, upper)
}
pub(crate) fn index_range_encoded(
&self,
index_name: &str,
start_bound: std::ops::Bound<Vec<u8>>,
end_bound: std::ops::Bound<Vec<u8>>,
) -> Result<IndexIter<'static, (Vec<u8>, T)>>
where
T: Send + 'static,
{
if let Some(r) = self.dispatch_lazy(|c| {
c.index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
}) {
return r;
}
let descriptor = self.active_index(index_name)?;
let (start, end) =
crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
let entries = self.collect_range(descriptor, start, end)?;
let descriptor_kind = descriptor.kind;
let mut out: Vec<Result<(Vec<u8>, T)>> = Vec::with_capacity(entries.len());
let mut emitted_ids: std::collections::HashSet<u64> = std::collections::HashSet::new();
for (full_key, id_bytes_value) in entries {
let Some(id) = Id::from_be_bytes(&id_bytes_value) else {
out.push(Err(Error::Corruption { page_id: 0 }));
continue;
};
if descriptor_kind == obj_core::IndexKind::Each && !emitted_ids.insert(id.get()) {
continue;
}
let user_key = strip_id_suffix(&full_key, descriptor_kind);
match self.get(id) {
Ok(Some(doc)) => out.push(Ok((user_key, doc))),
Ok(None) => {
out.push(Err(Error::Corruption { page_id: 0 }));
}
Err(e) => out.push(Err(e)),
}
}
Ok(Box::new(out.into_iter()))
}
/// Streaming variant of [`Self::index_range`].
///
/// Index entries are read in batches of `ITER_INDEX_RANGE_BATCH`, and documents are
/// fetched as items are yielded. Lazy handles materialize the range eagerly instead.
///
/// # Errors
/// Bound-encoding errors plus those of the underlying range setup.
pub fn iter_range<'a, R>(&'a self, index_name: &str, range: R) -> Result<IterIndexRange<'a, T>>
where
    R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
    T: Send + 'static,
{
    let lower = encode_dynamic_bound(range.start_bound())?;
    let upper = encode_dynamic_bound(range.end_bound())?;
    self.iter_range_encoded(index_name, lower, upper)
}
/// Builds the batched iterator for byte-encoded bounds.
///
/// Write handles start from the index root currently recorded in the transaction
/// cache. Read handles use the root from their snapshot descriptor.
fn iter_range_encoded<'a>(
    &'a self,
    index_name: &str,
    start_bound: Bound<Vec<u8>>,
    end_bound: Bound<Vec<u8>>,
) -> Result<IterIndexRange<'a, T>>
where
    T: Send + 'static,
{
    let write = match &self.mode {
        CollectionMode::Lazy(_) => {
            return self.iter_range_lazy_fallback(index_name, start_bound, end_bound);
        }
        CollectionMode::Write(w) => Some(w),
        CollectionMode::Read(_) => None,
    };
    let index = self.active_index(index_name)?;
    let index_root = match write {
        Some(w) => self.write_index_root(w, index_name, index.root_page_id)?,
        None => index.root_page_id,
    };
    let kind = index.kind;
    let (start, end) = crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, kind);
    let first_resume = match start {
        Bound::Included(k) => InitialResume::Included(k),
        Bound::Excluded(k) => InitialResume::Excluded(k),
        Bound::Unbounded => InitialResume::Unbounded,
    };
    Ok(IterIndexRange {
        coll: self,
        descriptor_kind: kind,
        index_root,
        initial_resume: Some(first_resume),
        last_full_key: None,
        end_bound: end,
        buffer: VecDeque::new(),
        emitted_ids: HashSet::new(),
        finished: false,
    })
}
/// Lazy-mode `iter_range`.
///
/// Lazy handles run each read in its own transaction (via `dispatch_lazy`), so the
/// whole range is materialized here. The returned iterator starts out `finished`
/// and only drains its buffer.
fn iter_range_lazy_fallback<'a>(
    &'a self,
    index_name: &str,
    start_bound: Bound<Vec<u8>>,
    end_bound: Bound<Vec<u8>>,
) -> Result<IterIndexRange<'a, T>>
where
    T: Send + 'static,
{
    let buffer: VecDeque<Result<StagedEntry<T>>> = self
        .index_range_encoded(index_name, start_bound, end_bound)?
        .map(|item| item.map(|(key, doc)| StagedEntry::Resolved(key, doc)))
        .collect();
    Ok(IterIndexRange {
        coll: self,
        descriptor_kind: obj_core::IndexKind::Standard,
        index_root: 0,
        initial_resume: None,
        last_full_key: None,
        end_bound: Bound::Unbounded,
        buffer,
        emitted_ids: HashSet::new(),
        finished: true,
    })
}
/// Kind of the active index `index_name`.
///
/// # Errors
/// `Error::IndexNotFound` if the index is missing or not active.
pub(crate) fn index_kind(&self, index_name: &str) -> Result<obj_core::IndexKind> {
    self.active_index(index_name).map(|index| index.kind)
}
fn active_index(&self, index_name: &str) -> Result<&obj_core::IndexDescriptor> {
let entry = self
.descriptor
.indexes
.iter()
.find(|d| d.name == index_name);
match entry {
Some(d) if d.status == obj_core::IndexStatus::Active => Ok(d),
_ => Err(Error::IndexNotFound {
collection: self.collection_name.clone().into_owned(),
name: index_name.to_owned(),
}),
}
}
/// Point lookup of `key` in the index described by `descriptor`.
///
/// Write handles use the live tree at the cache's current root. Read handles go
/// through their snapshot.
fn index_get(
    &self,
    descriptor: &obj_core::IndexDescriptor,
    key: &[u8],
) -> Result<Option<Vec<u8>>> {
    match &self.mode {
        CollectionMode::Read(read) => {
            let root = PageId::new(descriptor.root_page_id)
                .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
            let pager = lock_pager(read.env)?;
            BTree::<FileHandle>::get_via_snapshot(&pager, read.snapshot, root, key)
        }
        CollectionMode::Write(write) => {
            let current =
                self.write_index_root(write, &descriptor.name, descriptor.root_page_id)?;
            let root = PageId::new(current)
                .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
            let mut pager = lock_pager(write.env)?;
            let index_tree = BTree::<FileHandle>::open(&pager, root)?;
            index_tree.get(&mut pager, key)
        }
        CollectionMode::Lazy(_) => Err(Error::ReadOnly {
            operation: "internal: lazy-mode index_get",
        }),
    }
}
fn collect_nonunique_equal(
&self,
descriptor: &obj_core::IndexDescriptor,
prefix: &[u8],
) -> Result<Vec<u64>> {
let entries = self.collect_range(
descriptor,
std::ops::Bound::Included(prefix.to_vec()),
std::ops::Bound::Included(append_max_id(prefix)),
)?;
let mut ids = Vec::with_capacity(entries.len());
let mut emitted: std::collections::HashSet<u64> = std::collections::HashSet::new();
for (_full_key, value) in entries {
let id = Id::from_be_bytes(&value).ok_or(Error::Corruption { page_id: 0 })?;
if emitted.insert(id.get()) {
ids.push(id.get());
}
}
Ok(ids)
}
fn collect_unique(
&self,
descriptor: &obj_core::IndexDescriptor,
key: &[u8],
) -> Result<Vec<u64>> {
match self.index_get(descriptor, key)? {
Some(bytes) => Id::from_be_bytes(&bytes)
.map(|id| vec![id.get()])
.ok_or(Error::Corruption { page_id: 0 }),
None => Ok(Vec::new()),
}
}
/// Collects every `(full_key, value)` entry of the index within `(start, end)`.
///
/// Read handles scan their snapshot. Write handles scan the live tree at the
/// cache's current root. Stops at the first B-tree error.
fn collect_range(
    &self,
    descriptor: &obj_core::IndexDescriptor,
    start: std::ops::Bound<Vec<u8>>,
    end: std::ops::Bound<Vec<u8>>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
    match &self.mode {
        CollectionMode::Write(write) => {
            let current =
                self.write_index_root(write, &descriptor.name, descriptor.root_page_id)?;
            let root = PageId::new(current)
                .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
            let mut pager = lock_pager(write.env)?;
            let index_tree = BTree::<FileHandle>::open(&pager, root)?;
            let rows = index_tree.range(&mut pager, (start, end))?;
            let collected: Vec<(Vec<u8>, Vec<u8>)> =
                rows.collect::<Result<Vec<(Vec<u8>, Vec<u8>)>>>()?;
            Ok(collected)
        }
        CollectionMode::Read(read) => {
            let root = PageId::new(descriptor.root_page_id)
                .ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
            let pager = lock_pager(read.env)?;
            let rows =
                BTree::<FileHandle>::range_via_snapshot(&pager, read.snapshot, root, (start, end))?;
            let collected: Vec<(Vec<u8>, Vec<u8>)> =
                rows.collect::<Result<Vec<(Vec<u8>, Vec<u8>)>>>()?;
            Ok(collected)
        }
        CollectionMode::Lazy(_) => Err(Error::ReadOnly {
            operation: "internal: lazy-mode collect_range",
        }),
    }
}
fn resolve_unique_ids(&self, ids: Vec<u64>) -> Result<Vec<T>> {
let mut out = Vec::with_capacity(ids.len());
for raw in ids {
let id =
Id::from_be_bytes(&raw.to_be_bytes()).ok_or(Error::Corruption { page_id: 0 })?;
let doc = self.get(id)?.ok_or(Error::Corruption { page_id: 0 })?;
out.push(doc);
}
Ok(out)
}
/// Counts the documents in the primary tree by walking every entry.
///
/// # Errors
/// `Error::InvalidArgument` for a zero primary root; pager and B-tree errors.
pub fn count_all(&self) -> Result<u64> {
    // Kept as an explicit closure over the borrowed handle; clippy would suggest a method path.
    #[allow(clippy::redundant_closure_for_method_calls)]
    if let Some(lazy_result) = self.dispatch_lazy(|coll| coll.count_all()) {
        return lazy_result;
    }
    match &self.mode {
        CollectionMode::Write(write) => {
            let root = self.write_primary_root(write)?;
            let mut pager = lock_pager(write.env)?;
            let primary = btree_handle(&pager, root)?;
            let rows = primary.range(&mut pager, ..)?;
            count_range_iter(rows)
        }
        CollectionMode::Read(read) => {
            let pager = lock_pager(read.env)?;
            let root = PageId::new(self.descriptor.primary_root)
                .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
            let rows = BTree::<FileHandle>::range_via_snapshot(&pager, read.snapshot, root, ..)?;
            count_range_iter(rows)
        }
        CollectionMode::Lazy(_) => Err(Error::ReadOnly {
            operation: "internal: lazy-mode count_all",
        }),
    }
}
/// Counts the raw entries of `index_name` whose key falls inside `range`.
///
/// # Errors
/// Bound-encoding errors plus those of `count_index_range_encoded`.
pub fn count_index_range<R>(&self, index_name: &str, range: R) -> Result<u64>
where
    R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
{
    let lower = encode_dynamic_bound(range.start_bound())?;
    let upper = encode_dynamic_bound(range.end_bound())?;
    self.count_index_range_encoded(index_name, lower, upper)
}
pub(crate) fn count_index_range_encoded(
&self,
index_name: &str,
start_bound: std::ops::Bound<Vec<u8>>,
end_bound: std::ops::Bound<Vec<u8>>,
) -> Result<u64> {
if let Some(r) = self.dispatch_lazy(|c| {
c.count_index_range_encoded(index_name, start_bound.clone(), end_bound.clone())
}) {
return r;
}
let descriptor = self.active_index(index_name)?;
let (start, end) =
crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
let entries = self.collect_range(descriptor, start, end)?;
u64::try_from(entries.len()).map_err(|_| Error::BTreeInvariantViolated {
reason: "index range entry count exceeds u64",
})
}
/// Counts the distinct document ids among entries of `index_name` inside `range`.
///
/// # Errors
/// Bound-encoding errors plus those of `count_distinct_ids_in_range_encoded`.
pub fn count_distinct_ids_in_range<R>(&self, index_name: &str, range: R) -> Result<u64>
where
    R: std::ops::RangeBounds<obj_core::codec::Dynamic>,
{
    let lower = encode_dynamic_bound(range.start_bound())?;
    let upper = encode_dynamic_bound(range.end_bound())?;
    self.count_distinct_ids_in_range_encoded(index_name, lower, upper)
}
pub(crate) fn count_distinct_ids_in_range_encoded(
&self,
index_name: &str,
start_bound: std::ops::Bound<Vec<u8>>,
end_bound: std::ops::Bound<Vec<u8>>,
) -> Result<u64> {
if let Some(r) = self.dispatch_lazy(|c| {
c.count_distinct_ids_in_range_encoded(
index_name,
start_bound.clone(),
end_bound.clone(),
)
}) {
return r;
}
let descriptor = self.active_index(index_name)?;
let (start, end) =
crate::index_bound::widen_bounds_for_kind(start_bound, end_bound, descriptor.kind);
let entries = self.collect_range(descriptor, start, end)?;
let mut distinct: HashSet<u64> = HashSet::new();
for (full_key, value) in entries {
let id = id_from_index_entry(&full_key, &value, descriptor.kind)?;
if distinct.insert(id) && distinct.len() > MAX_DISTINCT_IDS {
return Err(Error::DistinctCountExceeded {
limit: MAX_DISTINCT_IDS,
});
}
}
u64::try_from(distinct.len()).map_err(|_| Error::BTreeInvariantViolated {
reason: "distinct id count exceeds u64",
})
}
/// Returns every `(Id, document)` pair in the collection, in primary-key order.
///
/// # Errors
/// Pager, B-tree and decode errors.
pub fn all(&self) -> Result<Vec<(Id, T)>> {
    // Kept as an explicit closure over the borrowed handle; clippy would suggest a method path.
    #[allow(clippy::redundant_closure_for_method_calls)]
    if let Some(lazy_result) = self.dispatch_lazy(|coll| coll.all()) {
        return lazy_result;
    }
    let collection_id = self.descriptor.collection_id;
    match &self.mode {
        CollectionMode::Read(read) => snapshot_scan_via_btree::<T>(
            read.snapshot,
            read.env,
            self.descriptor.primary_root,
            collection_id,
        ),
        CollectionMode::Write(write) => {
            let root = self.write_primary_root(write)?;
            let mut pager = lock_pager(write.env)?;
            scan_all::<T>(&mut pager, root, collection_id)
        }
        CollectionMode::Lazy(_) => Err(Error::ReadOnly {
            operation: "internal: lazy-mode all",
        }),
    }
}
/// Returns the write backing, or `Error::ReadOnly { operation: op }` for read and
/// lazy handles.
fn write_or_err(&self, op: &'static str) -> Result<&WriteRef<'tx>> {
    match &self.mode {
        CollectionMode::Read(_) | CollectionMode::Lazy(_) => {
            Err(Error::ReadOnly { operation: op })
        }
        CollectionMode::Write(write) => Ok(write),
    }
}
/// Routes a lazy handle's call into a fresh read transaction.
///
/// In lazy mode this opens a read transaction on the database, reopens this
/// collection by name inside it, runs `body`, and returns `Some(result)`. In any
/// other mode it returns `None` so the caller proceeds with its own backing.
fn dispatch_lazy<R, F>(&self, body: F) -> Option<Result<R>>
where
    F: FnOnce(&Collection<'_, T>) -> Result<R>,
{
    let CollectionMode::Lazy(LazyRef { db }) = &self.mode else {
        return None;
    };
    let name: Cow<'static, str> = Cow::Owned(self.collection_name.clone().into_owned());
    let outcome = db.read_transaction(move |tx| {
        let reopened = Collection::<T>::open_readonly_named(tx, name)?;
        body(&reopened)
    });
    Some(outcome)
}
}
fn lock_pager(
env: &obj_core::TxnEnv<FileHandle>,
) -> Result<std::sync::MutexGuard<'_, Pager<FileHandle>>> {
env.pager().lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})
}
fn ensure_collection<T: Document>(
inner: &obj_core::WriteTxn<'_, FileHandle>,
catalog: &Arc<Mutex<Catalog<FileHandle>>>,
) -> Result<CollectionDescriptor> {
let mut pager = inner.lock_pager()?;
let mut catalog_guard = lock_catalog(catalog)?;
if let Some(d) = catalog_guard.get(&mut pager, T::COLLECTION)? {
return Ok(d);
}
let tree = BTree::<FileHandle>::empty(&mut pager)?;
let descriptor = CollectionDescriptor::new(0, tree.root().get(), T::VERSION);
let _id = catalog_guard.insert(&mut pager, T::COLLECTION, descriptor)?;
catalog_guard
.get(&mut pager, T::COLLECTION)?
.ok_or(Error::Corruption { page_id: 0 })
}
fn reread_descriptor<T: Document>(
inner: &obj_core::WriteTxn<'_, FileHandle>,
catalog: &Arc<Mutex<Catalog<FileHandle>>>,
) -> Result<CollectionDescriptor> {
let mut pager = inner.lock_pager()?;
let catalog_guard = lock_catalog(catalog)?;
catalog_guard
.get(&mut pager, T::COLLECTION)?
.ok_or(Error::Corruption { page_id: 0 })
}
/// Reconciles `T`'s declared index specs for `(T::COLLECTION, T::VERSION)`.
///
/// See `reconcile_specs_once` for the skip rules.
fn reconcile_indexes_once<T: Document>(
    inner: &obj_core::WriteTxn<'_, FileHandle>,
    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
    reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
    staged: &mut HashSet<(String, u32)>,
) -> Result<()> {
    let declared = T::indexes();
    reconcile_specs_once(
        inner,
        catalog,
        reconciled,
        staged,
        T::COLLECTION,
        T::VERSION,
        &declared,
    )
}
/// Validates `specs` and reconciles them into `collection`'s catalog entry.
///
/// The work is skipped when `(collection, version)` is already in `staged` (done
/// earlier in this transaction) or in the shared `reconciled` set. On success the
/// key is added to `staged` only; `reconciled` is read, never written, here.
///
/// # Errors
/// Spec validation, lock, pager and catalog reconciliation errors.
pub(crate) fn reconcile_specs_once(
    inner: &obj_core::WriteTxn<'_, FileHandle>,
    catalog: &Arc<Mutex<Catalog<FileHandle>>>,
    reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
    staged: &mut HashSet<(String, u32)>,
    collection: &str,
    version: u32,
    specs: &[obj_core::IndexSpec],
) -> Result<()> {
    let key = (collection.to_owned(), version);
    // Short-circuit keeps the shared lock untouched when this transaction staged it.
    let already_done = staged.contains(&key) || lock_reconciled(reconciled)?.contains(&key);
    if already_done {
        return Ok(());
    }
    for spec in specs {
        spec.validate()?;
    }
    let mut pager = inner.lock_pager()?;
    let mut catalog_guard = lock_catalog(catalog)?;
    let _post = catalog_guard.reconcile_indexes(&mut pager, collection, specs)?;
    drop(catalog_guard);
    drop(pager);
    staged.insert(key);
    Ok(())
}
fn lock_reconciled(
reconciled: &Arc<Mutex<HashSet<(String, u32)>>>,
) -> Result<std::sync::MutexGuard<'_, HashSet<(String, u32)>>> {
reconciled.lock().map_err(|_| Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
})
}
/// Looks up the descriptor for `name` in the catalog as of `snapshot`.
fn read_descriptor_via_snapshot_named(
    env: &obj_core::TxnEnv<FileHandle>,
    snapshot: &obj_core::ReaderSnapshot<FileHandle>,
    name: &str,
) -> Result<Option<CollectionDescriptor>> {
    let guard = lock_pager(env)?;
    Catalog::<FileHandle>::lookup_via_snapshot(&guard, snapshot, name)
}
/// Point read of document `id` from `T`'s collection within `tx`.
///
/// The descriptor lookup and the primary-tree read share one pager lock. Namespaced
/// collection names fall back to opening a regular read-only handle.
///
/// # Errors
/// `Error::CollectionNotFound` if the collection is absent; storage and decode errors.
pub(crate) fn fused_point_get<T: Document>(tx: &ReadTxn<'_>, id: Id) -> Result<Option<T>> {
    let (namespace, _tail) = crate::db::split_namespace(T::COLLECTION);
    if namespace.is_some() {
        return Collection::<T>::open_readonly(tx)?.get(id);
    }
    let key = id.to_be_bytes();
    let hit = snapshot_resolve_and_get(tx.inner.snapshot(), tx.inner.env(), T::COLLECTION, &key)?;
    let Some((descriptor, bytes)) = hit else {
        return Ok(None);
    };
    Ok(Some(decode::<T>(&bytes, descriptor.collection_id)?))
}
fn catalog_get_required(
pager: &mut Pager<FileHandle>,
catalog: &Catalog<FileHandle>,
name: &str,
) -> Result<CollectionDescriptor> {
catalog
.get(pager, name)?
.ok_or(Error::Corruption { page_id: 0 })
}
fn btree_handle(pager: &Pager<FileHandle>, root: u64) -> Result<BTree<FileHandle>> {
let root_pid =
PageId::new(root).ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
BTree::<FileHandle>::open(pager, root_pid)
}
fn count_range_iter<I>(iter: I) -> Result<u64>
where
I: Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>,
{
let mut n: u64 = 0;
for step in iter {
let _ = step?;
n = n.checked_add(1).ok_or(Error::BTreeInvariantViolated {
reason: "primary tree entry count exceeds u64",
})?;
}
Ok(n)
}
fn snapshot_get_via_btree(
snap: &obj_core::ReaderSnapshot<FileHandle>,
env: &obj_core::TxnEnv<FileHandle>,
primary_root: u64,
key: &[u8],
) -> Result<Option<Vec<u8>>> {
let pager = lock_pager(env)?;
let root_pid = PageId::new(primary_root)
.ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&pager, snap, root_pid, key)
}
/// Resolves `name`'s descriptor in `snap` and reads `key` from its primary tree,
/// holding the pager lock once for both steps.
///
/// Returns `None` when the key is absent.
///
/// # Errors
/// `Error::CollectionNotFound` if `name` is not in the snapshot's catalog;
/// `Error::InvalidArgument` for a zero root; pager and B-tree errors.
fn snapshot_resolve_and_get(
    snap: &obj_core::ReaderSnapshot<FileHandle>,
    env: &obj_core::TxnEnv<FileHandle>,
    name: &str,
    key: &[u8],
) -> Result<Option<(CollectionDescriptor, Vec<u8>)>> {
    let guard = lock_pager(env)?;
    let descriptor = match Catalog::<FileHandle>::lookup_via_snapshot(&guard, snap, name)? {
        Some(found) => found,
        None => {
            return Err(Error::CollectionNotFound {
                name: name.to_owned(),
            })
        }
    };
    let root_pid = PageId::new(descriptor.primary_root)
        .ok_or(Error::InvalidArgument("collection primary_root is zero"))?;
    debug_assert_eq!(
        root_pid.get(),
        descriptor.primary_root,
        "fused get must descend the snapshot-resolved primary_root",
    );
    let found = obj_core::btree::BTree::<FileHandle>::get_via_snapshot(&guard, snap, root_pid, key)?;
    Ok(found.map(|bytes| (descriptor, bytes)))
}
fn scan_all<T: Document>(
pager: &mut Pager<FileHandle>,
primary_root: u64,
collection_id: u32,
) -> Result<Vec<(Id, T)>> {
let tree = btree_handle(pager, primary_root)?;
let iter = tree.range(pager, ..)?;
let mut out = Vec::new();
for entry in iter {
let (key, value) = entry?;
let id = Id::from_be_bytes(&key)
.ok_or(Error::InvalidArgument("primary B-tree key is not an Id"))?;
let doc = decode::<T>(&value, collection_id)?;
out.push((id, doc));
}
Ok(out)
}
fn snapshot_scan_via_btree<T: Document>(
_snap: &obj_core::ReaderSnapshot<FileHandle>,
env: &obj_core::TxnEnv<FileHandle>,
primary_root: u64,
collection_id: u32,
) -> Result<Vec<(Id, T)>> {
let mut pager = lock_pager(env)?;
scan_all::<T>(&mut pager, primary_root, collection_id)
}
/// Encodes `fields` as a lookup key for the index described by `descriptor`.
fn index_key_for_lookup(
    descriptor: &obj_core::IndexDescriptor,
    fields: &[obj_core::codec::Dynamic],
) -> Result<obj_core::index::EncodedIndexKey> {
    let paths = &descriptor.key_paths;
    obj_core::index::encode_index_key_parts(descriptor.kind, paths, fields)
}
fn encode_dynamic_bound(
b: std::ops::Bound<&obj_core::codec::Dynamic>,
) -> Result<std::ops::Bound<Vec<u8>>> {
match b {
std::ops::Bound::Included(v) => Ok(std::ops::Bound::Included(encode_bound_value(v)?)),
std::ops::Bound::Excluded(v) => Ok(std::ops::Bound::Excluded(encode_bound_value(v)?)),
std::ops::Bound::Unbounded => Ok(std::ops::Bound::Unbounded),
}
}
/// Encodes one bound value.
///
/// A `Dynamic::Seq` becomes `COMPOSITE_TAG` followed by each element's field
/// encoding. Any other value is a single field encoding.
fn encode_bound_value(v: &obj_core::codec::Dynamic) -> Result<Vec<u8>> {
    let obj_core::codec::Dynamic::Seq(parts) = v else {
        return Ok(obj_core::index::encode_field(v)?.into_bytes());
    };
    let mut bytes = vec![obj_core::index::COMPOSITE_TAG];
    for part in parts {
        let piece = obj_core::index::encode_field(part)?;
        bytes.extend_from_slice(piece.as_bytes());
    }
    Ok(bytes)
}
/// Returns `prefix` followed by the big-endian bytes of `u64::MAX`.
///
/// This is the greatest `prefix ++ id` key, used as an inclusive upper bound.
fn append_max_id(prefix: &[u8]) -> Vec<u8> {
    let max_id = u64::MAX.to_be_bytes();
    [prefix, &max_id[..]].concat()
}
/// Returns the user-visible part of an index key.
///
/// Unique-index keys are returned as-is. Other kinds drop their trailing 8-byte
/// id, but only when the key is at least 8 bytes long.
fn strip_id_suffix(full_key: &[u8], kind: obj_core::IndexKind) -> Vec<u8> {
    const ID_SUFFIX_LEN: usize = 8;
    match full_key.len().checked_sub(ID_SUFFIX_LEN) {
        Some(user_len) if kind != obj_core::IndexKind::Unique => full_key[..user_len].to_vec(),
        _ => full_key.to_vec(),
    }
}
fn id_from_index_entry(full_key: &[u8], value: &[u8], kind: obj_core::IndexKind) -> Result<u64> {
let bytes: &[u8] = if kind == obj_core::IndexKind::Unique {
value
} else {
if full_key.len() < 8 {
return Err(Error::Corruption { page_id: 0 });
}
&full_key[full_key.len() - 8..]
};
let id = Id::from_be_bytes(bytes).ok_or(Error::Corruption { page_id: 0 })?;
Ok(id.get())
}
/// Start bound for the first batch read by `IterIndexRange`.
///
/// It mirrors `Bound<Vec<u8>>` and is consumed by `next_start_bound` on the first
/// refill. Later batches resume strictly after `last_full_key`.
enum InitialResume {
Included(Vec<u8>),
Excluded(Vec<u8>),
Unbounded,
}
/// An index entry buffered by `IterIndexRange`.
enum StagedEntry<T> {
/// `(user_key, id)`; the document is fetched when the entry is yielded (`resolve_one`).
Pending(Vec<u8>, Id),
/// `(user_key, document)` already materialized (lazy-mode fallback).
Resolved(Vec<u8>, T),
}
/// Batched iterator over an index range, returned by `Collection::iter_range`.
///
/// Index entries are read `ITER_INDEX_RANGE_BATCH` at a time, and documents are
/// fetched as each item is yielded.
pub struct IterIndexRange<'a, T: Document> {
/// Collection the entries belong to; used for refills and document fetches.
coll: &'a Collection<'a, T>,
/// Kind of the scanned index (controls id-suffix stripping and `Each` dedup).
descriptor_kind: obj_core::IndexKind,
/// Root page of the index tree captured when the iterator was created.
index_root: u64,
/// Start bound for the first refill; `None` once consumed.
initial_resume: Option<InitialResume>,
/// Last full index key consumed; later refills resume strictly after it.
last_full_key: Option<Vec<u8>>,
/// Upper bound of the scan, already widened for the index kind.
end_bound: Bound<Vec<u8>>,
/// Entries staged by refills and not yet yielded.
buffer: VecDeque<Result<StagedEntry<T>>>,
/// Ids already emitted; only populated for `Each` indexes.
emitted_ids: HashSet<u64>,
/// Set once the B-tree range is exhausted or a refill failed.
finished: bool,
}
impl<T: Document> std::fmt::Debug for IterIndexRange<'_, T> {
    /// Summarizes the cursor state. Buffered documents are not printed, so `T` need
    /// not implement `Debug`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut summary = formatter.debug_struct("IterIndexRange");
        summary.field("descriptor_kind", &self.descriptor_kind);
        summary.field("index_root", &self.index_root);
        summary.field("buffer_len", &self.buffer.len());
        summary.field("emitted_ids_len", &self.emitted_ids.len());
        summary.field("finished", &self.finished);
        summary.finish_non_exhaustive()
    }
}
impl<T: Document> IterIndexRange<'_, T> {
fn refill(&mut self) -> Result<()> {
let env = match &self.coll.mode {
CollectionMode::Write(w) => w.env,
CollectionMode::Read(r) => r.env,
CollectionMode::Lazy(_) => {
return Err(Error::ReadOnly {
operation: "internal: iter_range refill in Lazy mode",
});
}
};
let root_pid = PageId::new(self.index_root)
.ok_or(Error::InvalidArgument("index root_page_id is zero"))?;
let start = self.next_start_bound();
let end = clone_bound_ref(&self.end_bound);
let mut pager = lock_pager(env)?;
let tree = BTree::<FileHandle>::open(&pager, root_pid)?;
let iter = tree.range(&mut pager, (start, end))?;
let mut staged: VecDeque<Result<StagedEntry<T>>> =
VecDeque::with_capacity(ITER_INDEX_RANGE_BATCH);
let mut last_full: Option<Vec<u8>> = None;
let mut consumed: usize = 0;
for step in iter {
if consumed >= ITER_INDEX_RANGE_BATCH {
break;
}
consumed = consumed
.checked_add(1)
.ok_or(Error::BTreeInvariantViolated {
reason: "iter_range batch counter overflow",
})?;
self.stage_one(&mut staged, &mut last_full, step);
}
if consumed < ITER_INDEX_RANGE_BATCH {
self.finished = true;
}
drop(pager);
self.buffer.extend(staged);
if let Some(k) = last_full {
self.last_full_key = Some(k);
}
Ok(())
}
fn stage_one(
&mut self,
staged: &mut VecDeque<Result<StagedEntry<T>>>,
last_full: &mut Option<Vec<u8>>,
step: Result<(Vec<u8>, Vec<u8>)>,
) {
let (full_key, id_bytes) = match step {
Ok(kv) => kv,
Err(e) => {
staged.push_back(Err(e));
return;
}
};
*last_full = Some(full_key.clone());
let Some(id) = Id::from_be_bytes(&id_bytes) else {
staged.push_back(Err(Error::Corruption { page_id: 0 }));
return;
};
if self.descriptor_kind == obj_core::IndexKind::Each && !self.emitted_ids.insert(id.get()) {
return;
}
let user_key = strip_id_suffix(&full_key, self.descriptor_kind);
staged.push_back(Ok(StagedEntry::Pending(user_key, id)));
}
fn next_start_bound(&mut self) -> Bound<Vec<u8>> {
if let Some(initial) = self.initial_resume.take() {
return match initial {
InitialResume::Included(k) => Bound::Included(k),
InitialResume::Excluded(k) => Bound::Excluded(k),
InitialResume::Unbounded => Bound::Unbounded,
};
}
match &self.last_full_key {
Some(k) => Bound::Excluded(k.clone()),
None => Bound::Unbounded,
}
}
}
impl<T: Document> Iterator for IterIndexRange<'_, T> {
    type Item = Result<(Vec<u8>, T)>;

    /// Yields the next buffered entry, refilling from the index when the buffer is
    /// empty. A refill error is yielded once and then iteration ends.
    fn next(&mut self) -> Option<Self::Item> {
        while self.buffer.is_empty() {
            if self.finished {
                return None;
            }
            if let Err(err) = self.refill() {
                self.finished = true;
                return Some(Err(err));
            }
        }
        let staged = self.buffer.pop_front()?;
        Some(self.resolve_one(staged))
    }
}
impl<T: Document> IterIndexRange<'_, T> {
fn resolve_one(&self, staged: Result<StagedEntry<T>>) -> Result<(Vec<u8>, T)> {
match staged? {
StagedEntry::Pending(user_key, id) => match self.coll.get(id)? {
Some(doc) => Ok((user_key, doc)),
None => Err(Error::Corruption { page_id: 0 }),
},
StagedEntry::Resolved(user_key, doc) => Ok((user_key, doc)),
}
}
}
/// Returns an owned copy of a byte-key bound.
fn clone_bound_ref(b: &Bound<Vec<u8>>) -> Bound<Vec<u8>> {
    b.clone()
}