use std::{
fmt,
fmt::Debug,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use reifydb_core::common::CommitVersion;
use reifydb_runtime::{
actor::{mailbox::ActorRef, system::ActorSystem},
sync::{condvar::Condvar, mutex::Mutex},
};
use tracing::instrument;
use super::actor::{WatermarkActor, WatermarkMsg, WatermarkShared};
/// One-shot notification slot that a blocked caller waits on.
///
/// `notified` records whether [`WaiterHandle::notify`] has run; `condvar` is
/// what wakes the thread blocked in `wait_timeout`. The flag, not the wake-up
/// itself, is the source of truth for "was this waiter notified".
#[derive(Debug)]
pub struct WaiterHandle {
    notified: Mutex<bool>,
    condvar: Condvar,
}

impl WaiterHandle {
    /// Creates a handle in the "not yet notified" state.
    fn new() -> Self {
        Self {
            notified: Mutex::new(false),
            condvar: Condvar::new(),
        }
    }

    /// Sets the notified flag and wakes one thread blocked in `wait_timeout`.
    ///
    /// The flag is written while the lock is held, so a waiter that has not
    /// started waiting yet still observes the notification.
    pub(crate) fn notify(&self) {
        let mut guard = self.notified.lock();
        *guard = true;
        self.condvar.notify_one();
    }

    /// Blocks until the handle is notified or `timeout` elapses.
    ///
    /// Returns `true` only if the notified flag is set, `false` otherwise.
    ///
    /// The result is taken from the flag rather than from the condvar's wake
    /// result, which makes two cases behave correctly:
    /// - a wake-up that was not caused by `notify` does not report success;
    ///   the wait is simply re-armed,
    /// - a `notify` that lands at the same moment the wait times out is still
    ///   reported as success.
    ///
    /// NOTE(review): no clock is used here, so a wake-up without notification
    /// restarts the wait with the full `timeout`; the total wait can exceed
    /// `timeout` in that (rare) case, but it never returns a false positive.
    fn wait_timeout(&self, timeout: Duration) -> bool {
        let mut guard = self.notified.lock();
        loop {
            if *guard {
                return true;
            }
            if self.condvar.wait_for(&mut guard, timeout).timed_out() {
                // Re-check under the lock: the flag may have been set right
                // as the timeout fired.
                return *guard;
            }
        }
    }
}
/// Caller-side handle to a watermark actor and the counters shared with it.
pub struct WaterMark {
    /// Reference used to send `WatermarkMsg` values to the spawned actor.
    actor: ActorRef<WatermarkMsg>,
    /// Atomic `done_until` / `last_index` counters, also held by the actor.
    shared: Arc<WatermarkShared>,
}

impl Debug for WaterMark {
    /// Formats the handle as `WaterMark { done_until, last_index }` using a
    /// relaxed snapshot of both counters; the actor reference is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let done_until = self.shared.done_until.load(Ordering::Relaxed);
        let last_index = self.shared.last_index.load(Ordering::Relaxed);

        let mut out = f.debug_struct("WaterMark");
        out.field("done_until", &done_until);
        out.field("last_index", &last_index);
        out.finish()
    }
}
impl WaterMark {
    /// Builds a watermark whose counters start at zero and spawns its
    /// `WatermarkActor` on `system` under the name `task_name`.
    #[instrument(name = "transaction::watermark::new", level = "debug", skip(system), fields(task_name = %task_name))]
    pub fn new(task_name: String, system: &ActorSystem) -> Self {
        let counters = Arc::new(WatermarkShared {
            done_until: AtomicU64::new(0),
            last_index: AtomicU64::new(0),
        });
        let worker = WatermarkActor {
            shared: Arc::clone(&counters),
        };
        let mailbox = system.spawn(&task_name, worker).actor_ref().clone();
        Self {
            actor: mailbox,
            shared: counters,
        }
    }

    /// Records that `version` has started: raises `last_index` to at least
    /// `version` and sends a `Begin` message to the actor. A failed send is
    /// ignored.
    #[instrument(name = "transaction::watermark::begin", level = "trace", skip(self), fields(version = version.0))]
    pub fn begin(&self, version: CommitVersion) {
        let raw = version.0;
        self.shared.last_index.fetch_max(raw, Ordering::SeqCst);
        let message = WatermarkMsg::Begin {
            version: raw,
        };
        let _ = self.actor.send(message);
    }

    /// Sends a `Done` message for `version` to the actor. A failed send is
    /// ignored.
    #[instrument(name = "transaction::watermark::done", level = "trace", skip(self), fields(index = version.0))]
    pub fn done(&self, version: CommitVersion) {
        let message = WatermarkMsg::Done {
            version: version.0,
        };
        let _ = self.actor.send(message);
    }

    /// Returns the current value of the shared `done_until` counter.
    pub fn done_until(&self) -> CommitVersion {
        let raw = self.shared.done_until.load(Ordering::SeqCst);
        CommitVersion(raw)
    }

    /// Returns the current value of the shared `last_index` counter.
    pub fn last_index(&self) -> CommitVersion {
        let raw = self.shared.last_index.load(Ordering::SeqCst);
        CommitVersion(raw)
    }

    /// Raises both `last_index` and `done_until` to at least `version`,
    /// writing the counters directly without messaging the actor.
    pub fn advance_to(&self, version: CommitVersion) {
        let target = version.0;
        self.shared.last_index.fetch_max(target, Ordering::SeqCst);
        self.shared.done_until.fetch_max(target, Ordering::SeqCst);
    }

    /// Waits for `index` with a fixed 30 second timeout and discards the
    /// outcome, so callers cannot tell a timeout from success.
    pub fn wait_for_mark(&self, index: u64) {
        let _ = self.wait_for_mark_timeout(CommitVersion(index), Duration::from_secs(30));
    }

    /// Waits until the watermark reaches `index` or `timeout` elapses.
    ///
    /// Returns `true` immediately when `done_until` is already at or past
    /// `index`. Otherwise a fresh waiter is registered with the actor through
    /// a `WaitFor` message; the result is `false` if that message cannot be
    /// sent, and the waiter's own outcome otherwise.
    pub fn wait_for_mark_timeout(&self, index: CommitVersion, timeout: Duration) -> bool {
        let target = index.0;
        if target <= self.shared.done_until.load(Ordering::SeqCst) {
            return true;
        }

        let waiter = Arc::new(WaiterHandle::new());
        let request = WatermarkMsg::WaitFor {
            version: target,
            waiter: Arc::clone(&waiter),
        };
        match self.actor.send(request) {
            Ok(_) => waiter.wait_timeout(timeout),
            Err(_) => false,
        }
    }
}
// Tests for `WaterMark`. Each test spins up its own single-argument
// `ActorSystem::new(1)` and shuts it down at the end. Several tests rely on
// fixed sleeps to let the actor process queued messages, so statement order
// and timing are part of what is being tested.
#[cfg(test)]
pub mod tests {
use std::{sync::atomic::AtomicUsize, thread, thread::sleep, time::Duration};
use reifydb_runtime::{actor::system::ActorSystem, context::clock::Clock};
use super::*;
use crate::multi::watermark::OLD_VERSION_THRESHOLD;
// Constructing and shutting down a watermark with no operations must not panic.
#[test]
fn test_basic() {
init_and_close(|_| {});
}
// Begin and done for versions 1..=3, issued in order; only checks that the
// calls complete (no assertion on the resulting watermark).
#[test]
fn test_begin_done() {
init_and_close(|watermark| {
watermark.begin(CommitVersion(1));
watermark.begin(CommitVersion(2));
watermark.begin(CommitVersion(3));
watermark.done(CommitVersion(1));
watermark.done(CommitVersion(2));
watermark.done(CommitVersion(3));
});
}
// Versions 2 and 3 finish before version 1: the watermark is expected to stay
// at 0 until version 1 is done, and to read 3 after waiting for marks 1 and 3.
#[test]
fn test_wait_for_mark() {
init_and_close(|watermark| {
watermark.begin(CommitVersion(1));
watermark.begin(CommitVersion(2));
watermark.begin(CommitVersion(3));
watermark.done(CommitVersion(2));
watermark.done(CommitVersion(3));
assert_eq!(watermark.done_until(), 0);
watermark.done(CommitVersion(1));
watermark.wait_for_mark(1);
watermark.wait_for_mark(3);
assert_eq!(watermark.done_until(), 3);
});
}
// `done_until()` reflects a value stored directly into the shared counter.
#[test]
fn test_done_until() {
init_and_close(|watermark| {
watermark.shared.done_until.store(1, Ordering::SeqCst);
assert_eq!(watermark.done_until(), 1);
});
}
// 50 threads each begin+done 100 distinct versions (1..=5000 overall).
// After joining and a 100ms pause, only asserts that the watermark moved
// above 0 -- not that it reached the highest version.
#[test]
fn test_high_concurrency() {
let system = ActorSystem::new(1);
let watermark = Arc::new(WaterMark::new("concurrent".into(), &system));
const NUM_TASKS: usize = 50;
const OPS_PER_TASK: usize = 100;
let mut handles = vec![];
for task_id in 0..NUM_TASKS {
let wm = watermark.clone();
let handle = thread::spawn(move || {
for i in 0..OPS_PER_TASK {
// Versions are 1-based and unique per (task, op) pair.
let version = CommitVersion((task_id * OPS_PER_TASK + i) as u64 + 1);
wm.begin(version);
wm.done(version);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
// Fixed pause before reading the counter -- presumably to let the actor
// work through queued messages.
sleep(Duration::from_millis(100));
let final_done = watermark.done_until();
assert!(final_done.0 > 0, "Watermark should have progressed");
system.shutdown();
// Fixed pause after shutdown -- presumably to let it complete; TODO confirm.
sleep(Duration::from_millis(150)); }
// Ten threads each wait (up to 5s) on one of versions 1..=10, which have been
// begun but not finished. After a 50ms pause the versions are marked done and
// every waiter is expected to report success.
#[test]
fn test_concurrent_wait_for_mark() {
let system = ActorSystem::new(1);
let watermark = Arc::new(WaterMark::new("wait_concurrent".into(), &system));
let success_count = Arc::new(AtomicUsize::new(0));
for i in 1..=10 {
watermark.begin(CommitVersion(i));
}
let mut handles = vec![];
for version in 1..=10 {
let wm = watermark.clone();
let counter = success_count.clone();
let handle = thread::spawn(move || {
if wm.wait_for_mark_timeout(CommitVersion(version), Duration::from_secs(5)) {
counter.fetch_add(1, Ordering::Relaxed);
}
});
handles.push(handle);
}
// Pause before completing the versions -- presumably so the waiters are
// already blocked when `done` is sent.
sleep(Duration::from_millis(50));
for i in 1..=10 {
watermark.done(CommitVersion(i));
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(success_count.load(Ordering::Relaxed), 10);
system.shutdown();
sleep(Duration::from_millis(150)); }
// Waiting on a version far behind the current watermark must return within
// 10ms. `very_old` is computed with `saturating_sub`, so it is never greater
// than the `done_until` value read just before.
// NOTE(review): the meaning of OLD_VERSION_THRESHOLD is defined outside this
// file -- confirm how the actor treats such versions.
#[test]
fn test_old_version_rejection() {
init_and_close(|watermark| {
for i in 1..=100 {
watermark.begin(CommitVersion(i));
watermark.done(CommitVersion(i));
}
sleep(Duration::from_millis(50));
let done_until = watermark.done_until();
assert!(done_until.0 >= 50, "Should have processed many versions");
let very_old = done_until.0.saturating_sub(OLD_VERSION_THRESHOLD + 10);
let clock = Clock::Real;
let start = clock.instant();
watermark.wait_for_mark(very_old);
let elapsed = start.elapsed();
assert!(elapsed.as_millis() < 10, "Old version wait should return immediately");
});
}
// Version 1 is begun but never finished: a 100ms wait must return `false`,
// and must take at least 100ms but less than 200ms of wall-clock time.
#[test]
fn test_timeout_behavior() {
init_and_close(|watermark| {
watermark.begin(CommitVersion(1));
let clock = Clock::Real;
let start = clock.instant();
let result = watermark.wait_for_mark_timeout(CommitVersion(1), Duration::from_millis(100));
let elapsed = start.elapsed();
assert!(!result, "Should timeout waiting for uncompleted version");
assert!(
elapsed.as_millis() >= 100 && elapsed.as_millis() < 200,
"Should respect timeout duration"
);
});
}
// Begins arrive as 3, 1, 2 while dones arrive as 1, 2, 3; after a 50ms pause
// the watermark is expected to be 3.
#[test]
fn test_out_of_order_begin() {
init_and_close(|watermark| {
watermark.begin(CommitVersion(3));
watermark.begin(CommitVersion(1));
watermark.begin(CommitVersion(2));
watermark.done(CommitVersion(1));
watermark.done(CommitVersion(2));
watermark.done(CommitVersion(3));
sleep(Duration::from_millis(50));
let done = watermark.done_until();
assert_eq!(done.0, 3, "Watermark should advance to 3, got {}", done.0);
});
}
// `done(1)` is sent before `begin(1)`: the watermark is expected to stay at 0
// until the matching begin arrives, and to reach 1 afterwards.
#[test]
fn test_orphaned_done_before_begin() {
init_and_close(|watermark| {
watermark.done(CommitVersion(1));
sleep(Duration::from_millis(20));
assert_eq!(watermark.done_until().0, 0);
watermark.begin(CommitVersion(1));
sleep(Duration::from_millis(50));
let done = watermark.done_until();
assert_eq!(done.0, 1, "Watermark should advance to 1 after begin, got {}", done.0);
});
}
// Interleaves begins and dones for versions 1..=3, including a `done(3)` that
// precedes `begin(3)`; the watermark is expected to end at 3.
#[test]
fn test_mixed_out_of_order() {
init_and_close(|watermark| {
watermark.begin(CommitVersion(2));
watermark.done(CommitVersion(3)); watermark.begin(CommitVersion(1));
watermark.done(CommitVersion(1));
watermark.begin(CommitVersion(3));
watermark.done(CommitVersion(2));
sleep(Duration::from_millis(50));
let done = watermark.done_until();
assert_eq!(done.0, 3, "Watermark should advance to 3, got {}", done.0);
});
}
// Shared fixture: creates an actor system and a watermark named "watermark",
// runs `f` with it, then sleeps 10ms, shuts the system down and sleeps 150ms.
// NOTE(review): the sleeps presumably give the actor time to drain and the
// shutdown time to finish -- confirm against `ActorSystem::shutdown`.
fn init_and_close<F>(f: F)
where
F: FnOnce(Arc<WaterMark>),
{
let system = ActorSystem::new(1);
let watermark = Arc::new(WaterMark::new("watermark".into(), &system));
f(watermark);
sleep(Duration::from_millis(10));
system.shutdown();
sleep(Duration::from_millis(150)); }
}