use crate::{
db::{
Db,
commit::{CommitRowOp, PreparedRowCommitOp, rollback_prepared_row_ops_reverse},
registry::StoreRecoveryCapability,
},
error::InternalError,
traits::CanisterKind,
};
pub(in crate::db) fn replay_commit_marker_row_ops(
db: &Db<impl CanisterKind>,
row_ops: &[CommitRowOp],
) -> Result<(), InternalError> {
let mut rollbacks = Vec::<PreparedRowCommitOp>::with_capacity(row_ops.len());
for row_op in row_ops {
let recovery = match row_op_recovery_capability(db, row_op) {
Ok(recovery) => recovery,
Err(err) => {
rollback_prepared_row_ops_reverse(rollbacks);
return Err(err);
}
};
match recovery {
StoreRecoveryCapability::StableCommitReplay => {}
StoreRecoveryCapability::StableBasePlusJournalReplay => {
rollback_prepared_row_ops_reverse(rollbacks);
return Err(InternalError::store_unsupported(
"journaled row-op recovery is unsupported; journaled recovery must use marker-bound journal batches",
));
}
StoreRecoveryCapability::None => continue,
}
let prepared = match db.prepare_row_commit_op(row_op) {
Ok(op) => op,
Err(err) => {
rollback_prepared_row_ops_reverse(rollbacks);
return Err(err);
}
};
rollbacks.push(prepared.snapshot_row_only_rollback());
prepared.apply_row_only();
}
Ok(())
}
fn row_op_recovery_capability(
db: &Db<impl CanisterKind>,
row_op: &CommitRowOp,
) -> Result<StoreRecoveryCapability, InternalError> {
let hooks = db.runtime_hook_for_entity_path(row_op.entity_path.as_ref())?;
let handle = db.store_handle(hooks.store_path)?;
Ok(handle.storage_capabilities().recovery())
}