use super::maintenance_gate::{MaintenanceGate, MaintenanceReadGuard, MaintenanceWriteGuard};
#[derive(Debug)]
pub(crate) struct CommitGate {
gate: MaintenanceGate,
}
impl Default for CommitGate {
fn default() -> Self {
Self::new()
}
}
impl CommitGate {
#[must_use]
pub(crate) const fn new() -> Self {
Self {
gate: MaintenanceGate::new(),
}
}
pub(crate) fn enter_writer(&self) -> CommitWriteGuard<'_> {
CommitWriteGuard {
_inner: self.gate.enter_shared(),
}
}
pub(crate) fn enter_checkpoint(&self) -> CommitCheckpointGuard<'_> {
CommitCheckpointGuard {
_inner: self.gate.enter_exclusive(),
}
}
}
#[derive(Debug)]
pub(crate) struct CommitWriteGuard<'a> {
_inner: MaintenanceReadGuard<'a>,
}
#[derive(Debug)]
pub(crate) struct CommitCheckpointGuard<'a> {
_inner: MaintenanceWriteGuard<'a>,
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
#[test]
fn writers_can_enter_concurrently() {
let gate = CommitGate::new();
let _a = gate.enter_writer();
let _b = gate.enter_writer();
}
#[test]
fn checkpoint_waits_for_admitted_writer_and_blocks_new_writers() {
let gate = Arc::new(CommitGate::new());
let writer = gate.enter_writer();
let ck_gate = Arc::clone(&gate);
let (ck_started_tx, ck_started_rx) = sync_channel(0);
let (ck_done_tx, ck_done_rx) = sync_channel(0);
let ck = thread::spawn(move || {
ck_started_tx.send(()).unwrap();
let _ck = ck_gate.enter_checkpoint();
ck_done_tx.send(()).unwrap();
});
ck_started_rx.recv().unwrap();
assert!(ck_done_rx.recv_timeout(Duration::from_millis(50)).is_err());
let writer_gate = Arc::clone(&gate);
let (writer_done_tx, writer_done_rx) = sync_channel(0);
let blocked_writer = thread::spawn(move || {
let _w = writer_gate.enter_writer();
writer_done_tx.send(()).unwrap();
});
assert!(
writer_done_rx
.recv_timeout(Duration::from_millis(50))
.is_err(),
"new writer must wait behind a pending checkpoint"
);
drop(writer);
ck_done_rx.recv_timeout(Duration::from_secs(1)).unwrap();
ck.join().unwrap();
writer_done_rx.recv_timeout(Duration::from_secs(1)).unwrap();
blocked_writer.join().unwrap();
}
}