// shodh_redb/group_commit.rs

use crate::Error;
use crate::error::StorageError;
use crate::transactions::WriteTransaction;
use std::fmt::{Debug, Display, Formatter};
use std::sync::Mutex;
use std::sync::mpsc;
7
8/// Error from a group commit operation.
9#[derive(Debug)]
10#[non_exhaustive]
11pub enum GroupCommitError {
12    /// This batch's operations caused an error.
13    BatchFailed(Error),
14    /// This batch was rolled back because another batch in the group failed.
15    /// The caller may retry by resubmitting.
16    PeerFailed,
17    /// The write transaction could not be acquired.
18    TransactionFailed(StorageError),
19    /// The commit itself failed (fsync error, etc.).
20    CommitFailed(StorageError),
21    /// The database is shutting down.
22    Shutdown,
23    /// An internal mutex was poisoned (a thread panicked while holding the lock).
24    LockPoisoned,
25}
26
27impl Display for GroupCommitError {
28    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
29        match self {
30            Self::BatchFailed(e) => write!(f, "Batch operation failed: {e}"),
31            Self::PeerFailed => write!(f, "Rolled back: another batch in the group failed"),
32            Self::TransactionFailed(e) => write!(f, "Transaction acquisition failed: {e}"),
33            Self::CommitFailed(e) => write!(f, "Commit failed: {e}"),
34            Self::Shutdown => write!(f, "Database is shutting down"),
35            Self::LockPoisoned => write!(f, "Internal mutex poisoned"),
36        }
37    }
38}
39
40impl std::error::Error for GroupCommitError {}
41
42type BatchFn =
43    Box<dyn FnOnce(&WriteTransaction) -> std::result::Result<(), Error> + Send + 'static>;
44
45/// A batch of write operations submitted to group commit.
46///
47/// Create a batch from a closure that receives a `&WriteTransaction` and performs
48/// mutations (open tables, insert, remove, etc.). The group committer manages the
49/// transaction lifecycle -- do not call `commit()` or `abort()` within the closure.
50///
51/// # Example
52///
53/// ```ignore
54/// use shodh_redb::{TableDefinition, WriteBatch};
55///
56/// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("my_data");
57///
58/// let batch = WriteBatch::new(|txn| {
59///     let mut table = txn.open_table(TABLE)?;
60///     table.insert("key", &42)?;
61///     Ok(())
62/// });
63/// db.submit_write_batch(batch)?;
64/// ```
65pub struct WriteBatch {
66    operations: BatchFn,
67}
68
69impl WriteBatch {
70    /// Create a batch from a closure that receives a shared `&WriteTransaction`.
71    ///
72    /// The closure should open tables, insert/remove entries, etc.
73    /// Do not call `commit()` or `abort()` -- the group committer manages the lifecycle.
74    pub fn new<F>(f: F) -> Self
75    where
76        F: FnOnce(&WriteTransaction) -> std::result::Result<(), Error> + Send + 'static,
77    {
78        Self {
79            operations: Box::new(f),
80        }
81    }
82
83    pub(crate) fn apply(self, txn: &WriteTransaction) -> std::result::Result<(), Error> {
84        (self.operations)(txn)
85    }
86}
87
88pub(crate) struct PendingBatch {
89    pub batch: WriteBatch,
90    pub result_tx: mpsc::SyncSender<Result<(), GroupCommitError>>,
91}
92
93struct GroupCommitState {
94    pending: Vec<PendingBatch>,
95    active_leader: bool,
96    shutdown: bool,
97}
98
99pub(crate) struct GroupCommitter {
100    state: Mutex<GroupCommitState>,
101}
102
103impl GroupCommitter {
104    pub fn new() -> Self {
105        Self {
106            state: Mutex::new(GroupCommitState {
107                pending: Vec::new(),
108                active_leader: false,
109                shutdown: false,
110            }),
111        }
112    }
113
114    /// Enqueue a batch and determine whether this thread should become the leader.
115    /// Returns `(should_lead, result_rx)`.
116    ///
117    /// A poisoned mutex means a previous leader panicked while holding the lock,
118    /// leaving `GroupCommitState` in an unknown (potentially half-mutated) state.
119    /// We intentionally do **not** call `into_inner()` to recover: the pending
120    /// batches inside may reference already-dropped resources, and replaying them
121    /// could corrupt the write-ahead log. Returning `LockPoisoned` forces callers
122    /// to treat this as a fatal, non-recoverable error.
123    pub fn enqueue(
124        &self,
125        batch: WriteBatch,
126    ) -> Result<(bool, mpsc::Receiver<Result<(), GroupCommitError>>), GroupCommitError> {
127        let (result_tx, result_rx) = mpsc::sync_channel(1);
128        let mut state = self
129            .state
130            .lock()
131            .map_err(|_| GroupCommitError::LockPoisoned)?;
132        if state.shutdown {
133            return Err(GroupCommitError::Shutdown);
134        }
135        let should_lead = !state.active_leader;
136        if should_lead {
137            state.active_leader = true;
138        }
139        state.pending.push(PendingBatch { batch, result_tx });
140        Ok((should_lead, result_rx))
141    }
142
143    /// Drain all pending batches. Called by the leader.
144    pub fn drain_pending(&self) -> Result<Vec<PendingBatch>, GroupCommitError> {
145        let mut state = self
146            .state
147            .lock()
148            .map_err(|_| GroupCommitError::LockPoisoned)?;
149        Ok(std::mem::take(&mut state.pending))
150    }
151
152    /// Atomically relinquish leadership, draining any batches that arrived
153    /// while the leader was processing the previous round.
154    ///
155    /// Returns `Ok(batches)` -- if non-empty the caller must process them
156    /// before calling `finish_leader` again, preventing orphaned batches.
157    pub fn finish_leader(&self) -> Result<Vec<PendingBatch>, GroupCommitError> {
158        let mut state = self
159            .state
160            .lock()
161            .map_err(|_| GroupCommitError::LockPoisoned)?;
162        let remaining = std::mem::take(&mut state.pending);
163        if remaining.is_empty() {
164            state.active_leader = false;
165        }
166        // If remaining is non-empty we keep active_leader = true so no
167        // other thread tries to become leader while we process the leftovers.
168        Ok(remaining)
169    }
170
171    /// Shut down the group committer, failing all pending batches.
172    pub fn shutdown(&self) {
173        if let Ok(mut state) = self.state.lock() {
174            state.shutdown = true;
175            let pending = std::mem::take(&mut state.pending);
176            drop(state);
177            for p in pending {
178                let _ = p.result_tx.send(Err(GroupCommitError::Shutdown));
179            }
180        }
181    }
182}