reddb-io-server 1.2.0

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
//! Cooperative group-commit coordinator for the WAL.
//!
//! Mirrors PostgreSQL's `XLogFlush` waiter logic
//! (`src/backend/access/transam/xlog.c`). The single-writer commit
//! path used to call `wal.sync()` once per commit, so N concurrent
//! writers paid N independent fsyncs (~N × 100 µs on SSD). Group
//! commit collapses those into **one** fsync that covers every byte
//! appended up to the slowest writer's LSN.
//!
//! # Algorithm
//!
//! 1. A writer appends its records under the WAL lock and captures
//!    the resulting `commit_lsn = wal.current_lsn()` after its
//!    `Commit` record.
//! 2. The writer releases the WAL lock and calls
//!    [`GroupCommit::commit_at_least`]`(commit_lsn, &wal)`.
//! 3. Inside `commit_at_least`:
//!    - **Fast path:** if `flushed_lsn >= commit_lsn`, the write is
//!      already durable from a piggyback on a previous fsync.
//!      Return immediately.
//!    - Otherwise take the coordinator state lock. Re-check the
//!      flushed LSN (another leader may have raced).
//!    - If a leader is already mid-flush, wait on the condvar until
//!      that flush finishes. Re-check `flushed_lsn`; if the batch now
//!      covers `commit_lsn`, return, otherwise race to become the next
//!      leader for the remaining tail.
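//!    - If no leader is in progress, become the leader: mark
//!      `in_progress = true` and drop the state lock. Then take the
//!      WAL lock just long enough to drain buffered records to the
//!      kernel and capture the drained LSN plus a cloned file handle,
//!      release it, `sync_all()` that handle with the WAL lock NOT
//!      held, briefly re-take the WAL lock to mark the drained LSN
//!      durable, publish the new `flushed_lsn`, and finally take the
//!      state lock again, clear `in_progress`, and notify all waiters.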
//!
//! # Why this works
//!
//! Between the first writer's `append` and the moment the leader
//! drains the WAL buffer, other writers can grab the WAL lock and
//! append more records. The drain pushes **everything** appended so
//! far to the kernel — not just the leader's own records — and the
//! following fsync makes all of it durable. Writers whose LSN is now
//! covered return immediately; writers that appended after the leader
//! captured `target_lsn` wake up, see they still need more
//! durability, and one of them becomes the next leader.
//!
//! So `commit_at_least` produces one fsync per *batch* of concurrent
//! writers, not per writer. On a workload with 8 concurrent
//! committers and ~100 µs per fsync, each writer previously waited
//! ~8 × 100 µs = 800 µs per commit (≈ 1 250 commits/s per writer);
//! with one fsync per batch it waits ~100 µs (≈ 10 000 commits/s per
//! writer) — an idealised 8× win.
//!
//! # Correctness
//!
//! - `flushed_lsn` is monotonic: only the leader writes it, and
//!   only after a successful `sync()`.
//! - The state lock + condvar guarantee that exactly one leader is
//!   ever in flight, so we never have two parallel fsyncs racing.
//! - Waiters re-check `flushed_lsn` under the state lock before
//!   sleeping, so we never miss a wake-up.
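//! - While holding leadership the leader does only the flush work
//!   (drain, fsync, mark durable). It holds the WAL lock only for the
//!   drain and the mark-durable step — never across the fsync itself —
//!   so other writers can keep appending while it waits on the disk.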

use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};

use super::writer::WalWriter;

/// Mutable coordination state for [`GroupCommit`]; it lives inside the
/// `GroupCommit::state` mutex and is only reachable through that lock.
struct GroupCommitState {
    /// Set from the moment a writer claims leadership until that flush
    /// attempt ends (successfully or not) and leadership is released.
    in_progress: bool,
}

/// Cooperative WAL flush coordinator: batches concurrent commit
/// requests so a single fsync can make many writers durable at once.
///
/// Owned by the [`super::transaction::TransactionManager`]; cheap to
/// share across writers via `Arc`.
pub struct GroupCommit {
    /// Highest LSN known to be durable. Stored in an atomic so the fast
    /// path can read it without touching `state`.
    flushed_lsn: AtomicU64,
    /// Leader/waiter bookkeeping (whether a flush is in flight).
    state: Mutex<GroupCommitState>,
    /// Notified whenever a leader gives up leadership, so waiters can
    /// re-check `flushed_lsn` or take over as the next leader.
    cond: Condvar,
}

/// RAII token held by the current flush leader. Dropping it — on the
/// normal path, on an early `?` return, or while unwinding — calls
/// `GroupCommit::release_leadership`, which clears `in_progress` and
/// wakes every waiter.
struct LeadershipGuard<'a> {
    /// Coordinator whose leadership this guard represents.
    group_commit: &'a GroupCommit,
}

impl GroupCommit {
    /// Create a new coordinator initialised with the WAL's current
    /// durable position. Pass `wal.durable_lsn()` from a freshly
    /// opened `WalWriter`.
    pub fn new(initial_durable_lsn: u64) -> Self {
        Self {
            flushed_lsn: AtomicU64::new(initial_durable_lsn),
            state: Mutex::new(GroupCommitState { in_progress: false }),
            cond: Condvar::new(),
        }
    }

    /// Highest LSN that is known durable on disk. Cheap atomic read,
    /// no lock taken. Used by tests and by the diagnostics surface.
    pub fn flushed_lsn(&self) -> u64 {
        self.flushed_lsn.load(Ordering::Acquire)
    }

    /// Block the caller until the WAL is durable up to at least
    /// `target`. If another writer is already mid-flush, piggyback
    /// on it; otherwise become the leader and do the fsync ourselves.
    ///
    /// `wal` is the same `Mutex<WalWriter>` the transaction manager
    /// holds. The leader briefly takes that lock to call `sync()`.
    pub fn commit_at_least(&self, target: u64, wal: &Mutex<WalWriter>) -> io::Result<()> {
        // ── Fast path: already flushed past us. ─────────────────────
        if self.flushed_lsn.load(Ordering::Acquire) >= target {
            return Ok(());
        }

        let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());

        loop {
            // Re-check under the lock: another leader may have raced
            // and already flushed past our target between the load
            // above and the lock acquisition.
            if self.flushed_lsn.load(Ordering::Acquire) >= target {
                return Ok(());
            }

            if state.in_progress {
                // Another leader is mid-flush. Wait until they wake us,
                // then re-check whether our target is covered. If not,
                // loop and try to become the next leader instead of
                // going back to sleep forever.
                state = self
                    .cond
                    .wait_while(state, |state| {
                        state.in_progress && self.flushed_lsn.load(Ordering::Acquire) < target
                    })
                    .unwrap_or_else(|p| p.into_inner());
                continue;
            }

            // ── We are the leader. ──────────────────────────────────
            state.in_progress = true;
            break;
        }

        // Drop the state lock so waiters can re-check `flushed_lsn`
        // without bouncing on this mutex.
        drop(state);
        let _leader = LeadershipGuard { group_commit: self };

        // Phase 1 — take the WAL lock BRIEFLY to drain the BufWriter
        // into the kernel and capture both the LSN and a cloned
        // file handle for the fsync. The whole point of this dance
        // is that we DO NOT hold the WAL lock during the expensive
        // sync_all() call below.
        let (target_lsn, sync_handle) = {
            let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
            wal_guard.drain_for_group_sync()?
        };

        // Phase 2 — the lock is released. Other writers can now
        // append into the BufWriter while we wait on fsync. Their
        // bytes will either be picked up by THIS sync_all() (if
        // they reach the kernel before the syscall returns) or
        // by the NEXT leader.
        //
        // sync_all() on the cloned handle flushes the same kernel
        // inode the BufWriter writes to, so coverage is correct.
        sync_handle.sync_all()?;

        // Phase 3 — take the WAL lock briefly to publish the new
        // durable LSN. Other writers may have appended in the
        // meantime; their bytes that reached the kernel are NOW
        // also durable, but we conservatively only mark `target_lsn`
        // (the LSN we drained) as durable. Any later writer's
        // commit_at_least call will quickly become a no-op via the
        // fast path if the kernel already had their bytes when we
        // fsync'd.
        {
            let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
            wal_guard.mark_durable(target_lsn);
        }

        // Publish the new flushed LSN to atomic readers, then
        // let the leadership guard wake every waiter.
        self.flushed_lsn.store(target_lsn, Ordering::Release);

        Ok(())
    }

    fn release_leadership(&self) {
        let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
        state.in_progress = false;
        drop(state);
        self.cond.notify_all();
    }
}

impl std::fmt::Debug for GroupCommit {
    /// Formats only the published durable LSN; the coordination state
    /// behind the `state` mutex is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let flushed_lsn = self.flushed_lsn.load(Ordering::Acquire);
        let mut builder = f.debug_struct("GroupCommit");
        builder.field("flushed_lsn", &flushed_lsn);
        builder.finish()
    }
}

impl Drop for LeadershipGuard<'_> {
    /// Hand leadership back: clears `in_progress` and wakes all waiters.
    fn drop(&mut self) {
        let coordinator = self.group_commit;
        coordinator.release_leadership();
    }
}

#[cfg(test)]
mod tests {
    // Tests for `GroupCommit`. Each one drives a real `WalWriter` backed
    // by a uniquely named file in the system temp directory.
    use super::*;
    use crate::storage::wal::record::WalRecord;
    use crate::storage::wal::writer::WalWriter;
    use std::path::PathBuf;
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// Deletes the temporary WAL file when dropped, including when the
    /// test panics. Removal errors are ignored (best-effort cleanup).
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Build a temp-dir WAL path that is unique per test name, process
    /// id, and current time in nanoseconds, removing any stale file at
    /// that path. Returns the cleanup guard together with the path.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let path = std::env::temp_dir().join(format!(
            "rb_group_commit_{}_{}_{}.wal",
            name,
            std::process::id(),
            nanos
        ));
        let _ = std::fs::remove_file(&path);
        (FileGuard { path: path.clone() }, path)
    }

    #[test]
    fn fast_path_when_already_flushed() {
        // A target that is not ahead of the initial durable LSN must
        // return immediately and leave `flushed_lsn` unchanged.
        let (_g, path) = temp_wal("fast_path");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);
        // Target equal to the initial flushed_lsn → no fsync.
        gc.commit_at_least(initial, &wal).unwrap();
        assert_eq!(gc.flushed_lsn(), initial);
    }

    #[test]
    fn single_writer_advances_flushed_lsn() {
        // With no contention the caller becomes the leader; afterwards
        // `flushed_lsn` must cover the LSN of its Commit record.
        let (_g, path) = temp_wal("single_writer");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);

        // Append a record, capture its LSN, then commit.
        let target = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        assert!(target > initial);

        gc.commit_at_least(target, &wal).unwrap();
        assert!(gc.flushed_lsn() >= target);
    }

    #[test]
    fn flushed_lsn_is_monotonic() {
        // Two sequential commits must never move `flushed_lsn`
        // backwards, and re-requesting an already-covered LSN must not
        // change it.
        let (_g, path) = temp_wal("monotonic");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);

        // First commit.
        let lo = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(lo, &wal).unwrap();
        let after_lo = gc.flushed_lsn();

        // Second commit advances further.
        let hi = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(hi, &wal).unwrap();
        let after_hi = gc.flushed_lsn();

        assert!(after_hi >= after_lo);
        // Calling commit_at_least with `lo` after `hi` is a no-op.
        gc.commit_at_least(lo, &wal).unwrap();
        assert_eq!(gc.flushed_lsn(), after_hi);
    }

    #[test]
    fn concurrent_writers_coalesce_through_one_coordinator() {
        // Two threads each commit a few records. Both must succeed
        // and `flushed_lsn` must reflect every byte they wrote.
        // We can't directly count fsyncs at this layer (the WAL
        // doesn't expose a sync counter), but the absence of
        // deadlock and the correct final LSN are the contract.
        let (_g, path) = temp_wal("two_writers");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        let mut handles = Vec::new();
        for tx in 0..2u64 {
            let wal_c = Arc::clone(&wal);
            let gc_c = Arc::clone(&gc);
            handles.push(thread::spawn(move || -> io::Result<()> {
                for i in 0..10u64 {
                    // Append under the WAL lock, release it, then wait
                    // for durability — the same shape as a real commit.
                    let target = {
                        let mut w = wal_c.lock().unwrap();
                        w.append(&WalRecord::Begin {
                            tx_id: tx * 100 + i,
                        })?;
                        w.append(&WalRecord::Commit {
                            tx_id: tx * 100 + i,
                        })?;
                        w.current_lsn()
                    };
                    gc_c.commit_at_least(target, &wal_c)?;
                }
                Ok(())
            }));
        }

        for h in handles {
            h.join().unwrap().unwrap();
        }

        let final_durable = wal.lock().unwrap().durable_lsn();
        assert!(gc.flushed_lsn() >= final_durable);
        // 20 commits worth of 13-byte Begin + 13-byte Commit
        // records = 520 bytes minimum on top of the 8-byte header.
        assert!(final_durable >= 8 + 520);
    }

    #[test]
    fn waiter_retries_after_wakeup_if_previous_flush_was_too_small() {
        // Regression test: a waiter woken after a leader finishes
        // WITHOUT covering its target must take over as the next leader
        // instead of going back to sleep.
        let (_g, path) = temp_wal("waiter_retry");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        let target = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            w.current_lsn()
        };

        // Pretend another leader is mid-flush by claiming leadership
        // by hand; no fsync happens and `flushed_lsn` stays at
        // `initial`.
        {
            let mut state = gc.state.lock().unwrap();
            state.in_progress = true;
        }

        let (done_tx, done_rx) = mpsc::channel();
        let wal_c = Arc::clone(&wal);
        let gc_c = Arc::clone(&gc);
        let waiter = thread::spawn(move || {
            let result = gc_c.commit_at_least(target, &wal_c);
            let _ = done_tx.send(result);
        });

        // Give the waiter time to block on the condvar. If it has not
        // reached the wait yet, it will see `in_progress == false`
        // below and lead directly, so the test still passes.
        thread::sleep(Duration::from_millis(50));

        // End the fake flush without advancing `flushed_lsn`, then wake
        // waiters: the waiter's target is still uncovered.
        {
            let mut state = gc.state.lock().unwrap_or_else(|p| p.into_inner());
            state.in_progress = false;
        }
        gc.cond.notify_all();

        // Fail (instead of hanging) if the waiter never becomes leader.
        done_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("waiter should retry as the next leader")
            .unwrap();
        waiter.join().unwrap();

        assert!(gc.flushed_lsn() >= target);
        assert!(wal.lock().unwrap().durable_lsn() >= target);
    }

    #[test]
    fn high_concurrency_eight_writers_no_deadlock() {
        // 8 threads × 50 commits each. Stress the coordinator:
        // expected to complete without deadlock and with the WAL
        // fully durable up to the largest committed LSN.
        let (_g, path) = temp_wal("eight_writers");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        let mut handles = Vec::new();
        for tx in 0..8u64 {
            let wal_c = Arc::clone(&wal);
            let gc_c = Arc::clone(&gc);
            handles.push(thread::spawn(move || -> io::Result<()> {
                for i in 0..50u64 {
                    let target = {
                        let mut w = wal_c.lock().unwrap();
                        w.append(&WalRecord::Begin {
                            tx_id: tx * 1000 + i,
                        })?;
                        w.append(&WalRecord::Commit {
                            tx_id: tx * 1000 + i,
                        })?;
                        w.current_lsn()
                    };
                    gc_c.commit_at_least(target, &wal_c)?;
                }
                Ok(())
            }));
        }

        for h in handles {
            h.join().unwrap().unwrap();
        }

        // Every append was followed by a commit_at_least for its LSN
        // and nothing is appended after the threads join, so no byte
        // may remain non-durable.
        let current = wal.lock().unwrap().current_lsn();
        let durable = wal.lock().unwrap().durable_lsn();
        assert_eq!(durable, current, "every appended byte must be durable");
        assert!(gc.flushed_lsn() >= current);
    }

    #[test]
    fn writers_recover_from_poisoned_state() {
        // If a previous panic poisoned the state mutex, subsequent
        // writers must still be able to commit (we recover via
        // `unwrap_or_else(into_inner)`).
        let (_g, path) = temp_wal("poison_recovery");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        // Poison the state mutex by panicking inside it.
        // `catch_unwind` keeps the intentional panic from failing the test.
        let gc_c = Arc::clone(&gc);
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _state = gc_c.state.lock().unwrap();
            panic!("intentional poison");
        }));

        // The mutex is now poisoned — but commit_at_least must
        // still work because we recover from poisoning.
        let target = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(target, &wal).unwrap();
        assert!(gc.flushed_lsn() >= target);
    }
}