#[cfg(feature = "sql")]
use crate::{
db::executor::{
delete::DeleteProjectionBounds, delete::types::DeleteProjection,
projection::MaterializedProjectionRows,
},
value::Value,
};
use crate::{
db::{
Db,
executor::{
delete::{
apply_delete_post_access_rows, prepare_delete_output_from_leaf,
resolve_delete_candidate_rows_recorded_as,
types::{DeleteLeaf, PreparedDeleteExecutionState, PreparedDeleteOutput},
},
terminal::{KernelRow, RowDecoder},
},
registry::StoreHandle,
},
error::InternalError,
traits::CanisterKind,
};
#[cfg(feature = "sql")]
use icydb_diagnostic_code::SqlWriteBoundaryCode;
/// Packages resolved kernel rows into a delete leaf carrying a value projection.
///
/// Each kernel row is split into its data row and slots. The slots become one
/// response row, with every absent slot replaced by `Value::Null`; the data
/// row contributes a `(raw key, raw)` entry to `rollback_rows`.
///
/// # Errors
///
/// Propagates any failure from `into_data_row_and_slots` or `to_raw`.
#[cfg(feature = "sql")]
fn package_structural_delete_rows(
    rows: Vec<KernelRow>,
) -> Result<DeleteLeaf<DeleteProjection>, InternalError> {
    let capacity = rows.len();
    let mut projected_rows = Vec::with_capacity(capacity);
    let mut rollback_rows = Vec::with_capacity(capacity);

    for kernel_row in rows {
        let ((key, raw), slots) = kernel_row.into_data_row_and_slots()?;
        let raw_key = key.to_raw()?;

        // Absent slots are surfaced to the caller as explicit nulls.
        let values: Vec<_> = slots
            .into_iter()
            .map(|slot| slot.unwrap_or(Value::Null))
            .collect();

        projected_rows.push(values);
        rollback_rows.push((raw_key, raw));
    }

    let row_count = rollback_rows.len();
    let output = DeleteProjection::new(MaterializedProjectionRows::from_value_rows(
        projected_rows,
    ));

    Ok(DeleteLeaf {
        output,
        row_count,
        rollback_rows,
    })
}
/// Packages resolved kernel rows into a count-only delete leaf.
///
/// No projection output is produced (`output` is `()`); slots are discarded and
/// only the `(raw key, raw)` rollback entries plus the input row count are kept.
///
/// # Errors
///
/// Propagates any failure from `into_data_row_and_slots` or `to_raw`.
fn package_structural_delete_count(rows: Vec<KernelRow>) -> Result<DeleteLeaf<()>, InternalError> {
    let row_count = rows.len();
    let mut rollback_rows = Vec::with_capacity(row_count);

    rows.into_iter()
        .try_for_each(|kernel_row| -> Result<(), InternalError> {
            let ((key, raw), _slots) = kernel_row.into_data_row_and_slots()?;
            let raw_key = key.to_raw()?;
            rollback_rows.push((raw_key, raw));
            Ok(())
        })?;

    Ok(DeleteLeaf {
        output: (),
        row_count,
        rollback_rows,
    })
}
/// Resolves the delete candidate rows for `prepared` and decodes each one into
/// a `KernelRow` using the structural row decoder.
///
/// The row layout is taken from `prepared.authority.entity`; decoding is done
/// per data row by the closure handed to
/// `resolve_delete_candidate_rows_recorded_as`.
///
/// # Errors
///
/// Propagates failures from obtaining the row layout and from candidate
/// resolution (which includes the per-row decode).
fn resolve_structural_delete_kernel_rows(
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
) -> Result<Vec<KernelRow>, InternalError> {
    let layout = prepared.authority.entity.row_layout()?;
    let decoder = RowDecoder::structural();

    resolve_delete_candidate_rows_recorded_as(store, prepared, |data_row| {
        decoder.decode(&layout, data_row)
    })
}
/// Builds a delete leaf from the rows selected by the prepared access path.
///
/// Steps, in order:
/// 1. resolve and decode the candidate kernel rows,
/// 2. run `apply_delete_post_access_rows` over them,
/// 3. with the `sql` feature enabled, reject the selection if it exceeds
///    `max_selected_rows` (ignored without that feature),
/// 4. hand the surviving rows to `package_rows`.
///
/// # Errors
///
/// Propagates the first failure from any of the steps above.
fn prepare_structural_delete_leaf_from_access<T>(
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
    max_selected_rows: Option<u32>,
    package_rows: impl FnOnce(Vec<KernelRow>) -> Result<DeleteLeaf<T>, InternalError>,
) -> Result<DeleteLeaf<T>, InternalError> {
    let mut selected = resolve_structural_delete_kernel_rows(store, prepared)?;
    apply_delete_post_access_rows(prepared, &mut selected)?;

    // The row cap is only enforced on SQL builds.
    #[cfg(feature = "sql")]
    validate_structural_delete_selected_row_count_bounds(selected.len(), max_selected_rows)?;

    // Without SQL support the cap is unused; discard it to keep the build warning-free.
    #[cfg(not(feature = "sql"))]
    let _ = max_selected_rows;

    package_rows(selected)
}
/// Prepares the delete output for a structural delete.
///
/// First builds the leaf via `prepare_structural_delete_leaf_from_access`
/// (forwarding `max_selected_rows` and `package_rows`), then passes that leaf
/// to `prepare_delete_output_from_leaf` and returns its result unchanged.
///
/// # Errors
///
/// Propagates failures from either stage.
fn prepare_structural_delete_output<C: CanisterKind, T>(
    db: &Db<C>,
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
    max_selected_rows: Option<u32>,
    package_rows: impl FnOnce(Vec<KernelRow>) -> Result<DeleteLeaf<T>, InternalError>,
) -> Result<Option<PreparedDeleteOutput<T>>, InternalError> {
    let leaf =
        prepare_structural_delete_leaf_from_access(store, prepared, max_selected_rows, package_rows)?;

    prepare_delete_output_from_leaf(db, store, prepared, leaf)
}
/// Verifies that `row_count` does not exceed the optional `max_rows` cap.
///
/// `None` means no cap and always succeeds. A cap that cannot be represented
/// as `usize` is treated as `usize::MAX`.
///
/// # Errors
///
/// Returns the SQL write-boundary error with
/// `SqlWriteBoundaryCode::StagedRowsTooMany` when `row_count` is above the cap.
#[cfg(feature = "sql")]
fn validate_structural_delete_selected_row_count_bounds(
    row_count: usize,
    max_rows: Option<u32>,
) -> Result<(), InternalError> {
    match max_rows {
        Some(limit) if row_count > usize::try_from(limit).unwrap_or(usize::MAX) => Err(
            InternalError::query_sql_write_boundary(SqlWriteBoundaryCode::StagedRowsTooMany),
        ),
        _ => Ok(()),
    }
}
/// Validates the number of rows held by `projection` against `bounds`.
///
/// Thin adapter: reads `projection.row_count()` and defers to
/// `validate_structural_delete_row_count_bounds`.
#[cfg(feature = "sql")]
fn validate_structural_delete_projection_bounds(
    projection: &DeleteProjection,
    bounds: DeleteProjectionBounds,
) -> Result<(), InternalError> {
    let projected_row_count = projection.row_count();
    validate_structural_delete_row_count_bounds(projected_row_count, bounds)
}
/// Validates `row_count` against the optional row limit carried by `bounds`.
///
/// Succeeds when `bounds.row_limit()` is `None` or when the count is within
/// the limit.
///
/// # Errors
///
/// Forwards the error produced by
/// `validate_structural_delete_selected_row_count_bounds` when the count
/// exceeds the limit.
#[cfg(feature = "sql")]
fn validate_structural_delete_row_count_bounds(
    row_count: u32,
    bounds: DeleteProjectionBounds,
) -> Result<(), InternalError> {
    // Widen with a checked conversion rather than an `as` cast, matching how
    // the callee converts the limit itself.
    let row_count = usize::try_from(row_count).unwrap_or(usize::MAX);

    // The callee already treats a `None` limit as "no cap", so the optional
    // limit is forwarded as-is.
    validate_structural_delete_selected_row_count_bounds(row_count, bounds.row_limit())
}
/// Prepares a structural delete that returns a projection of the deleted rows.
///
/// The selection is capped by `bounds.row_limit()` while rows are gathered.
/// Afterwards the resulting projection is checked against `bounds` and then
/// passed to `validate_precommit`; both checks must pass before the prepared
/// output is returned.
///
/// When output preparation yields `None`, the same two checks are run against
/// an empty projection and `Ok(None)` is returned.
///
/// # Errors
///
/// Propagates failures from output preparation, the bounds check, or
/// `validate_precommit`.
#[cfg(feature = "sql")]
pub(in crate::db::executor::delete) fn prepare_structural_delete_projection_core<C>(
    db: &Db<C>,
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
    bounds: DeleteProjectionBounds,
    validate_precommit: impl FnOnce(&DeleteProjection) -> Result<(), InternalError>,
) -> Result<Option<PreparedDeleteOutput<DeleteProjection>>, InternalError>
where
    C: CanisterKind,
{
    let prepared_output = prepare_structural_delete_output(
        db,
        store,
        prepared,
        bounds.row_limit(),
        package_structural_delete_rows,
    )?;

    match prepared_output {
        Some(prepared_projection) => {
            validate_structural_delete_projection_bounds(&prepared_projection.output, bounds)?;
            validate_precommit(&prepared_projection.output)?;
            Ok(Some(prepared_projection))
        }
        None => {
            // No prepared output: still run the validators, against an empty projection.
            let empty = DeleteProjection::new(MaterializedProjectionRows::empty());
            validate_structural_delete_projection_bounds(&empty, bounds)?;
            validate_precommit(&empty)?;
            Ok(None)
        }
    }
}
/// Prepares a count-only structural delete with no row limit.
///
/// Delegates to `prepare_structural_delete_count_core_with_optional_bounds`
/// with the limit set to `None`.
pub(in crate::db::executor::delete) fn prepare_structural_delete_count_core<C: CanisterKind>(
    db: &Db<C>,
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
) -> Result<Option<PreparedDeleteOutput<()>>, InternalError> {
    let no_row_limit: Option<u32> = None;
    prepare_structural_delete_count_core_with_optional_bounds(db, store, prepared, no_row_limit)
}
/// Prepares a count-only structural delete, optionally capped at `max_rows`.
///
/// `max_rows` is forwarded to `prepare_structural_delete_output`, which
/// receives it in every feature configuration. With the `sql` feature enabled
/// and a limit present, the prepared output's `row_count` is additionally
/// re-validated against that limit before being returned.
///
/// Returns `Ok(None)` when output preparation yields `None`.
///
/// # Errors
///
/// Propagates failures from output preparation and, on `sql` builds, from the
/// row-count bounds check.
fn prepare_structural_delete_count_core_with_optional_bounds<C>(
    db: &Db<C>,
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
    max_rows: Option<u32>,
) -> Result<Option<PreparedDeleteOutput<()>>, InternalError>
where
    C: CanisterKind,
{
    let Some(prepared_count) = prepare_structural_delete_output(
        db,
        store,
        prepared,
        max_rows,
        package_structural_delete_count,
    )?
    else {
        return Ok(None);
    };

    // `max_rows` is consumed by the call above on every build, so no
    // feature-gated discard is required here.
    #[cfg(feature = "sql")]
    if let Some(max_rows) = max_rows {
        // Counts that do not fit in `u32` saturate to `u32::MAX` for the check.
        let row_count = u32::try_from(prepared_count.row_count).unwrap_or(u32::MAX);
        validate_structural_delete_row_count_bounds(
            row_count,
            DeleteProjectionBounds::max_rows(max_rows),
        )?;
    }

    Ok(Some(prepared_count))
}
/// Prepares a count-only structural delete limited by `bounds`.
///
/// Extracts the optional row limit from `bounds` and delegates to
/// `prepare_structural_delete_count_core_with_optional_bounds`.
#[cfg(feature = "sql")]
pub(in crate::db::executor::delete) fn prepare_structural_delete_count_core_with_bounds<
    C: CanisterKind,
>(
    db: &Db<C>,
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
    bounds: DeleteProjectionBounds,
) -> Result<Option<PreparedDeleteOutput<()>>, InternalError> {
    let max_rows = bounds.row_limit();
    prepare_structural_delete_count_core_with_optional_bounds(db, store, prepared, max_rows)
}