use crate::{
db::{
Db,
commit::{
CommitApplyGuard, CommitGuard, CommitMarker, CommitRowOp, PreparedIndexDeltaKind,
PreparedRowCommitOp, begin_commit, finish_commit,
prepare_row_commit_for_entity_with_readers, rollback_prepared_row_ops_reverse,
snapshot_row_rollback,
},
data::{DataKey, RawDataKey, RawRow, StorageKey},
index::{
IndexEntryReader, IndexStore, PrimaryRowReader, RawIndexEntry, RawIndexKey,
SealedIndexEntryReader, SealedPrimaryRowReader, SealedStructuralIndexEntryReader,
SealedStructuralPrimaryRowReader, StructuralIndexEntryReader,
StructuralPrimaryRowReader, key_within_envelope,
},
},
error::InternalError,
metrics::sink::{MetricsEvent, record},
model::index::IndexModel,
traits::{CanisterKind, EntityKind, EntityValue, Path},
};
use std::{cell::RefCell, collections::BTreeMap, ops::Bound, ptr, thread::LocalKey};
pub(in crate::db::executor) struct PreparedRowOpDelta {
pub(in crate::db::executor) index_inserts: usize,
pub(in crate::db::executor) index_removes: usize,
pub(in crate::db::executor) reverse_index_inserts: usize,
pub(in crate::db::executor) reverse_index_removes: usize,
}
pub(in crate::db::executor) struct OpenCommitWindow {
pub(in crate::db::executor) commit: CommitGuard,
pub(in crate::db::executor) prepared_row_ops: Vec<PreparedRowCommitOp>,
pub(in crate::db::executor) index_store_guards: Vec<IndexStoreGenerationGuard>,
pub(in crate::db::executor) delta: PreparedRowOpDelta,
}
pub(in crate::db::executor) struct IndexStoreGenerationGuard {
store: &'static LocalKey<RefCell<IndexStore>>,
expected_generation: u64,
}
struct PreflightStoreOverlay<'a, C: CanisterKind> {
db: &'a Db<C>,
data_overrides: BTreeMap<RawDataKey, Option<RawRow>>,
index_overrides: BTreeMap<usize, BTreeMap<RawIndexKey, Option<RawIndexEntry>>>,
}
impl<'a, C: CanisterKind> PreflightStoreOverlay<'a, C> {
const fn new(db: &'a Db<C>) -> Self {
Self {
db,
data_overrides: BTreeMap::new(),
index_overrides: BTreeMap::new(),
}
}
fn stage_prepared_row_op(&mut self, row_op: &PreparedRowCommitOp) {
for index_op in &row_op.index_ops {
let store_id = index_store_id(index_op.store);
self.index_overrides
.entry(store_id)
.or_default()
.insert(index_op.key.clone(), index_op.value.clone());
}
self.data_overrides.insert(
row_op.data_key,
row_op
.data_value
.as_ref()
.map(|row| row.as_raw_row().clone()),
);
}
}
impl<C: CanisterKind> StructuralPrimaryRowReader for PreflightStoreOverlay<'_, C> {
fn read_primary_row_structural(&self, key: &DataKey) -> Result<Option<RawRow>, InternalError> {
let raw_key = key.to_raw()?;
if let Some(override_row) = self.data_overrides.get(&raw_key) {
return Ok(override_row.clone());
}
let hooks = self.db.runtime_hook_for_entity_tag(key.entity_tag())?;
let store = self.db.recovered_store(hooks.store_path)?;
Ok(store.with_data(|data_store| data_store.get(&raw_key)))
}
}
impl<C: CanisterKind> SealedStructuralPrimaryRowReader for PreflightStoreOverlay<'_, C> {}
impl<E> PrimaryRowReader<E> for PreflightStoreOverlay<'_, E::Canister>
where
E: EntityKind + EntityValue,
{
fn read_primary_row(&self, key: &DataKey) -> Result<Option<RawRow>, InternalError> {
let raw_key = key.to_raw()?;
if let Some(override_row) = self.data_overrides.get(&raw_key) {
return Ok(override_row.clone());
}
let store = self.db.recovered_store(E::Store::PATH)?;
Ok(store.with_data(|data_store| data_store.get(&raw_key)))
}
}
impl<E> SealedPrimaryRowReader<E> for PreflightStoreOverlay<'_, E::Canister> where
E: EntityKind + EntityValue
{
}
impl<C: CanisterKind> StructuralIndexEntryReader for PreflightStoreOverlay<'_, C> {
fn read_index_entry_structural(
&self,
store: &'static LocalKey<RefCell<IndexStore>>,
key: &RawIndexKey,
) -> Result<Option<RawIndexEntry>, InternalError> {
let store_id = index_store_id(store);
if let Some(store_overrides) = self.index_overrides.get(&store_id)
&& let Some(override_entry) = store_overrides.get(key)
{
return Ok(override_entry.clone());
}
Ok(store.with_borrow(|index_store| index_store.get(key)))
}
fn read_index_keys_in_raw_range_structural(
&self,
entity_path: &'static str,
_entity_tag: crate::types::EntityTag,
store: &'static LocalKey<RefCell<IndexStore>>,
index: &IndexModel,
bounds: (&Bound<RawIndexKey>, &Bound<RawIndexKey>),
limit: usize,
) -> Result<Vec<StorageKey>, InternalError> {
let mut effective_entries = store
.with_borrow(IndexStore::entries)
.into_iter()
.filter(|(raw_key, _)| key_within_bounds(raw_key, bounds))
.collect::<BTreeMap<RawIndexKey, RawIndexEntry>>();
let store_id = index_store_id(store);
if let Some(store_overrides) = self.index_overrides.get(&store_id) {
for (raw_key, raw_entry) in store_overrides {
if !key_within_bounds(raw_key, bounds) {
continue;
}
if let Some(raw_entry) = raw_entry {
effective_entries.insert(raw_key.clone(), raw_entry.clone());
} else {
effective_entries.remove(raw_key);
}
}
}
let mut out = Vec::new();
for (_, raw_entry) in effective_entries {
let entry = raw_entry.try_decode().map_err(|err| {
InternalError::index_plan_index_corruption(format!(
"index corrupted: {} ({}) -> {}",
entity_path,
index.fields().join(", "),
err
))
})?;
for key in entry.iter_ids() {
out.push(key);
if out.len() >= limit {
return Ok(out);
}
}
}
Ok(out)
}
}
impl<C: CanisterKind> SealedStructuralIndexEntryReader for PreflightStoreOverlay<'_, C> {}
impl<E> IndexEntryReader<E> for PreflightStoreOverlay<'_, E::Canister>
where
E: EntityKind + EntityValue,
{
fn read_index_entry(
&self,
store: &'static LocalKey<RefCell<IndexStore>>,
key: &RawIndexKey,
) -> Result<Option<RawIndexEntry>, InternalError> {
self.read_index_entry_structural(store, key)
}
fn read_index_keys_in_raw_range(
&self,
store: &'static LocalKey<RefCell<IndexStore>>,
index: &IndexModel,
bounds: (&Bound<RawIndexKey>, &Bound<RawIndexKey>),
limit: usize,
) -> Result<Vec<StorageKey>, InternalError> {
self.read_index_keys_in_raw_range_structural(
E::PATH,
E::ENTITY_TAG,
store,
index,
bounds,
limit,
)
}
}
impl<E> SealedIndexEntryReader<E> for PreflightStoreOverlay<'_, E::Canister> where
E: EntityKind + EntityValue
{
}
#[must_use]
pub(in crate::db::executor) fn summarize_prepared_row_ops(
prepared_row_ops: &[PreparedRowCommitOp],
) -> PreparedRowOpDelta {
let mut summary = PreparedRowOpDelta {
index_inserts: 0,
index_removes: 0,
reverse_index_inserts: 0,
reverse_index_removes: 0,
};
for row_op in prepared_row_ops {
for index_op in &row_op.index_ops {
record_prepared_index_delta(&mut summary, index_op.delta_kind);
}
}
summary
}
const fn record_prepared_index_delta(
summary: &mut PreparedRowOpDelta,
delta_kind: PreparedIndexDeltaKind,
) {
let (index_inserts, index_removes, reverse_index_inserts, reverse_index_removes) =
delta_kind.counter_increments();
summary.index_inserts = summary.index_inserts.saturating_add(index_inserts);
summary.index_removes = summary.index_removes.saturating_add(index_removes);
summary.reverse_index_inserts = summary
.reverse_index_inserts
.saturating_add(reverse_index_inserts);
summary.reverse_index_removes = summary
.reverse_index_removes
.saturating_add(reverse_index_removes);
}
pub(in crate::db::executor) fn emit_index_delta_metrics<E: EntityKind>(delta: &PreparedRowOpDelta) {
emit_index_delta_metrics_for_path(E::PATH, delta);
}
pub(in crate::db::executor) fn preflight_prepare_row_ops<E: EntityKind + EntityValue>(
db: &Db<E::Canister>,
row_ops: &[CommitRowOp],
) -> Result<Vec<PreparedRowCommitOp>, InternalError> {
let mut prepared = Vec::with_capacity(row_ops.len());
let mut overlay = PreflightStoreOverlay::<E::Canister>::new(db);
for row_op in row_ops {
let row =
prepare_row_commit_for_entity_with_readers::<E, _, _>(db, row_op, &overlay, &overlay)?;
overlay.stage_prepared_row_op(&row);
prepared.push(row);
}
Ok(prepared)
}
pub(in crate::db::executor) fn preflight_prepare_row_ops_structural<C: CanisterKind>(
db: &Db<C>,
row_ops: &[CommitRowOp],
) -> Result<Vec<PreparedRowCommitOp>, InternalError> {
let mut prepared = Vec::with_capacity(row_ops.len());
let mut overlay = PreflightStoreOverlay::<C>::new(db);
for row_op in row_ops {
let row = db.prepare_row_commit_op_with_readers(row_op, &overlay, &overlay)?;
overlay.stage_prepared_row_op(&row);
prepared.push(row);
}
Ok(prepared)
}
pub(in crate::db::executor) fn open_commit_window<E: EntityKind + EntityValue>(
db: &Db<E::Canister>,
row_ops: Vec<CommitRowOp>,
) -> Result<OpenCommitWindow, InternalError> {
let prepared_row_ops = preflight_prepare_row_ops::<E>(db, &row_ops)?;
let index_store_guards = snapshot_index_store_generations(&prepared_row_ops);
let delta = summarize_prepared_row_ops(&prepared_row_ops);
let marker = CommitMarker::new(row_ops)?;
let commit = begin_commit(marker)?;
Ok(OpenCommitWindow {
commit,
prepared_row_ops,
index_store_guards,
delta,
})
}
pub(in crate::db::executor) fn open_commit_window_structural<C: CanisterKind>(
db: &Db<C>,
row_ops: Vec<CommitRowOp>,
) -> Result<OpenCommitWindow, InternalError> {
let prepared_row_ops = preflight_prepare_row_ops_structural(db, &row_ops)?;
let index_store_guards = snapshot_index_store_generations(&prepared_row_ops);
let delta = summarize_prepared_row_ops(&prepared_row_ops);
let marker = CommitMarker::new(row_ops)?;
let commit = begin_commit(marker)?;
Ok(OpenCommitWindow {
commit,
prepared_row_ops,
index_store_guards,
delta,
})
}
pub(in crate::db::executor) fn apply_prepared_row_ops(
commit: CommitGuard,
apply_phase: &'static str,
prepared_row_ops: Vec<PreparedRowCommitOp>,
index_store_guards: Vec<IndexStoreGenerationGuard>,
on_index_applied: impl FnOnce(),
on_data_applied: impl FnOnce(),
) -> Result<(), InternalError> {
finish_commit(commit, |guard| {
let mut apply_guard = CommitApplyGuard::new(apply_phase);
let _ = guard;
verify_index_store_generations(index_store_guards.as_slice())?;
let mut rollback = Vec::with_capacity(prepared_row_ops.len());
for row_op in &prepared_row_ops {
rollback.push(snapshot_row_rollback(row_op));
}
apply_guard.record_rollback(move || rollback_prepared_row_ops_reverse(rollback));
for row_op in prepared_row_ops {
row_op.apply();
}
on_index_applied();
on_data_applied();
apply_guard.finish()?;
Ok(())
})
}
pub(in crate::db::executor) fn commit_row_ops_with_window<E: EntityKind + EntityValue>(
db: &Db<E::Canister>,
row_ops: Vec<CommitRowOp>,
apply_phase: &'static str,
on_index_applied: impl FnOnce(&PreparedRowOpDelta),
on_data_applied: impl FnOnce(),
) -> Result<(), InternalError> {
let OpenCommitWindow {
commit,
prepared_row_ops,
index_store_guards,
delta,
} = open_commit_window::<E>(db, row_ops)?;
apply_prepared_row_ops(
commit,
apply_phase,
prepared_row_ops,
index_store_guards,
|| on_index_applied(&delta),
on_data_applied,
)?;
Ok(())
}
pub(in crate::db::executor) fn commit_save_row_ops_with_window<E: EntityKind + EntityValue>(
db: &Db<E::Canister>,
row_ops: Vec<CommitRowOp>,
apply_phase: &'static str,
on_data_applied: impl FnOnce(),
) -> Result<(), InternalError> {
commit_row_ops_with_window::<E>(
db,
row_ops,
apply_phase,
|delta| emit_index_delta_metrics::<E>(delta),
on_data_applied,
)
}
pub(in crate::db::executor) fn commit_delete_row_ops_with_window<E: EntityKind + EntityValue>(
db: &Db<E::Canister>,
row_ops: Vec<CommitRowOp>,
apply_phase: &'static str,
) -> Result<(), InternalError> {
commit_row_ops_with_window::<E>(
db,
row_ops,
apply_phase,
|delta| emit_index_delta_metrics::<E>(delta),
|| {},
)
}
pub(in crate::db::executor) fn commit_delete_row_ops_with_window_for_path<C: CanisterKind>(
db: &Db<C>,
entity_path: &'static str,
row_ops: Vec<CommitRowOp>,
apply_phase: &'static str,
) -> Result<(), InternalError> {
let OpenCommitWindow {
commit,
prepared_row_ops,
index_store_guards,
delta,
} = open_commit_window_structural(db, row_ops)?;
apply_prepared_row_ops(
commit,
apply_phase,
prepared_row_ops,
index_store_guards,
|| {
emit_index_delta_metrics_for_path(
entity_path,
&PreparedRowOpDelta {
index_inserts: 0,
index_removes: delta.index_removes,
reverse_index_inserts: 0,
reverse_index_removes: delta.reverse_index_removes,
},
);
},
|| {},
)?;
Ok(())
}
fn snapshot_index_store_generations(
prepared_row_ops: &[PreparedRowCommitOp],
) -> Vec<IndexStoreGenerationGuard> {
let mut guards = Vec::<IndexStoreGenerationGuard>::new();
for row_op in prepared_row_ops {
for index_op in &row_op.index_ops {
if guards
.iter()
.any(|existing| ptr::eq(existing.store, index_op.store))
{
continue;
}
let expected_generation = index_op.store.with_borrow(IndexStore::generation);
guards.push(IndexStoreGenerationGuard {
store: index_op.store,
expected_generation,
});
}
}
guards
}
fn verify_index_store_generations(
guards: &[IndexStoreGenerationGuard],
) -> Result<(), InternalError> {
for guard in guards {
let observed_generation = guard.store.with_borrow(IndexStore::generation);
if observed_generation != guard.expected_generation {
return Err(InternalError::mutation_index_store_generation_changed(
guard.expected_generation,
observed_generation,
));
}
}
Ok(())
}
fn index_store_id(store: &'static LocalKey<RefCell<IndexStore>>) -> usize {
std::ptr::from_ref::<LocalKey<RefCell<IndexStore>>>(store) as usize
}
fn emit_index_delta_metrics_for_path(entity_path: &'static str, delta: &PreparedRowOpDelta) {
record(MetricsEvent::IndexDelta {
entity_path,
inserts: u64::try_from(delta.index_inserts).unwrap_or(u64::MAX),
removes: u64::try_from(delta.index_removes).unwrap_or(u64::MAX),
});
record(MetricsEvent::ReverseIndexDelta {
entity_path,
inserts: u64::try_from(delta.reverse_index_inserts).unwrap_or(u64::MAX),
removes: u64::try_from(delta.reverse_index_removes).unwrap_or(u64::MAX),
});
}
fn key_within_bounds(
key: &RawIndexKey,
bounds: (&Bound<RawIndexKey>, &Bound<RawIndexKey>),
) -> bool {
key_within_envelope(key, bounds.0, bounds.1)
}