use crate::{
db::commit::{
marker::{COMMIT_ID_BYTES, CommitMarker, CommitRowOp, generate_commit_id},
store::{CommitStore, with_commit_store, with_commit_store_infallible},
},
error::InternalError,
};
use std::panic::{AssertUnwindSafe, catch_unwind};
pub(crate) struct CommitApplyGuard {
phase: &'static str,
finished: bool,
rollback: Option<Box<dyn FnOnce()>>,
}
impl CommitApplyGuard {
pub(crate) const fn new(phase: &'static str) -> Self {
Self {
phase,
finished: false,
rollback: None,
}
}
pub(crate) fn record_rollback(&mut self, rollback: impl FnOnce() + 'static) {
let rollback = Box::new(rollback);
self.rollback = Some(match self.rollback.take() {
None => rollback,
Some(previous) => Box::new(move || {
rollback();
previous();
}),
});
}
pub(crate) fn finish(mut self) -> Result<(), InternalError> {
if self.finished {
return Err(InternalError::executor_invariant(format!(
"commit apply guard invariant violated: finish called twice ({})",
self.phase
)));
}
self.finished = true;
self.rollback = None;
Ok(())
}
fn rollback_best_effort(&mut self) {
if self.finished {
return;
}
while let Some(rollback) = self.rollback.take() {
let _ = catch_unwind(AssertUnwindSafe(rollback));
}
}
}
impl Drop for CommitApplyGuard {
fn drop(&mut self) {
if !self.finished {
self.rollback_best_effort();
}
}
}
#[derive(Clone, Debug)]
pub(crate) struct CommitGuard {
commit_id: [u8; COMMIT_ID_BYTES],
marker: Option<CommitMarker>,
}
impl CommitGuard {
const fn for_persisted_id(commit_id: [u8; COMMIT_ID_BYTES]) -> Self {
Self {
commit_id,
marker: None,
}
}
pub(in crate::db) const fn from_marker(marker: CommitMarker) -> Self {
let commit_id = marker.id;
Self {
commit_id,
marker: Some(marker),
}
}
pub(in crate::db) fn row_ops(&self) -> Option<&[CommitRowOp]> {
self.marker.as_ref().map(|marker| marker.row_ops.as_slice())
}
fn clear(self) {
let _ = self;
with_commit_store_infallible(CommitStore::clear_infallible);
}
}
pub(crate) fn begin_commit(marker: CommitMarker) -> Result<CommitGuard, InternalError> {
with_commit_store(|store| {
let commit_id = marker.id;
store.set_if_empty(&marker)?;
Ok(CommitGuard::for_persisted_id(commit_id))
})
}
pub(crate) fn begin_single_row_commit(row_op: CommitRowOp) -> Result<CommitGuard, InternalError> {
with_commit_store(|store| {
let commit_id = generate_commit_id()?;
store.set_single_row_op_if_empty(commit_id, &row_op)?;
Ok(CommitGuard::for_persisted_id(commit_id))
})
}
pub(crate) fn begin_commit_with_migration_state(
marker: CommitMarker,
migration_state_bytes: Vec<u8>,
) -> Result<CommitGuard, InternalError> {
with_commit_store(|store| {
if !store.marker_is_empty()? {
return Err(InternalError::store_invariant(
"commit marker already present before begin",
));
}
store.set_with_migration_state(&marker, migration_state_bytes)?;
Ok(CommitGuard::from_marker(marker))
})
}
pub(crate) fn finish_commit(
mut guard: CommitGuard,
apply: impl FnOnce(&mut CommitGuard) -> Result<(), InternalError>,
) -> Result<(), InternalError> {
let result = apply(&mut guard);
let commit_id = guard.commit_id;
if result.is_ok() {
guard.clear();
assert!(
with_commit_store_infallible(|store| store.is_empty()),
"commit marker must be cleared after successful finish_commit (commit_id={commit_id:?})"
);
} else {
assert!(
with_commit_store_infallible(|store| !store.is_empty()),
"commit marker must remain persisted after failed finish_commit (commit_id={commit_id:?})"
);
}
result
}