#[cfg(feature = "sql")]
use crate::db::executor::terminal::{KernelRow, RowDecoder};
use crate::{
db::{
Db,
commit::{CommitRowOp, CommitSchemaFingerprint},
data::{DataKey, DataRow, PersistedRow, RawDataKey, RawRow, decode_raw_row_for_entity_key},
executor::{
AccessScanContinuationInput, EntityAuthority, ExecutableAccess, ExecutionKernel,
ExecutionPreparation, OrderReadableRow, PreparedExecutionPlan, TraversalRuntime,
mutation::{
commit_delete_row_ops_with_window, commit_delete_row_ops_with_window_for_path,
mutation_write_context, preflight_mutation_plan_for_authority,
},
plan_metrics::{record_plan_metrics, record_rows_scanned_for_path, set_rows_from_len},
planning::preparation::slot_map_for_model_plan,
read_data_row_with_consistency_from_store,
traversal::row_read_consistency_for_plan,
},
predicate::MissingRowPolicy,
query::plan::AccessPlannedQuery,
registry::StoreHandle,
response::{EntityResponse, Row},
schema::commit_schema_fingerprint_for_entity,
},
error::InternalError,
metrics::sink::{ExecKind, Span},
traits::{CanisterKind, EntityKind, EntityValue, Storable},
types::Id,
};
use std::collections::BTreeSet;
pub(in crate::db::executor) struct DeleteRow<E>
where
E: EntityKind,
{
pub(super) key: DataKey,
pub(super) raw: Option<RawRow>,
pub(super) entity: E,
}
impl<E: EntityKind> DeleteRow<E> {
pub(in crate::db::executor) const fn entity_ref(&self) -> &E {
&self.entity
}
}
struct DeleteExecutionAuthority {
entity: EntityAuthority,
schema_fingerprint: CommitSchemaFingerprint,
}
impl DeleteExecutionAuthority {
fn for_type<E>() -> Self
where
E: EntityKind,
{
Self {
entity: EntityAuthority::for_type::<E>(),
schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
}
}
}
struct PreparedDeleteExecutionState {
authority: DeleteExecutionAuthority,
logical_plan: AccessPlannedQuery,
execution_preparation: ExecutionPreparation,
index_prefix_specs: Vec<crate::db::access::LoweredIndexPrefixSpec>,
index_range_specs: Vec<crate::db::access::LoweredIndexRangeSpec>,
}
impl PreparedDeleteExecutionState {
const fn consistency(&self) -> MissingRowPolicy {
row_read_consistency_for_plan(&self.logical_plan)
}
}
fn validate_delete_plan_shape<E>(plan: &PreparedExecutionPlan<E>) -> Result<(), InternalError>
where
E: EntityKind,
{
if plan.is_grouped() {
return Err(InternalError::delete_executor_grouped_unsupported());
}
if !plan.mode().is_delete() {
return Err(InternalError::delete_executor_delete_plan_required());
}
Ok(())
}
struct TypedDeletePreparation<E>
where
E: EntityKind,
{
response_rows: Vec<Row<E>>,
rollback_rows: Vec<(RawDataKey, RawRow)>,
}
struct DeleteCountPreparation {
row_count: u32,
rollback_rows: Vec<(RawDataKey, RawRow)>,
}
#[cfg(feature = "sql")]
pub(in crate::db) struct DeleteProjection {
rows: Vec<KernelRow>,
row_count: u32,
}
#[cfg(feature = "sql")]
impl DeleteProjection {
#[must_use]
const fn new(rows: Vec<KernelRow>, row_count: u32) -> Self {
Self { rows, row_count }
}
#[must_use]
pub(in crate::db) fn into_parts(self) -> (Vec<KernelRow>, u32) {
(self.rows, self.row_count)
}
}
#[cfg(feature = "sql")]
struct DeletePreparation {
response_rows: Vec<KernelRow>,
rollback_rows: Vec<(RawDataKey, RawRow)>,
}
struct PreparedDeleteCommit {
row_ops: Vec<CommitRowOp>,
}
#[cfg(feature = "sql")]
struct PreparedDeleteProjection {
projection: DeleteProjection,
commit: PreparedDeleteCommit,
}
#[cfg(feature = "sql")]
type DeleteCommitApplyFn<C> =
fn(&Db<C>, EntityAuthority, Vec<CommitRowOp>, &'static str) -> Result<(), InternalError>;
fn prepare_delete_execution_state(
authority: DeleteExecutionAuthority,
logical_plan: AccessPlannedQuery,
index_prefix_specs: Vec<crate::db::access::LoweredIndexPrefixSpec>,
index_range_specs: Vec<crate::db::access::LoweredIndexRangeSpec>,
) -> Result<PreparedDeleteExecutionState, InternalError> {
preflight_mutation_plan_for_authority(authority.entity, &logical_plan)?;
let execution_preparation =
ExecutionPreparation::from_plan(&logical_plan, slot_map_for_model_plan(&logical_plan));
Ok(PreparedDeleteExecutionState {
authority,
logical_plan,
execution_preparation,
index_prefix_specs,
index_range_specs,
})
}
fn resolve_delete_candidate_rows(
store: StoreHandle,
prepared: &PreparedDeleteExecutionState,
) -> Result<Vec<DataRow>, InternalError> {
let runtime = TraversalRuntime::new(store, prepared.authority.entity.entity_tag());
let bindings = crate::db::executor::AccessStreamBindings::new(
prepared.index_prefix_specs.as_slice(),
prepared.index_range_specs.as_slice(),
AccessScanContinuationInput::initial_asc(),
);
let executable_access =
ExecutableAccess::new(&prepared.logical_plan.access, bindings, None, None);
let mut key_stream = runtime.ordered_key_stream_from_runtime_access(executable_access)?;
collect_delete_rows_from_key_stream(store, &mut key_stream, prepared.consistency())
}
fn collect_delete_rows_from_key_stream<S>(
store: StoreHandle,
key_stream: &mut S,
consistency: MissingRowPolicy,
) -> Result<Vec<DataRow>, InternalError>
where
S: crate::db::executor::OrderedKeyStream + ?Sized,
{
let mut rows = Vec::with_capacity(key_stream.exact_key_count_hint().unwrap_or(0));
while let Some(key) = key_stream.next_key()? {
if let Some(row) = read_data_row_with_consistency_from_store(store, &key, consistency)? {
rows.push(row);
}
}
Ok(rows)
}
fn apply_delete_post_access_rows<R>(
prepared: &PreparedDeleteExecutionState,
rows: &mut Vec<R>,
) -> Result<(), InternalError>
where
R: OrderReadableRow,
{
let stats = ExecutionKernel::apply_delete_post_access_with_compiled_predicate(
&prepared.logical_plan,
rows,
prepared.execution_preparation.compiled_predicate(),
)?;
let _ = stats.delete_was_limited;
let _ = stats.rows_after_cursor;
Ok(())
}
fn prepare_typed_delete_leaf<E, T>(
prepared: &PreparedDeleteExecutionState,
data_rows: Vec<DataRow>,
package_rows: impl FnOnce(Vec<DeleteRow<E>>) -> Result<T, InternalError>,
) -> Result<T, InternalError>
where
E: PersistedRow + EntityValue,
{
let mut rows = data_rows
.into_iter()
.map(|row| {
let (key, raw) = row;
let (_, entity) = decode_raw_row_for_entity_key::<E>(&key, &raw)?;
Ok(DeleteRow {
key,
raw: Some(raw),
entity,
})
})
.collect::<Result<Vec<DeleteRow<E>>, InternalError>>()?;
apply_delete_post_access_rows(prepared, &mut rows)?;
package_rows(rows)
}
#[cfg(feature = "sql")]
fn prepare_structural_delete_leaf<T>(
prepared: &PreparedDeleteExecutionState,
data_rows: Vec<DataRow>,
package_rows: impl FnOnce(Vec<KernelRow>) -> Result<T, InternalError>,
) -> Result<T, InternalError> {
let row_layout = prepared.authority.entity.row_layout();
let row_decoder = RowDecoder::structural();
let mut rows = data_rows
.into_iter()
.map(|data_row| row_decoder.decode(&row_layout, data_row))
.collect::<Result<Vec<KernelRow>, InternalError>>()?;
apply_delete_post_access_rows(prepared, &mut rows)?;
package_rows(rows)
}
fn package_typed_delete_rows<E>(
rows: Vec<DeleteRow<E>>,
) -> Result<TypedDeletePreparation<E>, InternalError>
where
E: PersistedRow + EntityValue,
{
let mut response_rows = Vec::with_capacity(rows.len());
let mut rollback_rows = Vec::with_capacity(rows.len());
for mut row in rows {
let response_id = Id::from_key(row.key.try_key::<E>()?);
let rollback_row = row
.raw
.take()
.ok_or_else(InternalError::delete_rollback_row_required)?;
let rollback_key = row.key.to_raw()?;
response_rows.push(Row::new(response_id, row.entity));
rollback_rows.push((rollback_key, rollback_row));
}
Ok(TypedDeletePreparation {
response_rows,
rollback_rows,
})
}
fn package_typed_delete_count<E>(
rows: Vec<DeleteRow<E>>,
) -> Result<DeleteCountPreparation, InternalError>
where
E: PersistedRow + EntityValue,
{
let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
let mut rollback_rows = Vec::with_capacity(rows.len());
for mut row in rows {
let rollback_row = row
.raw
.take()
.ok_or_else(InternalError::delete_rollback_row_required)?;
let rollback_key = row.key.to_raw()?;
rollback_rows.push((rollback_key, rollback_row));
}
Ok(DeleteCountPreparation {
row_count,
rollback_rows,
})
}
#[cfg(feature = "sql")]
fn package_structural_delete_rows(
rows: Vec<KernelRow>,
) -> Result<DeletePreparation, InternalError> {
let mut response_rows = Vec::with_capacity(rows.len());
let mut rollback_rows = Vec::with_capacity(rows.len());
for row in rows {
let (data_row, slots) = row.into_parts()?;
let (key, raw) = data_row;
let rollback_key = key.to_raw()?;
response_rows.push(KernelRow::new((key, raw.clone()), slots));
rollback_rows.push((rollback_key, raw));
}
Ok(DeletePreparation {
response_rows,
rollback_rows,
})
}
#[inline(never)]
fn prepare_delete_commit<C>(
db: &Db<C>,
_store: StoreHandle,
authority: &DeleteExecutionAuthority,
rollback_rows: Vec<(RawDataKey, RawRow)>,
) -> Result<PreparedDeleteCommit, InternalError>
where
C: CanisterKind,
{
let deleted_target_keys = rollback_rows
.iter()
.map(|(raw_key, _)| *raw_key)
.collect::<BTreeSet<_>>();
db.validate_delete_strong_relations(authority.entity.entity_path(), &deleted_target_keys)?;
let row_ops = rollback_rows
.into_iter()
.map(|(raw_key, raw_row)| {
Ok(CommitRowOp::new(
authority.entity.entity_path(),
raw_key,
Some(raw_row.into_bytes()),
None,
authority.schema_fingerprint,
))
})
.collect::<Result<Vec<_>, InternalError>>()?;
Ok(PreparedDeleteCommit { row_ops })
}
#[cfg(feature = "sql")]
fn prepare_structural_delete_projection<C>(
db: &Db<C>,
store: StoreHandle,
prepared: &PreparedDeleteExecutionState,
) -> Result<PreparedDeleteProjection, InternalError>
where
C: CanisterKind,
{
let data_rows = resolve_delete_candidate_rows(store, prepared)?;
record_rows_scanned_for_path(prepared.authority.entity.entity_path(), data_rows.len());
let structural =
prepare_structural_delete_leaf(prepared, data_rows, package_structural_delete_rows)?;
if structural.response_rows.is_empty() {
return Ok(PreparedDeleteProjection {
projection: DeleteProjection::new(Vec::new(), 0),
commit: PreparedDeleteCommit {
row_ops: Vec::new(),
},
});
}
let commit = prepare_delete_commit(db, store, &prepared.authority, structural.rollback_rows)?;
let row_count = u32::try_from(structural.response_rows.len()).unwrap_or(u32::MAX);
Ok(PreparedDeleteProjection {
projection: DeleteProjection::new(structural.response_rows, row_count),
commit,
})
}
#[cfg(feature = "sql")]
fn execute_structural_delete_projection_core<C>(
db: &Db<C>,
store: StoreHandle,
prepared: &PreparedDeleteExecutionState,
apply_delete_commit: DeleteCommitApplyFn<C>,
) -> Result<DeleteProjection, InternalError>
where
C: CanisterKind,
{
let prepared_projection = prepare_structural_delete_projection(db, store, prepared)?;
if prepared_projection.projection.row_count == 0 {
return Ok(DeleteProjection::new(Vec::new(), 0));
}
apply_delete_commit(
db,
prepared.authority.entity,
prepared_projection.commit.row_ops,
"delete_row_apply",
)?;
Ok(prepared_projection.projection)
}
#[cfg(feature = "sql")]
fn apply_delete_commit_window_for_type<E>(
db: &Db<E::Canister>,
authority: EntityAuthority,
row_ops: Vec<CommitRowOp>,
apply_phase: &'static str,
) -> Result<(), InternalError>
where
E: EntityKind + EntityValue,
{
if db.has_runtime_hooks() {
commit_delete_row_ops_with_window_for_path(
db,
authority.entity_path(),
row_ops,
apply_phase,
)
} else {
commit_delete_row_ops_with_window::<E>(db, row_ops, apply_phase)
}
}
#[derive(Clone, Copy)]
pub(in crate::db) struct DeleteExecutor<E>
where
E: PersistedRow,
{
db: Db<E::Canister>,
}
impl<E> DeleteExecutor<E>
where
E: PersistedRow + EntityValue,
{
#[must_use]
pub(in crate::db) const fn new(db: Db<E::Canister>) -> Self {
Self { db }
}
pub(in crate::db) fn execute(
self,
plan: PreparedExecutionPlan<E>,
) -> Result<EntityResponse<E>, InternalError> {
validate_delete_plan_shape(&plan)?;
(|| {
let authority = DeleteExecutionAuthority::for_type::<E>();
let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
let index_range_specs = plan.index_range_specs()?.to_vec();
let logical_plan = plan.into_plan();
let prepared = prepare_delete_execution_state(
authority,
logical_plan,
index_prefix_specs,
index_range_specs,
)?;
let ctx = mutation_write_context::<E>(&self.db)?;
let store = ctx.structural_store()?;
let mut span = Span::<E>::new(ExecKind::Delete);
record_plan_metrics(&prepared.logical_plan.access);
let data_rows = resolve_delete_candidate_rows(store, &prepared)?;
record_rows_scanned_for_path(prepared.authority.entity.entity_path(), data_rows.len());
let typed =
prepare_typed_delete_leaf(&prepared, data_rows, package_typed_delete_rows::<E>)?;
if typed.response_rows.is_empty() {
set_rows_from_len(&mut span, 0);
return Ok(EntityResponse::new(Vec::new()));
}
let commit =
prepare_delete_commit(&self.db, store, &prepared.authority, typed.rollback_rows)?;
if self.db.has_runtime_hooks() {
commit_delete_row_ops_with_window_for_path(
&self.db,
prepared.authority.entity.entity_path(),
commit.row_ops,
"delete_row_apply",
)?;
} else {
commit_delete_row_ops_with_window::<E>(
&self.db,
commit.row_ops,
"delete_row_apply",
)?;
}
set_rows_from_len(&mut span, typed.response_rows.len());
Ok(EntityResponse::new(typed.response_rows))
})()
}
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_structural_projection(
self,
plan: PreparedExecutionPlan<E>,
) -> Result<DeleteProjection, InternalError> {
validate_delete_plan_shape(&plan)?;
(|| {
let authority = DeleteExecutionAuthority::for_type::<E>();
let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
let index_range_specs = plan.index_range_specs()?.to_vec();
let logical_plan = plan.into_plan();
let prepared = prepare_delete_execution_state(
authority,
logical_plan,
index_prefix_specs,
index_range_specs,
)?;
let ctx = mutation_write_context::<E>(&self.db)?;
let store = ctx.structural_store()?;
let mut span = Span::<E>::new(ExecKind::Delete);
record_plan_metrics(&prepared.logical_plan.access);
let projection = execute_structural_delete_projection_core(
&self.db,
store,
&prepared,
apply_delete_commit_window_for_type::<E>,
)?;
if projection.row_count == 0 {
set_rows_from_len(&mut span, 0);
return Ok(DeleteProjection::new(Vec::new(), 0));
}
set_rows_from_len(
&mut span,
usize::try_from(projection.row_count).unwrap_or(usize::MAX),
);
Ok(projection)
})()
}
pub(in crate::db) fn execute_count(
self,
plan: PreparedExecutionPlan<E>,
) -> Result<u32, InternalError> {
validate_delete_plan_shape(&plan)?;
(|| {
let authority = DeleteExecutionAuthority::for_type::<E>();
let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
let index_range_specs = plan.index_range_specs()?.to_vec();
let logical_plan = plan.into_plan();
let prepared = prepare_delete_execution_state(
authority,
logical_plan,
index_prefix_specs,
index_range_specs,
)?;
let ctx = mutation_write_context::<E>(&self.db)?;
let store = ctx.structural_store()?;
let mut span = Span::<E>::new(ExecKind::Delete);
record_plan_metrics(&prepared.logical_plan.access);
let data_rows = resolve_delete_candidate_rows(store, &prepared)?;
record_rows_scanned_for_path(prepared.authority.entity.entity_path(), data_rows.len());
let counted =
prepare_typed_delete_leaf(&prepared, data_rows, package_typed_delete_count::<E>)?;
if counted.row_count == 0 {
set_rows_from_len(&mut span, 0);
return Ok(0);
}
let commit =
prepare_delete_commit(&self.db, store, &prepared.authority, counted.rollback_rows)?;
if self.db.has_runtime_hooks() {
commit_delete_row_ops_with_window_for_path(
&self.db,
prepared.authority.entity.entity_path(),
commit.row_ops,
"delete_row_apply",
)?;
} else {
commit_delete_row_ops_with_window::<E>(
&self.db,
commit.row_ops,
"delete_row_apply",
)?;
}
set_rows_from_len(
&mut span,
usize::try_from(counted.row_count).unwrap_or(usize::MAX),
);
Ok(counted.row_count)
})()
}
}