shodh_redb/
group_commit.rs1use crate::Error;
2use crate::error::StorageError;
3use crate::transactions::WriteTransaction;
4use std::fmt::{Display, Formatter};
5use std::sync::Mutex;
6use std::sync::mpsc;
7
8#[derive(Debug)]
10#[non_exhaustive]
11pub enum GroupCommitError {
12 BatchFailed(Error),
14 PeerFailed,
17 TransactionFailed(StorageError),
19 CommitFailed(StorageError),
21 Shutdown,
23 LockPoisoned,
25}
26
27impl Display for GroupCommitError {
28 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
29 match self {
30 Self::BatchFailed(e) => write!(f, "Batch operation failed: {e}"),
31 Self::PeerFailed => write!(f, "Rolled back: another batch in the group failed"),
32 Self::TransactionFailed(e) => write!(f, "Transaction acquisition failed: {e}"),
33 Self::CommitFailed(e) => write!(f, "Commit failed: {e}"),
34 Self::Shutdown => write!(f, "Database is shutting down"),
35 Self::LockPoisoned => write!(f, "Internal mutex poisoned"),
36 }
37 }
38}
39
40impl std::error::Error for GroupCommitError {}
41
42type BatchFn =
43 Box<dyn FnOnce(&WriteTransaction) -> std::result::Result<(), Error> + Send + 'static>;
44
45pub struct WriteBatch {
66 operations: BatchFn,
67}
68
69impl WriteBatch {
70 pub fn new<F>(f: F) -> Self
75 where
76 F: FnOnce(&WriteTransaction) -> std::result::Result<(), Error> + Send + 'static,
77 {
78 Self {
79 operations: Box::new(f),
80 }
81 }
82
83 pub(crate) fn apply(self, txn: &WriteTransaction) -> std::result::Result<(), Error> {
84 (self.operations)(txn)
85 }
86}
87
88pub(crate) struct PendingBatch {
89 pub batch: WriteBatch,
90 pub result_tx: mpsc::SyncSender<Result<(), GroupCommitError>>,
91}
92
93struct GroupCommitState {
94 pending: Vec<PendingBatch>,
95 active_leader: bool,
96 shutdown: bool,
97}
98
99pub(crate) struct GroupCommitter {
100 state: Mutex<GroupCommitState>,
101}
102
103impl GroupCommitter {
104 pub fn new() -> Self {
105 Self {
106 state: Mutex::new(GroupCommitState {
107 pending: Vec::new(),
108 active_leader: false,
109 shutdown: false,
110 }),
111 }
112 }
113
114 pub fn enqueue(
124 &self,
125 batch: WriteBatch,
126 ) -> Result<(bool, mpsc::Receiver<Result<(), GroupCommitError>>), GroupCommitError> {
127 let (result_tx, result_rx) = mpsc::sync_channel(1);
128 let mut state = self
129 .state
130 .lock()
131 .map_err(|_| GroupCommitError::LockPoisoned)?;
132 if state.shutdown {
133 return Err(GroupCommitError::Shutdown);
134 }
135 let should_lead = !state.active_leader;
136 if should_lead {
137 state.active_leader = true;
138 }
139 state.pending.push(PendingBatch { batch, result_tx });
140 Ok((should_lead, result_rx))
141 }
142
143 pub fn drain_pending(&self) -> Result<Vec<PendingBatch>, GroupCommitError> {
145 let mut state = self
146 .state
147 .lock()
148 .map_err(|_| GroupCommitError::LockPoisoned)?;
149 Ok(std::mem::take(&mut state.pending))
150 }
151
152 pub fn finish_leader(&self) -> Result<Vec<PendingBatch>, GroupCommitError> {
158 let mut state = self
159 .state
160 .lock()
161 .map_err(|_| GroupCommitError::LockPoisoned)?;
162 let remaining = std::mem::take(&mut state.pending);
163 if remaining.is_empty() {
164 state.active_leader = false;
165 }
166 Ok(remaining)
169 }
170
171 pub fn shutdown(&self) {
173 if let Ok(mut state) = self.state.lock() {
174 state.shutdown = true;
175 let pending = std::mem::take(&mut state.pending);
176 drop(state);
177 for p in pending {
178 let _ = p.result_tx.send(Err(GroupCommitError::Shutdown));
179 }
180 }
181 }
182}