mod load;
use crate::{
db::{
Db,
data::{DataKey, DataRow, DataStore, RawRow},
direction::Direction,
executor::{ExecutorError, OrderedKeyStream, saturating_row_len},
predicate::MissingRowPolicy,
registry::StoreHandle,
},
error::InternalError,
traits::{CanisterKind, EntityKind, EntityValue, Path},
};
use std::ops::Bound;
/// Entity-typed handle onto the database that owns entity `E`'s canister.
///
/// The context only borrows the database; copying it copies the reference.
#[derive(Clone, Copy)]
pub(in crate::db) struct Context<'a, E: EntityKind + EntityValue> {
/// Borrowed database for `E::Canister`, visible to executor code only.
pub(in crate::db::executor) db: &'a Db<E::Canister>,
}
/// Resolves a store path to its [`StoreHandle`].
///
/// Implemented for `Db<C>` in this file so callers can look stores up through
/// a `dyn StoreLookup` without naming the canister type.
pub(in crate::db) trait StoreLookup {
/// Returns the handle registered under `path`, or an [`InternalError`] when
/// the lookup fails.
fn try_get_store(&self, path: &str) -> Result<StoreHandle, InternalError>;
}
/// Store lookup backed by the database's own store registry.
impl<C: CanisterKind> StoreLookup for Db<C> {
    /// Delegates the lookup of `path` to the registry exposed by
    /// `with_store_registry` and returns its result unchanged.
    fn try_get_store(&self, path: &str) -> Result<StoreHandle, InternalError> {
        self.with_store_registry(|stores| stores.try_get_store(path))
    }
}
/// Copyable wrapper around a borrowed, type-erased [`StoreLookup`].
#[derive(Clone, Copy)]
pub(in crate::db) struct StoreResolver<'a> {
/// Lookup implementation every resolution request is forwarded to.
lookup: &'a dyn StoreLookup,
}
impl<'a> StoreResolver<'a> {
    /// Builds a resolver that forwards every request to `lookup`.
    #[must_use]
    pub(in crate::db) const fn new(lookup: &'a dyn StoreLookup) -> Self {
        Self { lookup }
    }

    /// Resolves `path` through the wrapped lookup.
    ///
    /// # Errors
    ///
    /// Propagates whatever [`InternalError`] the underlying lookup returns.
    pub(in crate::db) fn try_get_store(self, path: &str) -> Result<StoreHandle, InternalError> {
        let Self { lookup } = self;
        lookup.try_get_store(path)
    }
}
impl<'a, E> Context<'a, E>
where
    E: EntityKind + EntityValue,
{
    /// Wraps a borrowed database in an entity-typed context.
    #[must_use]
    pub(in crate::db) const fn new(db: &'a Db<E::Canister>) -> Self {
        Self { db }
    }

    /// Runs `f` against the data store registered under `E::Store::PATH`.
    ///
    /// # Errors
    ///
    /// Returns the registry's error when the store cannot be resolved; `f` is
    /// not invoked in that case.
    pub(in crate::db) fn with_store<R>(
        &self,
        f: impl FnOnce(&DataStore) -> R,
    ) -> Result<R, InternalError> {
        self.db
            .with_store_registry(|registry| -> Result<R, InternalError> {
                let handle = registry.try_get_store(E::Store::PATH)?;
                Ok(handle.with_data(f))
            })
    }

    /// Returns the handle of the store registered under `E::Store::PATH`.
    ///
    /// # Errors
    ///
    /// Returns the registry's error when the store cannot be resolved.
    pub(in crate::db::executor) fn structural_store(&self) -> Result<StoreHandle, InternalError> {
        self.db
            .with_store_registry(|registry| registry.try_get_store(E::Store::PATH))
    }

    /// Reads the raw row stored under `key`.
    ///
    /// # Errors
    ///
    /// Fails when the store cannot be resolved, when `key` cannot be encoded
    /// via `to_raw`, or — reported through `InternalError::store_not_found` —
    /// when no row exists for the key.
    pub(in crate::db) fn read(&self, key: &DataKey) -> Result<RawRow, InternalError> {
        // The store is resolved before the key is encoded, so a missing store
        // is reported ahead of a key-encoding failure.
        self.with_store(|data| -> Result<RawRow, InternalError> {
            let raw_key = key.to_raw()?;
            match data.get(&raw_key) {
                Some(row) => Ok(row),
                None => Err(InternalError::store_not_found(key.to_string())),
            }
        })?
    }
}
pub(in crate::db::executor) fn read_row_with_consistency_from_store(
store: StoreHandle,
key: &DataKey,
consistency: MissingRowPolicy,
) -> Result<Option<RawRow>, InternalError> {
let read_row = |key: &DataKey| -> Result<Option<RawRow>, InternalError> {
let raw = key.to_raw()?;
Ok(store.with_data(|data| data.get(&raw)))
};
match consistency {
MissingRowPolicy::Error => match read_row(key)? {
Some(row) => Ok(Some(row)),
None => Err(ExecutorError::missing_row(key).into()),
},
MissingRowPolicy::Ignore => read_row(key),
}
}
/// Like [`read_row_with_consistency_from_store`], but pairs a found row with a
/// clone of its key.
///
/// # Errors
///
/// Propagates every error of the underlying row read.
pub(in crate::db::executor) fn read_data_row_with_consistency_from_store(
    store: StoreHandle,
    key: &DataKey,
    consistency: MissingRowPolicy,
) -> Result<Option<DataRow>, InternalError> {
    let maybe_row = read_row_with_consistency_from_store(store, key, consistency)?;

    // The key is cloned only when a row was actually found.
    Ok(maybe_row.map(|raw_row| (key.clone(), raw_row)))
}
/// Sums the value lengths of the rows selected by an offset/limit window over a
/// scan of the whole store.
///
/// `Direction::Asc` walks `iter()` as-is and `Direction::Desc` walks it
/// reversed. `offset` rows are skipped first, then at most `limit` rows
/// (all remaining rows when `None`) contribute to the total.
pub(in crate::db::executor) fn sum_row_payload_bytes_full_scan_window_with_store(
    store: StoreHandle,
    direction: Direction,
    offset: usize,
    limit: Option<usize>,
) -> u64 {
    match direction {
        Direction::Asc => store.with_data(|data| {
            let lengths = data.iter().map(|entry| entry.value().len());
            sum_payload_bytes_from_row_lengths(lengths, offset, limit)
        }),
        Direction::Desc => store.with_data(|data| {
            let lengths = data.iter().rev().map(|entry| entry.value().len());
            sum_payload_bytes_from_row_lengths(lengths, offset, limit)
        }),
    }
}
/// Sums the value lengths of the rows selected by an offset/limit window over
/// the key range `start..=end`, with both bounds inclusive.
///
/// `Direction::Desc` iterates the same range in reverse. `offset` rows are
/// skipped first, then at most `limit` rows (all remaining rows when `None`)
/// contribute to the total.
///
/// # Errors
///
/// Fails when either `start` or `end` cannot be encoded via `to_raw`; `start`
/// is encoded first.
pub(in crate::db::executor) fn sum_row_payload_bytes_key_range_window_with_store(
    store: StoreHandle,
    start: &DataKey,
    end: &DataKey,
    direction: Direction,
    offset: usize,
    limit: Option<usize>,
) -> Result<u64, InternalError> {
    let lower = start.to_raw()?;
    let upper = end.to_raw()?;
    let bounds = (Bound::Included(lower), Bound::Included(upper));

    let total = match direction {
        Direction::Asc => store.with_data(|data| {
            let lengths = data.range(bounds).map(|entry| entry.value().len());
            sum_payload_bytes_from_row_lengths(lengths, offset, limit)
        }),
        Direction::Desc => store.with_data(|data| {
            let lengths = data.range(bounds).rev().map(|entry| entry.value().len());
            sum_payload_bytes_from_row_lengths(lengths, offset, limit)
        }),
    };

    Ok(total)
}
pub(in crate::db::executor) fn sum_row_payload_bytes_from_ordered_key_stream_with_store(
store: StoreHandle,
key_stream: &mut dyn OrderedKeyStream,
consistency: MissingRowPolicy,
offset: usize,
limit: Option<usize>,
) -> Result<u64, InternalError> {
sum_row_payload_bytes_from_ordered_key_stream_shared(
key_stream,
&mut |key| read_row_with_consistency_from_store(store, key, consistency),
offset,
limit,
)
}
/// Reports whether a bounded window has no rows left to accept.
///
/// An unbounded window (`None`) is never exhausted.
const fn payload_window_limit_exhausted(limit_remaining: Option<usize>) -> bool {
    match limit_remaining {
        Some(remaining) => remaining == 0,
        None => false,
    }
}
/// Decides whether the next row falls inside the offset/limit window and
/// updates the window counters accordingly.
///
/// While `offset_remaining` is non-zero the row is skipped and the offset is
/// decremented; the limit is left untouched. After that, a bounded limit is
/// decremented for each accepted row and rejects rows once it reaches zero.
/// An unbounded limit (`None`) accepts every row.
const fn payload_window_accept_row(
    offset_remaining: &mut usize,
    limit_remaining: &mut Option<usize>,
) -> bool {
    // Rows in the offset prefix are consumed without spending any limit.
    if *offset_remaining != 0 {
        *offset_remaining -= 1;
        return false;
    }

    match limit_remaining {
        None => true,
        Some(0) => false,
        Some(budget) => {
            // Non-zero is guaranteed by the arm above, so this cannot underflow.
            *budget -= 1;
            true
        }
    }
}
/// Sums the row lengths that fall inside an offset/limit window.
///
/// The first `offset` items of `row_lengths` are skipped; after that at most
/// `limit` items (all remaining items when `limit` is `None`) are added to the
/// total. Each length is widened through `saturating_row_len` and accumulated
/// with saturating addition, so the total saturates at `u64::MAX` rather than
/// overflowing.
///
/// The limit is checked *before* the next item is pulled. Callers feed this
/// function lazy iterators that load a stored value to compute each length, so
/// this avoids evaluating one row past the window (or any row at all when
/// `limit` is `Some(0)`). The returned total does not depend on this ordering.
fn sum_payload_bytes_from_row_lengths(
    mut row_lengths: impl Iterator<Item = usize>,
    offset: usize,
    limit: Option<usize>,
) -> u64 {
    let mut total = 0u64;
    let mut offset_remaining = offset;
    let mut limit_remaining = limit;

    loop {
        // Stop before touching the iterator once the window is full.
        if payload_window_limit_exhausted(limit_remaining) {
            break;
        }
        let Some(row_len) = row_lengths.next() else {
            break;
        };
        if !payload_window_accept_row(&mut offset_remaining, &mut limit_remaining) {
            continue;
        }
        total = total.saturating_add(saturating_row_len(row_len));
    }

    total
}
/// Sums row payload lengths for the keys yielded by `key_stream`, restricted to
/// an offset/limit window over the rows that actually exist.
///
/// For each key, `read_row` is asked for the row. Keys for which it returns
/// `Ok(None)` are ignored and consume neither offset nor limit. Of the
/// remaining rows, the first `offset` are skipped and at most `limit` (all when
/// `None`) contribute `row.len()` to the total. Lengths are widened through
/// `saturating_row_len` and accumulated with saturating addition.
///
/// The limit is checked *before* the stream is advanced. Once the window is
/// full the function returns without pulling another key, so the stream is not
/// advanced past the window and a failure on an out-of-window key cannot fail
/// an aggregate that is already complete. With `limit == Some(0)` the stream is
/// never touched.
///
/// # Errors
///
/// Propagates errors from `key_stream.next_key()` and from `read_row` for keys
/// inside or before the window.
fn sum_row_payload_bytes_from_ordered_key_stream_shared(
    key_stream: &mut dyn OrderedKeyStream,
    read_row: &mut dyn FnMut(&DataKey) -> Result<Option<RawRow>, InternalError>,
    offset: usize,
    limit: Option<usize>,
) -> Result<u64, InternalError> {
    let mut total = 0u64;
    let mut offset_remaining = offset;
    let mut limit_remaining = limit;

    loop {
        // Stop before advancing the stream once the window is full.
        if payload_window_limit_exhausted(limit_remaining) {
            break;
        }
        let Some(key) = key_stream.next_key()? else {
            break;
        };
        // Missing rows (only possible when the reader tolerates them) do not
        // count toward the offset or the limit.
        let Some(row) = read_row(&key)? else {
            continue;
        };
        if !payload_window_accept_row(&mut offset_remaining, &mut limit_remaining) {
            continue;
        }
        total = total.saturating_add(saturating_row_len(row.len()));
    }

    Ok(total)
}