1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
use crate::Error;
use crate::error::StorageError;
use crate::transactions::WriteTransaction;
use std::fmt::{Display, Formatter};
use std::sync::Mutex;
use std::sync::mpsc;
/// Failure reported back to a caller whose batch went through group commit.
#[non_exhaustive]
#[derive(Debug)]
pub enum GroupCommitError {
    /// The closure of *this* batch returned an error.
    BatchFailed(Error),
    /// A different batch in the same group failed, so this one was rolled back
    /// along with it. Resubmitting the batch is a valid way to retry.
    PeerFailed,
    /// Acquiring the write transaction failed.
    TransactionFailed(StorageError),
    /// Committing the group failed (an fsync error, for example).
    CommitFailed(StorageError),
    /// The database is shutting down and no longer accepts batches.
    Shutdown,
    /// An internal mutex is poisoned because some thread panicked while holding it.
    LockPoisoned,
}
/// Human-readable messages; variants that wrap a lower-level error append its
/// own `Display` text after a short prefix.
impl Display for GroupCommitError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BatchFailed(inner) => write!(f, "Batch operation failed: {inner}"),
            Self::TransactionFailed(inner) => write!(f, "Transaction acquisition failed: {inner}"),
            Self::CommitFailed(inner) => write!(f, "Commit failed: {inner}"),
            // Fixed messages need no formatting machinery.
            Self::PeerFailed => f.write_str("Rolled back: another batch in the group failed"),
            Self::Shutdown => f.write_str("Database is shutting down"),
            Self::LockPoisoned => f.write_str("Internal mutex poisoned"),
        }
    }
}

impl std::error::Error for GroupCommitError {}
type BatchFn =
Box<dyn FnOnce(&WriteTransaction) -> std::result::Result<(), Error> + Send + 'static>;
/// A batch of write operations submitted to group commit.
///
/// Create a batch from a closure that receives a `&WriteTransaction` and performs
/// mutations (open tables, insert, remove, etc.). The group committer manages the
/// transaction lifecycle -- do not call `commit()` or `abort()` within the closure.
///
/// # Example
///
/// ```ignore
/// use shodh_redb::{TableDefinition, WriteBatch};
///
/// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("my_data");
///
/// let batch = WriteBatch::new(|txn| {
///     let mut table = txn.open_table(TABLE)?;
///     table.insert("key", &42)?;
///     Ok(())
/// });
/// db.submit_write_batch(batch)?;
/// ```
pub struct WriteBatch {
    /// The caller's mutations, run exactly once by the group committer.
    operations: BatchFn,
}

// Public types should implement `Debug` (Rust API guideline C-DEBUG) so they can
// be used in `#[derive(Debug)]` containers and `dbg!`. The boxed closure has no
// meaningful debug representation, so only the type name is shown:
// `WriteBatch { .. }`.
impl std::fmt::Debug for WriteBatch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("WriteBatch").finish_non_exhaustive()
    }
}
impl WriteBatch {
/// Create a batch from a closure that receives a shared `&WriteTransaction`.
///
/// The closure should open tables, insert/remove entries, etc.
/// Do not call `commit()` or `abort()` -- the group committer manages the lifecycle.
pub fn new<F>(f: F) -> Self
where
F: FnOnce(&WriteTransaction) -> std::result::Result<(), Error> + Send + 'static,
{
Self {
operations: Box::new(f),
}
}
pub(crate) fn apply(self, txn: &WriteTransaction) -> std::result::Result<(), Error> {
(self.operations)(txn)
}
}
pub(crate) struct PendingBatch {
pub batch: WriteBatch,
pub result_tx: mpsc::SyncSender<Result<(), GroupCommitError>>,
}
/// Queue and coordination flags, always accessed through the mutex held by
/// `GroupCommitter`.
struct GroupCommitState {
    /// Batches enqueued but not yet drained by a leader.
    pending: Vec<PendingBatch>,
    /// `true` while some thread holds the leader role.
    active_leader: bool,
    /// Once set, new submissions are rejected with `GroupCommitError::Shutdown`.
    shutdown: bool,
}
/// Coordinates group commit: submitters enqueue batches and, at any time, at
/// most one of them acts as leader and drains the queue.
pub(crate) struct GroupCommitter {
    /// Single lock guarding the queue and the leader/shutdown flags.
    state: Mutex<GroupCommitState>,
}
impl GroupCommitter {
    /// Create an idle committer: empty queue, no active leader, not shut down.
    pub fn new() -> Self {
        Self {
            state: Mutex::new(GroupCommitState {
                pending: Vec::new(),
                active_leader: false,
                shutdown: false,
            }),
        }
    }

    /// Enqueue a batch and determine whether this thread should become the leader.
    /// Returns `(should_lead, result_rx)`.
    ///
    /// A poisoned mutex means a previous leader panicked while holding the lock,
    /// leaving `GroupCommitState` in an unknown (potentially half-mutated) state.
    /// We intentionally do **not** call `into_inner()` to recover: the pending
    /// batches inside may reference already-dropped resources, and replaying them
    /// could corrupt the write-ahead log. Returning `LockPoisoned` forces callers
    /// to treat this as a fatal, non-recoverable error.
    ///
    /// # Errors
    ///
    /// * `GroupCommitError::LockPoisoned` if the state mutex is poisoned.
    /// * `GroupCommitError::Shutdown` if `shutdown` has already been called.
    pub fn enqueue(
        &self,
        batch: WriteBatch,
    ) -> Result<(bool, mpsc::Receiver<Result<(), GroupCommitError>>), GroupCommitError> {
        // Capacity 1: exactly one result is ever sent per batch, so the sender
        // never blocks.
        let (result_tx, result_rx) = mpsc::sync_channel(1);
        let mut state = self
            .state
            .lock()
            .map_err(|_| GroupCommitError::LockPoisoned)?;
        if state.shutdown {
            return Err(GroupCommitError::Shutdown);
        }
        let should_lead = !state.active_leader;
        if should_lead {
            state.active_leader = true;
        }
        state.pending.push(PendingBatch { batch, result_tx });
        Ok((should_lead, result_rx))
    }

    /// Drain all pending batches. Called by the leader.
    ///
    /// # Errors
    ///
    /// `GroupCommitError::LockPoisoned` if the state mutex is poisoned.
    pub fn drain_pending(&self) -> Result<Vec<PendingBatch>, GroupCommitError> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| GroupCommitError::LockPoisoned)?;
        Ok(std::mem::take(&mut state.pending))
    }

    /// Atomically relinquish leadership, draining any batches that arrived
    /// while the leader was processing the previous round.
    ///
    /// Returns `Ok(batches)` -- if non-empty the caller must process them
    /// before calling `finish_leader` again, preventing orphaned batches.
    ///
    /// # Errors
    ///
    /// `GroupCommitError::LockPoisoned` if the state mutex is poisoned.
    pub fn finish_leader(&self) -> Result<Vec<PendingBatch>, GroupCommitError> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| GroupCommitError::LockPoisoned)?;
        let remaining = std::mem::take(&mut state.pending);
        if remaining.is_empty() {
            state.active_leader = false;
        }
        // If remaining is non-empty we keep active_leader = true so no
        // other thread tries to become leader while we process the leftovers.
        Ok(remaining)
    }

    /// Shut down the group committer, failing all pending batches with
    /// `GroupCommitError::Shutdown`.
    ///
    /// Unlike the other methods, this one recovers the guard from a poisoned
    /// mutex. Previously a poisoned lock made shutdown a silent no-op. Every
    /// queued `PendingBatch` then kept its result sender alive inside the
    /// mutex, so submitters waiting on their receivers never got an answer.
    ///
    /// Recovery is safe here because this method only sets a flag and moves
    /// the queue out. No batch is executed or replayed, which is the hazard
    /// that makes `enqueue` refuse to recover.
    pub fn shutdown(&self) {
        let mut state = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        state.shutdown = true;
        let pending = std::mem::take(&mut state.pending);
        // Release the lock before notifying waiters.
        drop(state);
        for p in pending {
            // Best effort: a send error only means the submitter already
            // dropped its receiver and no longer cares about the outcome.
            let _ = p.result_tx.send(Err(GroupCommitError::Shutdown));
        }
    }
}