#[cfg(feature = "sql")]
use crate::db::executor::delete::{
DeleteProjection, DeleteProjectionBounds, prepare_structural_delete_count_core_with_bounds,
prepare_structural_delete_projection_core,
};
use crate::{
db::{
Db, PersistedRow,
commit::CommitRowOp,
executor::{
PreparedExecutionPlan,
delete::{
apply_delete_commit_window_for_type, package_typed_delete_rows,
prepare_delete_runtime, prepare_structural_delete_count_core,
prepare_typed_delete_core, types::PreparedDeleteExecutionState,
},
plan_metrics::{record_plan_metrics, set_rows_from_len},
},
registry::StoreHandle,
response::EntityResponse,
},
error::InternalError,
metrics::sink::{ExecKind, Span},
traits::EntityValue,
};
#[derive(Clone, Copy)]
pub(in crate::db) struct DeleteExecutor<E>
where
E: PersistedRow,
{
db: Db<E::Canister>,
}
impl<E> DeleteExecutor<E>
where
E: PersistedRow + EntityValue,
{
#[must_use]
pub(in crate::db) const fn new(db: Db<E::Canister>) -> Self {
Self { db }
}
fn apply_prepared_delete_commit(
db: &Db<E::Canister>,
prepared: &PreparedDeleteExecutionState,
row_ops: Vec<CommitRowOp>,
) -> Result<(), InternalError> {
apply_delete_commit_window_for_type::<E>(
db,
prepared.authority.entity.clone(),
row_ops,
"delete_row_apply",
)
}
fn structural_count_row_count(
prepared: &crate::db::executor::delete::types::PreparedDeleteOutput<()>,
) -> u32 {
u32::try_from(prepared.row_count).unwrap_or(u32::MAX)
}
fn execute_with_delete_runtime<T>(
self,
plan: PreparedExecutionPlan<E>,
run: impl FnOnce(
&Db<E::Canister>,
PreparedDeleteExecutionState,
StoreHandle,
) -> Result<(T, usize), InternalError>,
) -> Result<T, InternalError> {
let mut span = Span::<E>::new(ExecKind::Delete);
let result = (|| {
let (prepared, store) = prepare_delete_runtime(&self.db, plan)?;
record_plan_metrics(
prepared.authority.entity.entity_path(),
&prepared.logical_plan,
);
let (output, row_count) = run(&self.db, prepared, store)?;
set_rows_from_len(&mut span, row_count);
Ok(output)
})();
if let Err(err) = &result {
span.set_error(err);
}
result
}
pub(in crate::db) fn execute(
self,
plan: PreparedExecutionPlan<E>,
) -> Result<EntityResponse<E>, InternalError> {
self.execute_with_delete_runtime(plan, |db, prepared, store| {
let Some(typed) =
prepare_typed_delete_core(db, store, &prepared, package_typed_delete_rows::<E>)?
else {
return Ok((EntityResponse::new(Vec::new()), 0));
};
let row_count = typed.row_count;
Self::apply_prepared_delete_commit(db, &prepared, typed.commit.row_ops)?;
Ok((EntityResponse::new(typed.output), row_count))
})
}
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_structural_projection_with_bounds(
self,
plan: PreparedExecutionPlan<E>,
bounds: DeleteProjectionBounds,
validate_precommit: impl FnOnce(&DeleteProjection) -> Result<(), InternalError>,
) -> Result<DeleteProjection, InternalError> {
self.execute_with_delete_runtime(plan, |db, prepared, store| {
let Some(projection) = prepare_structural_delete_projection_core(
db,
store,
&prepared,
bounds,
validate_precommit,
)?
else {
return Ok((DeleteProjection::empty(), 0));
};
let row_count = usize::try_from(projection.output.row_count()).unwrap_or(usize::MAX);
Self::apply_prepared_delete_commit(db, &prepared, projection.commit.row_ops)?;
Ok((projection.output, row_count))
})
}
pub(in crate::db) fn execute_count(
self,
plan: PreparedExecutionPlan<E>,
) -> Result<u32, InternalError> {
self.execute_with_delete_runtime(plan, |db, prepared, store| {
let Some(count) = prepare_structural_delete_count_core(db, store, &prepared)? else {
return Ok((0, 0));
};
let row_count = Self::structural_count_row_count(&count);
let row_count_len = count.row_count;
Self::apply_prepared_delete_commit(db, &prepared, count.commit.row_ops)?;
Ok((row_count, row_count_len))
})
}
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_count_with_bounds(
self,
plan: PreparedExecutionPlan<E>,
bounds: DeleteProjectionBounds,
) -> Result<u32, InternalError> {
self.execute_with_delete_runtime(plan, |db, prepared, store| {
let Some(count) =
prepare_structural_delete_count_core_with_bounds(db, store, &prepared, bounds)?
else {
return Ok((0, 0));
};
let row_count = Self::structural_count_row_count(&count);
let row_count_len = count.row_count;
Self::apply_prepared_delete_commit(db, &prepared, count.commit.row_ops)?;
Ok((row_count, row_count_len))
})
}
}