use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use event_listener::{Event, EventListener};
use crate::types::GlobalSequence;
/// Shared state behind a write watcher: an atomically updated sequence
/// watermark paired with the event used to wake tasks waiting on it.
#[derive(Debug)]
pub(crate) struct WriteWatcherInner {
    /// Raw value of the watermark; read with `Acquire` and raised with
    /// `fetch_max`, so it never moves backwards.
    committed_next_global_seq: AtomicU64,
    /// Notified on every call to `notify`, whether or not the watermark moved.
    event: Event,
}
impl WriteWatcherInner {
    /// Creates the shared state with the watermark preset to `initial_next_seq`
    /// and a fresh event that has no listeners yet.
    pub(crate) fn new(initial_next_seq: GlobalSequence) -> Self {
        let committed_next_global_seq = AtomicU64::new(initial_next_seq.0);
        let event = Event::new();
        Self {
            committed_next_global_seq,
            event,
        }
    }

    /// Returns the current watermark, loaded with `Acquire` ordering.
    pub(crate) fn committed_next_global_seq(&self) -> GlobalSequence {
        let raw = self.committed_next_global_seq.load(Ordering::Acquire);
        GlobalSequence(raw)
    }

    /// Raises the watermark to `new_next_seq` if that is larger than the stored
    /// value, then notifies the event.
    ///
    /// The notification is sent unconditionally, so listeners may be woken even
    /// when the watermark did not change.
    pub(crate) fn notify(&self, new_next_seq: GlobalSequence) {
        let candidate = new_next_seq.0;
        // `fetch_max` keeps the larger of the stored and offered values, so a
        // stale (smaller) sequence can never move the watermark backwards.
        let _previous = self
            .committed_next_global_seq
            .fetch_max(candidate, Ordering::AcqRel);
        // Request waking up to `usize::MAX` listeners, i.e. every registered one.
        self.event.notify(usize::MAX);
    }

    /// Registers and returns a new listener on the underlying event.
    pub(crate) fn listen(&self) -> EventListener {
        let listener = self.event.listen();
        listener
    }
}
/// Cloneable handle to a shared [`WriteWatcherInner`].
///
/// Clones share the same `Arc`, so a notification sent through one handle is
/// visible through all of them.
#[derive(Clone, Debug)]
pub struct WriteWatcher {
    /// Reference-counted state shared by every clone of this handle.
    inner: Arc<WriteWatcherInner>,
}
impl WriteWatcher {
    /// Creates a watcher whose watermark starts at `initial_next_seq`.
    pub(crate) fn new(initial_next_seq: GlobalSequence) -> Self {
        let inner = Arc::new(WriteWatcherInner::new(initial_next_seq));
        Self { inner }
    }

    /// Returns the current value of the shared watermark.
    pub fn committed_next_global_seq(&self) -> GlobalSequence {
        self.inner.committed_next_global_seq()
    }

    /// Waits until the watermark is strictly greater than `from` and returns
    /// the value observed at that point.
    ///
    /// Returns without awaiting when the watermark is already past `from`.
    /// Wake-ups that find the watermark still at or below `from` are ignored
    /// and the wait starts over.
    pub async fn wait_for_global_seq(&self, from: GlobalSequence) -> GlobalSequence {
        loop {
            // Register the listener *before* reading the watermark so that a
            // notification landing between the read and the await is not lost.
            let wakeup = self.inner.listen();
            match self.inner.committed_next_global_seq() {
                observed if observed > from => break observed,
                _ => wakeup.await,
            }
        }
    }

    /// Forwards to the shared state: raises the watermark to `new_next_seq`
    /// when it is larger and notifies waiting listeners.
    pub(crate) fn notify(&self, new_next_seq: GlobalSequence) {
        self.inner.notify(new_next_seq);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::rstest;
    use std::thread;
    use std::time::Duration;

    /// Short pause that gives the other thread(s) a chance to make progress.
    const SETTLE: Duration = Duration::from_millis(10);

    /// A freshly built watcher reports exactly the sequence it was created with.
    #[rstest]
    #[case(GlobalSequence(0))]
    #[case(GlobalSequence(5))]
    #[case(GlobalSequence(42))]
    fn committed_next_global_seq_starts_at_initial(#[case] initial: GlobalSequence) {
        let fresh = WriteWatcher::new(initial);
        assert_eq!(fresh.committed_next_global_seq(), initial);
    }

    /// A notification sent through one handle is visible through its clone.
    #[rstest]
    fn clone_shares_state() {
        let original = WriteWatcher::new(GlobalSequence(0));
        let copy = original.clone();

        original.notify(GlobalSequence(3));

        assert_eq!(copy.committed_next_global_seq(), GlobalSequence(3));
    }

    /// Notifying with a smaller sequence must not lower the watermark.
    #[rstest]
    fn notify_is_monotonic() {
        let watcher = WriteWatcher::new(GlobalSequence(5));

        // (sequence passed to `notify`, watermark expected afterwards)
        let steps = [
            (GlobalSequence(10), GlobalSequence(10)),
            (GlobalSequence(7), GlobalSequence(10)),
        ];
        for (offered, expected) in steps {
            watcher.notify(offered);
            assert_eq!(watcher.committed_next_global_seq(), expected);
        }
    }

    /// No notification is needed when the watermark is already past `from`.
    #[rstest]
    fn wait_returns_immediately_when_already_ahead() {
        let watcher = WriteWatcher::new(GlobalSequence(10));
        let pending = watcher.wait_for_global_seq(GlobalSequence(5));
        assert_eq!(pollster::block_on(pending), GlobalSequence(10));
    }

    /// A notification that leaves the watermark unchanged must not end the
    /// wait; only the later, advancing one does.
    #[rstest]
    fn wait_ignores_spurious_notifications_when_watermark_does_not_advance() {
        let watcher = WriteWatcher::new(GlobalSequence(0));
        let background = watcher.clone();

        // The join handle is dropped: the thread runs detached and is not joined.
        thread::spawn(move || {
            // First a no-op notification (same value), then a real advance.
            for seq in [GlobalSequence(0), GlobalSequence(1)] {
                thread::sleep(SETTLE);
                background.notify(seq);
            }
        });

        let pending = watcher.wait_for_global_seq(GlobalSequence(0));
        assert_eq!(pollster::block_on(pending), GlobalSequence(1));
    }

    /// A single notification releases every waiter.
    #[rstest]
    fn wait_wakes_multiple_waiters() {
        let watcher = WriteWatcher::new(GlobalSequence(0));

        // `collect` forces both threads to be spawned before the pause below.
        let waiters: Vec<_> = (0..2)
            .map(|_| {
                let handle = watcher.clone();
                thread::spawn(move || {
                    pollster::block_on(handle.wait_for_global_seq(GlobalSequence(0)))
                })
            })
            .collect();

        thread::sleep(SETTLE);
        watcher.notify(GlobalSequence(1));

        for waiter in waiters {
            assert_eq!(waiter.join().unwrap(), GlobalSequence(1));
        }
    }
}