use crate::{
db::{
Db, PersistedRow,
data::DataRow,
executor::{
AccessScanContinuationInput, AccessStreamBindings, ExecutionKernel,
ExecutionPreparation, OrderReadableRow, PreparedExecutionPlan, TraversalRuntime,
mutation::{mutation_write_context, preflight_mutation_plan_for_authority},
pipeline::contracts::{
ExecutionInputs, ExecutionRuntimeAdapter, PreparedExecutionInputParts,
PreparedExecutionProjection, ProjectionMaterializationMode,
},
pipeline::runtime::ExecutionAttemptKernel,
planning::preparation::slot_map_for_model_plan,
read_data_row_with_consistency_from_store,
route::{RoutePlanRequest, build_execution_route_plan},
},
index::IndexCompilePolicy,
predicate::MissingRowPolicy,
query::plan::AccessPlannedQuery,
registry::StoreHandle,
},
error::InternalError,
traits::EntityValue,
};
use std::sync::Arc;
use crate::db::executor::delete::types::{
DeleteExecutionAuthority, PreparedDeleteExecutionState, validate_delete_plan_shape,
};
fn prepare_delete_execution_state(
authority: DeleteExecutionAuthority,
logical_plan: Arc<AccessPlannedQuery>,
index_prefix_specs: Arc<[crate::db::access::LoweredIndexPrefixSpec]>,
index_range_specs: Arc<[crate::db::access::LoweredIndexRangeSpec]>,
) -> Result<PreparedDeleteExecutionState, InternalError> {
preflight_mutation_plan_for_authority(authority.entity, &logical_plan)?;
let route_plan = build_execution_route_plan(&logical_plan, RoutePlanRequest::MutationDelete)?;
let slot_map = slot_map_for_model_plan(&logical_plan);
let execution_preparation = ExecutionPreparation::from_runtime_plan(&logical_plan, slot_map);
Ok(PreparedDeleteExecutionState {
authority,
logical_plan,
route_plan,
execution_preparation,
index_prefix_specs,
index_range_specs,
})
}
pub(in crate::db::executor::delete) fn prepare_delete_runtime<E>(
db: &Db<E::Canister>,
plan: PreparedExecutionPlan<E>,
) -> Result<(PreparedDeleteExecutionState, StoreHandle), InternalError>
where
E: PersistedRow + EntityValue,
{
validate_delete_plan_shape(&plan)?;
let prepared = plan.into_access_plan_parts()?;
let authority = DeleteExecutionAuthority::for_type::<E>();
let prepared = prepare_delete_execution_state(
authority,
prepared.plan,
prepared.index_prefix_specs,
prepared.index_range_specs,
)?;
let ctx = mutation_write_context::<E>(db)?;
let store = ctx.structural_store()?;
Ok((prepared, store))
}
pub(in crate::db::executor::delete) fn resolve_delete_candidate_rows(
store: StoreHandle,
prepared: &PreparedDeleteExecutionState,
) -> Result<Vec<DataRow>, InternalError> {
let runtime = ExecutionRuntimeAdapter::from_stream_runtime_parts(TraversalRuntime::new(
store,
prepared.authority.entity.entity_tag(),
));
let execution_inputs = ExecutionInputs::new_prepared(PreparedExecutionInputParts {
runtime: &runtime,
plan: &prepared.logical_plan,
executable_access: prepared.logical_plan.access.executable_contract(),
stream_bindings: AccessStreamBindings::new(
prepared.index_prefix_specs.as_ref(),
prepared.index_range_specs.as_ref(),
AccessScanContinuationInput::initial_asc(),
),
execution_preparation: &prepared.execution_preparation,
projection_materialization: ProjectionMaterializationMode::None,
prepared_projection: PreparedExecutionProjection::empty(),
emit_cursor: false,
});
let mut resolved = ExecutionAttemptKernel::new(&execution_inputs)
.resolve_execution_key_stream(
&prepared.route_plan,
IndexCompilePolicy::ConservativeSubset,
)?;
collect_delete_rows_from_key_stream(store, resolved.key_stream_mut(), prepared.consistency())
}
fn collect_delete_rows_from_key_stream<S>(
store: StoreHandle,
key_stream: &mut S,
consistency: MissingRowPolicy,
) -> Result<Vec<DataRow>, InternalError>
where
S: crate::db::executor::OrderedKeyStream + ?Sized,
{
let mut rows = Vec::with_capacity(key_stream.exact_key_count_hint().unwrap_or(0));
while let Some(key) = key_stream.next_key()? {
if let Some(row) = read_data_row_with_consistency_from_store(store, &key, consistency)? {
rows.push(row);
}
}
Ok(rows)
}
pub(in crate::db::executor::delete) fn apply_delete_post_access_rows<R>(
prepared: &PreparedDeleteExecutionState,
rows: &mut Vec<R>,
) -> Result<(), InternalError>
where
R: OrderReadableRow,
{
let stats = ExecutionKernel::apply_delete_post_access_with_filter_program(
&prepared.logical_plan,
rows,
prepared.logical_plan.effective_runtime_filter_program(),
)?;
let _ = stats.delete_was_limited;
let _ = stats.rows_after_cursor;
Ok(())
}