#[cfg(feature = "sql")]
use crate::{
db::executor::{
delete::DeleteProjectionBounds, delete::types::DeleteProjection,
projection::MaterializedProjectionRows,
},
value::Value,
};
use crate::{
db::{
Db,
executor::{
delete::{
prepare_delete_leaf_rows, prepare_delete_output_from_leaf,
resolve_delete_candidate_rows_recorded_as,
types::{DeleteLeaf, PreparedDeleteExecutionState, PreparedDeleteOutput},
},
terminal::{KernelRow, RowDecoder},
},
registry::StoreHandle,
},
error::InternalError,
traits::CanisterKind,
};
#[cfg(feature = "sql")]
use icydb_diagnostic_code::SqlWriteBoundaryCode;
/// Packages decoded kernel rows into a delete leaf carrying a materialized
/// value projection.
///
/// Every row is split into its data row and its slot values. Slots without a
/// value are projected as `Value::Null`; the key is converted with `to_raw`
/// and stored together with the raw row in `rollback_rows`.
///
/// # Errors
///
/// Propagates any failure from `into_data_row_and_slots` or `to_raw`.
#[cfg(feature = "sql")]
fn package_structural_delete_rows(
    rows: Vec<KernelRow>,
) -> Result<DeleteLeaf<DeleteProjection>, InternalError> {
    let expected = rows.len();
    let mut projected: Vec<Vec<Value>> = Vec::with_capacity(expected);
    let mut rollback_rows = Vec::with_capacity(expected);

    for kernel_row in rows {
        let ((key, raw), slots) = kernel_row.into_data_row_and_slots()?;
        // Convert the key before recording anything for this row.
        let raw_key = key.to_raw()?;
        let values = slots
            .into_iter()
            .map(|slot| slot.unwrap_or(Value::Null))
            .collect();
        projected.push(values);
        rollback_rows.push((raw_key, raw));
    }

    let row_count = rollback_rows.len();
    let output = DeleteProjection::new(MaterializedProjectionRows::from_value_rows(projected));
    Ok(DeleteLeaf {
        output,
        row_count,
        rollback_rows,
    })
}
/// Packages decoded kernel rows into a count-only delete leaf.
///
/// The slot values of each row are discarded; only the `to_raw` form of the
/// key and the raw row are kept in `rollback_rows`. `row_count` is the number
/// of rows passed in.
///
/// # Errors
///
/// Propagates any failure from `into_data_row_and_slots` or `to_raw`.
fn package_structural_delete_count(rows: Vec<KernelRow>) -> Result<DeleteLeaf<()>, InternalError> {
    let row_count = rows.len();
    let mut rollback_rows = Vec::with_capacity(row_count);

    rows.into_iter()
        .try_for_each(|kernel_row| -> Result<(), InternalError> {
            let ((key, raw), _) = kernel_row.into_data_row_and_slots()?;
            rollback_rows.push((key.to_raw()?, raw));
            Ok(())
        })?;

    Ok(DeleteLeaf {
        output: (),
        row_count,
        rollback_rows,
    })
}
/// Resolves the delete candidate rows for `prepared` and decodes each of them
/// into a `KernelRow` using the structural row decoder.
///
/// # Errors
///
/// Returns the error from the entity's `row_layout` lookup, or whatever
/// `resolve_delete_candidate_rows_recorded_as` reports.
fn resolve_structural_delete_kernel_rows(
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
) -> Result<Vec<KernelRow>, InternalError> {
    // The layout lookup is fallible and happens once, ahead of row resolution.
    let layout = prepared.authority.entity.row_layout()?;
    let decoder = RowDecoder::structural();

    resolve_delete_candidate_rows_recorded_as(store, prepared, |candidate| {
        decoder.decode(&layout, candidate)
    })
}
/// Resolves the structural kernel rows for `prepared` and hands them, together
/// with the `package_rows` callback, to `prepare_delete_leaf_rows`.
///
/// # Errors
///
/// Returns the first error from row resolution or from leaf preparation.
fn prepare_structural_delete_leaf_from_access<T>(
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
    package_rows: impl FnOnce(Vec<KernelRow>) -> Result<DeleteLeaf<T>, InternalError>,
) -> Result<DeleteLeaf<T>, InternalError> {
    resolve_structural_delete_kernel_rows(store, prepared)
        .and_then(|kernel_rows| prepare_delete_leaf_rows(prepared, kernel_rows, package_rows))
}
/// Builds the structural delete leaf for `prepared` and passes it to
/// `prepare_delete_output_from_leaf`.
///
/// The result is `None` whenever `prepare_delete_output_from_leaf` yields
/// `None`.
///
/// # Errors
///
/// Returns the first error from leaf construction or output preparation.
fn prepare_structural_delete_output<C, T>(
    db: &Db<C>,
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
    package_rows: impl FnOnce(Vec<KernelRow>) -> Result<DeleteLeaf<T>, InternalError>,
) -> Result<Option<PreparedDeleteOutput<T>>, InternalError>
where
    C: CanisterKind,
{
    prepare_delete_output_from_leaf(
        db,
        store,
        prepared,
        prepare_structural_delete_leaf_from_access(store, prepared, package_rows)?,
    )
}
/// Checks the row count of `projection` against `bounds`.
///
/// # Errors
///
/// Forwards the error from `validate_structural_delete_row_count_bounds`.
#[cfg(feature = "sql")]
fn validate_structural_delete_projection_bounds(
    projection: &DeleteProjection,
    bounds: DeleteProjectionBounds,
) -> Result<(), InternalError> {
    let staged_rows = projection.row_count();
    validate_structural_delete_row_count_bounds(staged_rows, bounds)
}
/// Checks `row_count` against the optional row limit carried by `bounds`.
///
/// Succeeds when `bounds` has no row limit or when `row_count` does not exceed
/// it.
///
/// # Errors
///
/// Returns the SQL write-boundary error tagged
/// `SqlWriteBoundaryCode::StagedRowsTooMany` when `row_count` is greater than
/// the limit.
#[cfg(feature = "sql")]
fn validate_structural_delete_row_count_bounds(
    row_count: u32,
    bounds: DeleteProjectionBounds,
) -> Result<(), InternalError> {
    match bounds.row_limit() {
        Some(limit) if row_count > limit => Err(InternalError::query_sql_write_boundary(
            SqlWriteBoundaryCode::StagedRowsTooMany,
        )),
        Some(_) | None => Ok(()),
    }
}
/// Prepares a structural delete whose output is a materialized projection.
///
/// The projection is first checked against `bounds` and then passed to
/// `validate_precommit`. When output preparation yields nothing, both checks
/// still run, against an empty projection, and `Ok(None)` is returned.
///
/// # Errors
///
/// Returns the first error from output preparation, the bounds check, or
/// `validate_precommit`.
#[cfg(feature = "sql")]
pub(in crate::db::executor::delete) fn prepare_structural_delete_projection_core<C>(
    db: &Db<C>,
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
    bounds: DeleteProjectionBounds,
    validate_precommit: impl FnOnce(&DeleteProjection) -> Result<(), InternalError>,
) -> Result<Option<PreparedDeleteOutput<DeleteProjection>>, InternalError>
where
    C: CanisterKind,
{
    let staged =
        prepare_structural_delete_output(db, store, prepared, package_structural_delete_rows)?;

    // Only initialised when nothing was staged; declared here so the borrow
    // taken below can outlive the `match`.
    let empty_projection;
    let projection: &DeleteProjection = match &staged {
        Some(prepared_projection) => &prepared_projection.output,
        None => {
            empty_projection = DeleteProjection::new(MaterializedProjectionRows::empty());
            &empty_projection
        }
    };

    validate_structural_delete_projection_bounds(projection, bounds)?;
    validate_precommit(projection)?;
    Ok(staged)
}
/// Prepares a count-only structural delete without any row limit.
///
/// # Errors
///
/// Forwards the error from
/// `prepare_structural_delete_count_core_with_optional_bounds`.
pub(in crate::db::executor::delete) fn prepare_structural_delete_count_core<C>(
    db: &Db<C>,
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
) -> Result<Option<PreparedDeleteOutput<()>>, InternalError>
where
    C: CanisterKind,
{
    let unbounded: Option<u32> = None;
    prepare_structural_delete_count_core_with_optional_bounds(db, store, prepared, unbounded)
}
/// Prepares a count-only structural delete, optionally enforcing a row limit.
///
/// With the `sql` feature enabled and `max_rows` set, the staged row count is
/// validated against that limit; counts that do not fit in `u32` are clamped
/// to `u32::MAX` before the comparison. Without the `sql` feature `max_rows`
/// is ignored. When output preparation yields nothing, `Ok(None)` is returned
/// without any limit check.
///
/// # Errors
///
/// Returns the first error from output preparation or from the row-limit
/// check.
fn prepare_structural_delete_count_core_with_optional_bounds<C>(
    db: &Db<C>,
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
    max_rows: Option<u32>,
) -> Result<Option<PreparedDeleteOutput<()>>, InternalError>
where
    C: CanisterKind,
{
    let staged =
        prepare_structural_delete_output(db, store, prepared, package_structural_delete_count)?;

    // Keeps the parameter "used" in builds where the limit check is compiled out.
    #[cfg(not(feature = "sql"))]
    let _ = max_rows;

    #[cfg(feature = "sql")]
    if let (Some(prepared_count), Some(limit)) = (staged.as_ref(), max_rows) {
        let staged_rows = u32::try_from(prepared_count.row_count).unwrap_or(u32::MAX);
        validate_structural_delete_row_count_bounds(
            staged_rows,
            DeleteProjectionBounds::max_rows(limit),
        )?;
    }

    Ok(staged)
}
/// Prepares a count-only structural delete using the row limit, if any,
/// carried by `bounds`.
///
/// # Errors
///
/// Forwards the error from
/// `prepare_structural_delete_count_core_with_optional_bounds`.
#[cfg(feature = "sql")]
pub(in crate::db::executor::delete) fn prepare_structural_delete_count_core_with_bounds<C>(
    db: &Db<C>,
    store: StoreHandle,
    prepared: &PreparedDeleteExecutionState,
    bounds: DeleteProjectionBounds,
) -> Result<Option<PreparedDeleteOutput<()>>, InternalError>
where
    C: CanisterKind,
{
    let max_rows = bounds.row_limit();
    prepare_structural_delete_count_core_with_optional_bounds(db, store, prepared, max_rows)
}