#[cfg(test)]
mod tests;
use crate::{
db::{
Db,
data::{DataKey, DataRow, DataStore, RawRow},
direction::Direction,
executor::{ExecutorError, OrderedKeyStream, saturating_row_len},
predicate::MissingRowPolicy,
registry::StoreHandle,
},
error::InternalError,
traits::{CanisterKind, EntityKind, EntityValue, Path},
};
#[cfg(test)]
use crate::{types::EntityTag, value::StorageKey};
#[cfg(any(test, feature = "diagnostics"))]
use std::cell::RefCell;
use std::ops::Bound;
#[derive(Clone, Copy)]
pub(in crate::db) struct Context<'a, E: EntityKind + EntityValue> {
pub(in crate::db::executor) db: &'a Db<E::Canister>,
}
impl<E> crate::db::executor::pipeline::contracts::LoadExecutor<E>
where
E: EntityKind + EntityValue,
{
#[must_use]
pub(in crate::db) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
Self { db, debug }
}
}
pub(in crate::db) trait StoreLookup {
fn try_get_store(&self, path: &str) -> Result<StoreHandle, InternalError>;
}
impl<C> StoreLookup for Db<C>
where
C: CanisterKind,
{
fn try_get_store(&self, path: &str) -> Result<StoreHandle, InternalError> {
self.with_store_registry(|registry| registry.try_get_store(path))
}
}
#[derive(Clone, Copy)]
pub(in crate::db) struct StoreResolver<'a> {
lookup: &'a dyn StoreLookup,
}
impl<'a> StoreResolver<'a> {
#[must_use]
pub(in crate::db) const fn new(lookup: &'a dyn StoreLookup) -> Self {
Self { lookup }
}
pub(in crate::db) fn try_get_store(self, path: &str) -> Result<StoreHandle, InternalError> {
self.lookup.try_get_store(path)
}
}
#[cfg(any(test, feature = "diagnostics"))]
#[cfg_attr(all(test, not(feature = "diagnostics")), allow(unreachable_pub))]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct RowCheckMetrics {
pub index_entries_scanned: u64,
pub index_membership_single_key_entries: u64,
pub index_membership_multi_key_entries: u64,
pub index_membership_keys_decoded: u64,
pub row_check_covering_candidates_seen: u64,
pub row_check_rows_emitted: u64,
pub row_presence_probe_count: u64,
pub row_presence_probe_hits: u64,
pub row_presence_probe_misses: u64,
pub row_presence_probe_borrowed_data_store_count: u64,
pub row_presence_probe_store_handle_count: u64,
pub row_presence_key_to_raw_encodes: u64,
}
#[cfg(any(test, feature = "diagnostics"))]
std::thread_local! {
static ROW_CHECK_METRICS: RefCell<Option<RowCheckMetrics>> = const {
RefCell::new(None)
};
}
#[cfg(any(test, feature = "diagnostics"))]
fn update_row_check_metrics(update: impl FnOnce(&mut RowCheckMetrics)) {
ROW_CHECK_METRICS.with(|metrics| {
let mut metrics = metrics.borrow_mut();
let Some(metrics) = metrics.as_mut() else {
return;
};
update(metrics);
});
}
enum RowPresenceProbeSource {
BorrowedDataStore,
#[cfg(test)]
StoreHandle,
}
#[cfg(test)]
#[derive(Clone, Copy)]
pub(in crate::db::executor) struct FusedSecondaryCoveringAuthority<'a> {
data: &'a DataStore,
entity_tag: EntityTag,
consistency: MissingRowPolicy,
}
#[cfg(test)]
impl<'a> FusedSecondaryCoveringAuthority<'a> {
#[must_use]
pub(in crate::db::executor) const fn new(
data: &'a DataStore,
entity_tag: EntityTag,
consistency: MissingRowPolicy,
) -> Self {
Self {
data,
entity_tag,
consistency,
}
}
pub(in crate::db::executor) fn admits_storage_key(
self,
storage_key: StorageKey,
) -> Result<bool, InternalError> {
record_row_check_covering_candidate_seen();
record_row_presence_probe_source(RowPresenceProbeSource::BorrowedDataStore);
record_row_presence_key_to_raw_encode();
let raw_key = DataKey::raw_from_parts(self.entity_tag, storage_key)?;
let row_exists = self.data.contains(&raw_key);
record_row_presence_probe_result(row_exists);
match self.consistency {
MissingRowPolicy::Error => {
if row_exists {
Ok(true)
} else {
Err(
ExecutorError::missing_row(&DataKey::new(self.entity_tag, storage_key))
.into(),
)
}
}
MissingRowPolicy::Ignore => Ok(row_exists),
}
}
}
#[cfg(any(test, feature = "diagnostics"))]
pub(in crate::db) fn record_row_check_index_entry_scanned() {
update_row_check_metrics(|metrics| {
metrics.index_entries_scanned = metrics.index_entries_scanned.saturating_add(1);
});
}
#[cfg(not(any(test, feature = "diagnostics")))]
pub(in crate::db) const fn record_row_check_index_entry_scanned() {}
#[cfg(any(test, feature = "diagnostics"))]
pub(in crate::db) fn record_row_check_index_membership_single_key_entry() {
update_row_check_metrics(|metrics| {
metrics.index_membership_single_key_entries = metrics
.index_membership_single_key_entries
.saturating_add(1);
});
}
#[cfg(not(any(test, feature = "diagnostics")))]
pub(in crate::db) const fn record_row_check_index_membership_single_key_entry() {}
#[cfg(any(test, feature = "diagnostics"))]
pub(in crate::db) fn record_row_check_index_membership_multi_key_entry() {
update_row_check_metrics(|metrics| {
metrics.index_membership_multi_key_entries =
metrics.index_membership_multi_key_entries.saturating_add(1);
});
}
#[cfg(not(any(test, feature = "diagnostics")))]
pub(in crate::db) const fn record_row_check_index_membership_multi_key_entry() {}
#[cfg(any(test, feature = "diagnostics"))]
pub(in crate::db) fn record_row_check_index_membership_key_decoded() {
update_row_check_metrics(|metrics| {
metrics.index_membership_keys_decoded =
metrics.index_membership_keys_decoded.saturating_add(1);
});
}
#[cfg(not(any(test, feature = "diagnostics")))]
pub(in crate::db) const fn record_row_check_index_membership_key_decoded() {}
#[cfg(any(test, feature = "diagnostics"))]
pub(in crate::db) fn record_row_check_covering_candidate_seen() {
update_row_check_metrics(|metrics| {
metrics.row_check_covering_candidates_seen =
metrics.row_check_covering_candidates_seen.saturating_add(1);
});
}
#[cfg(not(any(test, feature = "diagnostics")))]
pub(in crate::db) const fn record_row_check_covering_candidate_seen() {}
#[cfg(any(test, feature = "diagnostics"))]
pub(in crate::db) fn record_row_check_row_emitted() {
update_row_check_metrics(|metrics| {
metrics.row_check_rows_emitted = metrics.row_check_rows_emitted.saturating_add(1);
});
}
#[cfg(not(any(test, feature = "diagnostics")))]
pub(in crate::db) const fn record_row_check_row_emitted() {}
#[cfg(any(test, feature = "diagnostics"))]
fn record_row_presence_probe_source(source: RowPresenceProbeSource) {
update_row_check_metrics(|metrics| match source {
RowPresenceProbeSource::BorrowedDataStore => {
metrics.row_presence_probe_borrowed_data_store_count = metrics
.row_presence_probe_borrowed_data_store_count
.saturating_add(1);
}
#[cfg(test)]
RowPresenceProbeSource::StoreHandle => {
metrics.row_presence_probe_store_handle_count = metrics
.row_presence_probe_store_handle_count
.saturating_add(1);
}
});
}
#[cfg(not(any(test, feature = "diagnostics")))]
const fn record_row_presence_probe_source(_source: RowPresenceProbeSource) {}
#[cfg(any(test, feature = "diagnostics"))]
fn record_row_presence_key_to_raw_encode() {
update_row_check_metrics(|metrics| {
metrics.row_presence_key_to_raw_encodes =
metrics.row_presence_key_to_raw_encodes.saturating_add(1);
});
}
#[cfg(not(any(test, feature = "diagnostics")))]
const fn record_row_presence_key_to_raw_encode() {}
#[cfg(any(test, feature = "diagnostics"))]
fn record_row_presence_probe_result(row_exists: bool) {
update_row_check_metrics(|metrics| {
metrics.row_presence_probe_count = metrics.row_presence_probe_count.saturating_add(1);
if row_exists {
metrics.row_presence_probe_hits = metrics.row_presence_probe_hits.saturating_add(1);
} else {
metrics.row_presence_probe_misses = metrics.row_presence_probe_misses.saturating_add(1);
}
});
}
#[cfg(not(any(test, feature = "diagnostics")))]
const fn record_row_presence_probe_result(_row_exists: bool) {}
#[cfg(any(test, feature = "diagnostics"))]
#[cfg_attr(all(test, not(feature = "diagnostics")), allow(unreachable_pub))]
pub fn with_row_check_metrics<T>(f: impl FnOnce() -> T) -> (T, RowCheckMetrics) {
ROW_CHECK_METRICS.with(|metrics| {
debug_assert!(
metrics.borrow().is_none(),
"row_check metrics captures should not nest"
);
*metrics.borrow_mut() = Some(RowCheckMetrics::default());
});
let result = f();
let metrics = ROW_CHECK_METRICS.with(|metrics| metrics.borrow_mut().take().unwrap_or_default());
(result, metrics)
}
impl<'a, E> Context<'a, E>
where
E: EntityKind + EntityValue,
{
#[must_use]
pub(in crate::db) const fn new(db: &'a Db<E::Canister>) -> Self {
Self { db }
}
pub(in crate::db) fn with_store<R>(
&self,
f: impl FnOnce(&DataStore) -> R,
) -> Result<R, InternalError> {
self.db.with_store_registry(|reg| {
reg.try_get_store(E::Store::PATH)
.map(|store| store.with_data(f))
})
}
pub(in crate::db::executor) fn structural_store(&self) -> Result<StoreHandle, InternalError> {
self.db
.with_store_registry(|reg| reg.try_get_store(E::Store::PATH))
}
pub(in crate::db) fn read(&self, key: &DataKey) -> Result<RawRow, InternalError> {
self.with_store(|s| {
let raw = key.to_raw()?;
s.get(&raw)
.ok_or_else(|| InternalError::store_not_found(key.to_string()))
})?
}
}
pub(in crate::db::executor) fn read_row_with_consistency_from_store(
store: StoreHandle,
key: &DataKey,
consistency: MissingRowPolicy,
) -> Result<Option<RawRow>, InternalError> {
let read_row = |key: &DataKey| -> Result<Option<RawRow>, InternalError> {
let raw = key.to_raw()?;
Ok(store.with_data(|data| data.get(&raw)))
};
match consistency {
MissingRowPolicy::Error => match read_row(key)? {
Some(row) => Ok(Some(row)),
None => Err(ExecutorError::missing_row(key).into()),
},
MissingRowPolicy::Ignore => read_row(key),
}
}
#[cfg(test)]
pub(in crate::db::executor) fn read_row_presence_with_consistency_from_store(
store: StoreHandle,
key: &DataKey,
consistency: MissingRowPolicy,
) -> Result<bool, InternalError> {
store.with_data(|data| {
read_row_presence_with_consistency(
data,
key,
consistency,
RowPresenceProbeSource::StoreHandle,
)
})
}
pub(in crate::db::executor) fn read_row_presence_with_consistency_from_data_store(
data: &DataStore,
key: &DataKey,
consistency: MissingRowPolicy,
) -> Result<bool, InternalError> {
read_row_presence_with_consistency(
data,
key,
consistency,
RowPresenceProbeSource::BorrowedDataStore,
)
}
fn read_row_presence_with_consistency(
data: &DataStore,
key: &DataKey,
consistency: MissingRowPolicy,
source: RowPresenceProbeSource,
) -> Result<bool, InternalError> {
let row_exists = probe_row_presence(data, key, source)?;
match consistency {
MissingRowPolicy::Error => {
if row_exists {
Ok(true)
} else {
Err(ExecutorError::missing_row(key).into())
}
}
MissingRowPolicy::Ignore => Ok(row_exists),
}
}
fn probe_row_presence(
data: &DataStore,
key: &DataKey,
source: RowPresenceProbeSource,
) -> Result<bool, InternalError> {
record_row_presence_probe_source(source);
record_row_presence_key_to_raw_encode();
let raw = key.to_raw()?;
let row_exists = data.contains(&raw);
record_row_presence_probe_result(row_exists);
Ok(row_exists)
}
pub(in crate::db::executor) fn read_data_row_with_consistency_from_store(
store: StoreHandle,
key: &DataKey,
consistency: MissingRowPolicy,
) -> Result<Option<DataRow>, InternalError> {
let Some(row) = read_row_with_consistency_from_store(store, key, consistency)? else {
return Ok(None);
};
Ok(Some((key.clone(), row)))
}
pub(in crate::db::executor) fn sum_row_payload_bytes_full_scan_window_with_store(
store: StoreHandle,
direction: Direction,
offset: usize,
limit: Option<usize>,
) -> u64 {
store.with_data(|store| match direction {
Direction::Asc => sum_payload_bytes_from_row_lengths(
store.iter().map(|entry| entry.value().len()),
offset,
limit,
),
Direction::Desc => sum_payload_bytes_from_row_lengths(
store.iter().rev().map(|entry| entry.value().len()),
offset,
limit,
),
})
}
pub(in crate::db::executor) fn sum_row_payload_bytes_key_range_window_with_store(
store: StoreHandle,
start: &DataKey,
end: &DataKey,
direction: Direction,
offset: usize,
limit: Option<usize>,
) -> Result<u64, InternalError> {
let start_raw = start.to_raw()?;
let end_raw = end.to_raw()?;
let total = store.with_data(|store| match direction {
Direction::Asc => sum_payload_bytes_from_row_lengths(
store
.range((Bound::Included(start_raw), Bound::Included(end_raw)))
.map(|entry| entry.value().len()),
offset,
limit,
),
Direction::Desc => sum_payload_bytes_from_row_lengths(
store
.range((Bound::Included(start_raw), Bound::Included(end_raw)))
.rev()
.map(|entry| entry.value().len()),
offset,
limit,
),
});
Ok(total)
}
pub(in crate::db::executor) fn sum_row_payload_bytes_from_ordered_key_stream_with_store<S>(
store: StoreHandle,
key_stream: &mut S,
consistency: MissingRowPolicy,
offset: usize,
limit: Option<usize>,
) -> Result<u64, InternalError>
where
S: OrderedKeyStream + ?Sized,
{
sum_row_payload_bytes_from_ordered_key_stream_shared(
key_stream,
&mut |key| read_row_with_consistency_from_store(store, key, consistency),
offset,
limit,
)
}
const fn payload_window_limit_exhausted(limit_remaining: Option<usize>) -> bool {
matches!(limit_remaining, Some(0))
}
const fn payload_window_accept_row(
offset_remaining: &mut usize,
limit_remaining: &mut Option<usize>,
) -> bool {
if *offset_remaining > 0 {
*offset_remaining = offset_remaining.saturating_sub(1);
return false;
}
if let Some(remaining) = limit_remaining.as_mut() {
if *remaining == 0 {
return false;
}
*remaining = remaining.saturating_sub(1);
}
true
}
fn sum_payload_bytes_from_row_lengths(
row_lengths: impl Iterator<Item = usize>,
offset: usize,
limit: Option<usize>,
) -> u64 {
let mut total = 0u64;
let mut offset_remaining = offset;
let mut limit_remaining = limit;
for row_len in row_lengths {
if payload_window_limit_exhausted(limit_remaining) {
break;
}
if !payload_window_accept_row(&mut offset_remaining, &mut limit_remaining) {
continue;
}
total = total.saturating_add(saturating_row_len(row_len));
}
total
}
fn sum_row_payload_bytes_from_ordered_key_stream_shared<S, F>(
key_stream: &mut S,
read_row: &mut F,
offset: usize,
limit: Option<usize>,
) -> Result<u64, InternalError>
where
S: OrderedKeyStream + ?Sized,
F: FnMut(&DataKey) -> Result<Option<RawRow>, InternalError>,
{
let mut total = 0u64;
let mut offset_remaining = offset;
let mut limit_remaining = limit;
while let Some(key) = key_stream.next_key()? {
if payload_window_limit_exhausted(limit_remaining) {
break;
}
let Some(row) = read_row(&key)? else {
continue;
};
if !payload_window_accept_row(&mut offset_remaining, &mut limit_remaining) {
continue;
}
total = total.saturating_add(saturating_row_len(row.len()));
}
Ok(total)
}