use crate::{
db::{
Db,
executor::{
delete::{
apply_delete_post_access_rows, prepare_delete_commit,
resolve_delete_candidate_rows_as,
types::{
DeleteCommitApplyFn, DeleteCountPreparation, DeletePreparation,
DeleteProjection, PreparedDeleteCommit, PreparedDeleteExecutionState,
PreparedDeleteProjection,
},
},
plan_metrics::record_rows_scanned_for_path,
projection::MaterializedProjectionRows,
saturating_u32_len,
terminal::{KernelRow, RowDecoder},
},
registry::StoreHandle,
},
error::InternalError,
traits::CanisterKind,
value::Value,
};
fn prepare_structural_delete_leaf<T>(
prepared: &PreparedDeleteExecutionState,
mut rows: Vec<KernelRow>,
package_rows: impl FnOnce(Vec<KernelRow>) -> Result<T, InternalError>,
) -> Result<T, InternalError> {
apply_delete_post_access_rows(prepared, &mut rows)?;
package_rows(rows)
}
fn package_structural_delete_rows(
rows: Vec<KernelRow>,
) -> Result<DeletePreparation, InternalError> {
let mut response_rows = Vec::with_capacity(rows.len());
let mut rollback_rows = Vec::with_capacity(rows.len());
for row in rows {
let (data_row, slots) = row.into_parts()?;
let (key, raw) = data_row;
let rollback_key = key.to_raw()?;
response_rows.push(
slots
.into_iter()
.map(|value| value.unwrap_or(Value::Null))
.collect::<Vec<_>>(),
);
rollback_rows.push((rollback_key, raw));
}
Ok(DeletePreparation {
response_rows: MaterializedProjectionRows::from_value_rows(response_rows),
rollback_rows,
})
}
fn package_structural_delete_count(
rows: Vec<KernelRow>,
) -> Result<DeleteCountPreparation, InternalError> {
let row_count = rows.len();
let mut rollback_rows = Vec::with_capacity(rows.len());
for row in rows {
let (data_row, _) = row.into_parts()?;
let (key, raw) = data_row;
let rollback_key = key.to_raw()?;
rollback_rows.push((rollback_key, raw));
}
Ok(DeleteCountPreparation {
row_count,
rollback_rows,
})
}
fn prepare_structural_delete_count<C>(
db: &Db<C>,
store: StoreHandle,
prepared: &PreparedDeleteExecutionState,
) -> Result<Option<(u32, PreparedDeleteCommit)>, InternalError>
where
C: CanisterKind,
{
let row_layout = prepared.authority.entity.row_layout();
let row_decoder = RowDecoder::structural();
let (rows, rows_scanned) = resolve_delete_candidate_rows_as(store, prepared, |data_row| {
row_decoder.decode(&row_layout, data_row)
})?;
record_rows_scanned_for_path(prepared.authority.entity.entity_path(), rows_scanned);
let structural =
prepare_structural_delete_leaf(prepared, rows, package_structural_delete_count)?;
if structural.row_count == 0 {
return Ok(None);
}
let row_count = saturating_u32_len(structural.row_count);
let commit = prepare_delete_commit(db, store, &prepared.authority, structural.rollback_rows)?;
Ok(Some((row_count, commit)))
}
fn prepare_structural_delete_projection<C>(
db: &Db<C>,
store: StoreHandle,
prepared: &PreparedDeleteExecutionState,
) -> Result<PreparedDeleteProjection, InternalError>
where
C: CanisterKind,
{
let row_layout = prepared.authority.entity.row_layout();
let row_decoder = RowDecoder::structural();
let (rows, rows_scanned) = resolve_delete_candidate_rows_as(store, prepared, |data_row| {
row_decoder.decode(&row_layout, data_row)
})?;
record_rows_scanned_for_path(prepared.authority.entity.entity_path(), rows_scanned);
let structural =
prepare_structural_delete_leaf(prepared, rows, package_structural_delete_rows)?;
if structural.response_rows.len() == 0 {
return Ok(PreparedDeleteProjection {
projection: DeleteProjection::new(MaterializedProjectionRows::empty()),
commit: PreparedDeleteCommit {
row_ops: Vec::new(),
},
});
}
let commit = prepare_delete_commit(db, store, &prepared.authority, structural.rollback_rows)?;
Ok(PreparedDeleteProjection {
projection: DeleteProjection::new(structural.response_rows),
commit,
})
}
pub(in crate::db::executor::delete) fn execute_structural_delete_projection_core<C>(
db: &Db<C>,
store: StoreHandle,
prepared: &PreparedDeleteExecutionState,
apply_delete_commit: DeleteCommitApplyFn<C>,
) -> Result<DeleteProjection, InternalError>
where
C: CanisterKind,
{
let prepared_projection = prepare_structural_delete_projection(db, store, prepared)?;
if prepared_projection.projection.row_count() == 0 {
return Ok(prepared_projection.projection);
}
apply_delete_commit(
db,
prepared.authority.entity.clone(),
prepared_projection.commit.row_ops,
"delete_row_apply",
)?;
Ok(prepared_projection.projection)
}
pub(in crate::db::executor::delete) fn execute_structural_delete_count_core<C>(
db: &Db<C>,
store: StoreHandle,
prepared: &PreparedDeleteExecutionState,
apply_delete_commit: DeleteCommitApplyFn<C>,
) -> Result<u32, InternalError>
where
C: CanisterKind,
{
let Some((row_count, commit)) = prepare_structural_delete_count(db, store, prepared)? else {
return Ok(0);
};
apply_delete_commit(
db,
prepared.authority.entity.clone(),
commit.row_ops,
"delete_row_apply",
)?;
Ok(row_count)
}