use std::{
fmt,
fmt::Debug,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use reifydb_core::{actors::watermark::WatermarkMessage, common::CommitVersion};
use reifydb_runtime::{
actor::{mailbox::ActorRef, system::ActorSystem},
sync::waiter::WaiterHandle,
};
use tracing::instrument;
use super::actor::{WatermarkActor, WatermarkShared};
/// Handle to a watermark tracker.
///
/// Pairs a reference to the watermark actor (which receives `WatermarkMessage`
/// values such as `Begin`, `Done` and `WaitFor`) with the atomic counters that
/// are shared with that actor.
pub struct WaterMark {
// Mailbox reference used to send `WatermarkMessage` values to the actor.
actor: ActorRef<WatermarkMessage>,
// Shared `done_until` / `last_index` atomics; the actor holds a clone of this `Arc`.
shared: Arc<WatermarkShared>,
}
impl Debug for WaterMark {
    /// Renders the handle as `WaterMark { done_until, last_index }`, using a relaxed
    /// snapshot of each shared counter. The actor reference is not included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Snapshot the counters in the same order the fields are printed.
        let done_until = self.shared.done_until.load(Ordering::Relaxed);
        let last_index = self.shared.last_index.load(Ordering::Relaxed);

        let mut out = f.debug_struct("WaterMark");
        out.field("done_until", &done_until);
        out.field("last_index", &last_index);
        out.finish()
    }
}
impl WaterMark {
    /// Creates a watermark handle and spawns its backing actor on `system`.
    ///
    /// Both shared counters (`done_until` and `last_index`) start at zero. The actor is
    /// spawned with `spawn_system` under `task_name` and holds a clone of the same shared
    /// state as the returned handle.
    #[instrument(name = "transaction::watermark::new", level = "debug", skip(system), fields(task_name = %task_name))]
    pub fn new(task_name: String, system: &ActorSystem) -> Self {
        let shared = Arc::new(WatermarkShared {
            done_until: AtomicU64::new(0),
            last_index: AtomicU64::new(0),
        });
        let actor = WatermarkActor {
            shared: shared.clone(),
        };
        let actor_ref = system.spawn_system(&task_name, actor).actor_ref().clone();
        Self {
            actor: actor_ref,
            shared,
        }
    }

    /// Registers `version` as in flight.
    ///
    /// Raises `last_index` to at least `version` and sends a `Begin` message to the actor.
    /// Delivery is best effort: a failed send is deliberately ignored.
    #[instrument(name = "transaction::watermark::register_in_flight", level = "trace", skip(self), fields(version = version.0))]
    pub fn register_in_flight(&self, version: CommitVersion) {
        self.shared.last_index.fetch_max(version.0, Ordering::SeqCst);
        let _ = self.actor.send(WatermarkMessage::Begin {
            version: version.0,
        });
    }

    /// Reports `version` as finished by sending a `Done` message to the actor.
    ///
    /// Delivery is best effort: a failed send is deliberately ignored.
    #[instrument(name = "transaction::watermark::mark_finished", level = "trace", skip(self), fields(index = version.0))]
    pub fn mark_finished(&self, version: CommitVersion) {
        let _ = self.actor.send(WatermarkMessage::Done {
            version: version.0,
        });
    }

    /// Returns the current value of the shared `done_until` counter.
    pub fn done_until(&self) -> CommitVersion {
        CommitVersion(self.shared.done_until.load(Ordering::SeqCst))
    }

    /// Returns the current value of the shared `last_index` counter, i.e. the highest
    /// version passed to [`register_in_flight`](Self::register_in_flight) or
    /// [`advance_to`](Self::advance_to) so far.
    pub fn last_index(&self) -> CommitVersion {
        CommitVersion(self.shared.last_index.load(Ordering::SeqCst))
    }

    /// Raises both `last_index` and `done_until` to at least `version`.
    ///
    /// The counters are updated directly on the shared atomics; no message is sent to
    /// the actor. Neither counter is ever lowered.
    pub fn advance_to(&self, version: CommitVersion) {
        self.shared.last_index.fetch_max(version.0, Ordering::SeqCst);
        self.shared.done_until.fetch_max(version.0, Ordering::SeqCst);
    }

    /// Blocks until `done_until` reaches `index` or 30 seconds elapse.
    ///
    /// The outcome is intentionally discarded; call
    /// [`wait_for_mark_timeout`](Self::wait_for_mark_timeout) to learn whether the mark
    /// was actually reached.
    pub fn wait_for_mark(&self, index: u64) {
        let _ = self.wait_for_mark_timeout(CommitVersion(index), Duration::from_secs(30));
    }

    /// Blocks until `done_until` reaches `index` or `timeout` elapses.
    ///
    /// Returns `true` when the watermark is at or beyond `index`, `false` otherwise.
    /// If `done_until` already covers `index`, this returns immediately without
    /// contacting the actor.
    pub fn wait_for_mark_timeout(&self, index: CommitVersion, timeout: Duration) -> bool {
        let target = index.0;
        let reached = || self.shared.done_until.load(Ordering::SeqCst) >= target;

        // Fast path: nothing to wait for.
        if reached() {
            return true;
        }

        let waiter = Arc::new(WaiterHandle::new());
        let request = WatermarkMessage::WaitFor {
            version: target,
            waiter: waiter.clone(),
        };
        if self.actor.send(request).is_ok() && waiter.wait_timeout(timeout) {
            return true;
        }

        // The send failed or the wait timed out. `advance_to` moves `done_until`
        // without going through the actor, so the waiter may never have been signalled
        // even though the mark was reached; consult the counter before reporting failure.
        reached()
    }
}
#[cfg(test)]
pub mod tests {
    use std::{sync::atomic::AtomicUsize, thread, thread::sleep, time::Duration};

    use reifydb_runtime::{actor::system::ActorSystem, context::clock::Clock, pool::Pools};

    use super::*;
    use crate::multi::watermark::OLD_VERSION_THRESHOLD;

    /// Smoke test: a watermark can be created and its actor system shut down.
    #[test]
    fn test_basic() {
        init_and_close(|_| {});
    }

    /// Begin/done pairs issued in order must not panic.
    #[test]
    fn test_begin_done() {
        init_and_close(|watermark| {
            watermark.register_in_flight(CommitVersion(1));
            watermark.register_in_flight(CommitVersion(2));
            watermark.register_in_flight(CommitVersion(3));
            watermark.mark_finished(CommitVersion(1));
            watermark.mark_finished(CommitVersion(2));
            watermark.mark_finished(CommitVersion(3));
        });
    }

    /// The watermark must stay at 0 while version 1 is still in flight, and reach 3 once
    /// versions 1..=3 are all finished.
    #[test]
    fn test_wait_for_mark() {
        init_and_close(|watermark| {
            watermark.register_in_flight(CommitVersion(1));
            watermark.register_in_flight(CommitVersion(2));
            watermark.register_in_flight(CommitVersion(3));
            watermark.mark_finished(CommitVersion(2));
            watermark.mark_finished(CommitVersion(3));
            assert_eq!(watermark.done_until(), 0);
            watermark.mark_finished(CommitVersion(1));
            watermark.wait_for_mark(1);
            watermark.wait_for_mark(3);
            assert_eq!(watermark.done_until(), 3);
        });
    }

    /// `done_until()` reflects a value stored directly into the shared counter.
    #[test]
    fn test_done_until() {
        init_and_close(|watermark| {
            watermark.shared.done_until.store(1, Ordering::SeqCst);
            assert_eq!(watermark.done_until(), 1);
        });
    }

    /// 50 threads each begin and finish 100 distinct versions; the watermark must have
    /// moved off zero afterwards.
    #[test]
    fn test_high_concurrency() {
        let system = ActorSystem::new(Pools::default(), Clock::Real);
        let watermark = Arc::new(WaterMark::new("concurrent".into(), &system));
        const NUM_TASKS: usize = 50;
        const OPS_PER_TASK: usize = 100;
        let mut handles = vec![];
        for task_id in 0..NUM_TASKS {
            let wm = watermark.clone();
            let handle = thread::spawn(move || {
                for i in 0..OPS_PER_TASK {
                    let version = CommitVersion((task_id * OPS_PER_TASK + i) as u64 + 1);
                    wm.register_in_flight(version);
                    wm.mark_finished(version);
                }
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.join().unwrap();
        }
        // Give the actor time to drain its mailbox before inspecting the counter.
        sleep(Duration::from_millis(100));
        let final_done = watermark.done_until();
        assert!(final_done.0 > 0, "Watermark should have progressed");
        system.shutdown();
        sleep(Duration::from_millis(150));
    }

    /// Ten threads wait on versions 1..=10 concurrently; all must be released once the
    /// versions are finished.
    #[test]
    fn test_concurrent_wait_for_mark() {
        let system = ActorSystem::new(Pools::default(), Clock::Real);
        let watermark = Arc::new(WaterMark::new("wait_concurrent".into(), &system));
        let success_count = Arc::new(AtomicUsize::new(0));
        for i in 1..=10 {
            watermark.register_in_flight(CommitVersion(i));
        }
        let mut handles = vec![];
        for version in 1..=10 {
            let wm = watermark.clone();
            let counter = success_count.clone();
            let handle = thread::spawn(move || {
                if wm.wait_for_mark_timeout(CommitVersion(version), Duration::from_secs(5)) {
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            });
            handles.push(handle);
        }
        // Let the waiter threads start before finishing the versions.
        sleep(Duration::from_millis(50));
        for i in 1..=10 {
            watermark.mark_finished(CommitVersion(i));
        }
        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(success_count.load(Ordering::Relaxed), 10);
        system.shutdown();
        sleep(Duration::from_millis(150));
    }

    /// Waiting on a version far behind `done_until` must return without blocking.
    #[test]
    fn test_old_version_rejection() {
        init_and_close(|watermark| {
            for i in 1..=100 {
                watermark.register_in_flight(CommitVersion(i));
                watermark.mark_finished(CommitVersion(i));
            }
            let reached = watermark.wait_for_mark_timeout(CommitVersion(100), Duration::from_secs(5));
            assert!(reached, "Should have processed all 100 versions");
            let done_until = watermark.done_until();
            // Saturates at 0 when the threshold exceeds the current watermark.
            let very_old = done_until.0.saturating_sub(OLD_VERSION_THRESHOLD + 10);
            let clock = Clock::Real;
            let start = clock.instant();
            watermark.wait_for_mark(very_old);
            let elapsed = start.elapsed();
            assert!(elapsed.as_millis() < 10, "Old version wait should return immediately");
        });
    }

    /// Waiting on a version that never finishes must time out after roughly the
    /// requested duration.
    #[test]
    fn test_timeout_behavior() {
        init_and_close(|watermark| {
            watermark.register_in_flight(CommitVersion(1));
            let clock = Clock::Real;
            let start = clock.instant();
            let result = watermark.wait_for_mark_timeout(CommitVersion(1), Duration::from_millis(100));
            let elapsed = start.elapsed();
            assert!(!result, "Should timeout waiting for uncompleted version");
            assert!(
                elapsed.as_millis() >= 100 && elapsed.as_millis() < 200,
                "Should respect timeout duration"
            );
        });
    }

    /// Versions registered out of order (3, 1, 2) still let the watermark reach 3.
    #[test]
    fn test_out_of_order_begin() {
        init_and_close(|watermark| {
            watermark.register_in_flight(CommitVersion(3));
            watermark.register_in_flight(CommitVersion(1));
            watermark.register_in_flight(CommitVersion(2));
            watermark.mark_finished(CommitVersion(1));
            watermark.mark_finished(CommitVersion(2));
            watermark.mark_finished(CommitVersion(3));
            let reached = watermark.wait_for_mark_timeout(CommitVersion(3), Duration::from_secs(5));
            assert!(reached, "Timed out waiting for watermark to advance to 3");
            let done = watermark.done_until();
            assert_eq!(done.0, 3, "Watermark should advance to 3, got {}", done.0);
        });
    }

    /// A `Done` that arrives before its `Begin` must not advance the watermark on its
    /// own, but must take effect once the matching `Begin` arrives.
    #[test]
    fn test_orphaned_done_before_begin() {
        init_and_close(|watermark| {
            watermark.mark_finished(CommitVersion(1));
            // Negative check: let the actor handle the orphaned `Done` and confirm that
            // nothing moved. A sleep is unavoidable here because there is no event to wait on.
            sleep(Duration::from_millis(20));
            assert_eq!(watermark.done_until().0, 0);
            watermark.register_in_flight(CommitVersion(1));
            // Bounded wait instead of a fixed sleep, matching the other out-of-order tests.
            let reached = watermark.wait_for_mark_timeout(CommitVersion(1), Duration::from_secs(5));
            assert!(reached, "Timed out waiting for watermark to advance to 1");
            let done = watermark.done_until();
            assert_eq!(done.0, 1, "Watermark should advance to 1 after begin, got {}", done.0);
        });
    }

    /// Interleaved begins and dones, including a `Done(3)` issued before `Begin(3)`,
    /// still let the watermark reach 3.
    #[test]
    fn test_mixed_out_of_order() {
        init_and_close(|watermark| {
            watermark.register_in_flight(CommitVersion(2));
            watermark.mark_finished(CommitVersion(3));
            watermark.register_in_flight(CommitVersion(1));
            watermark.mark_finished(CommitVersion(1));
            watermark.register_in_flight(CommitVersion(3));
            watermark.mark_finished(CommitVersion(2));
            let reached = watermark.wait_for_mark_timeout(CommitVersion(3), Duration::from_secs(5));
            assert!(reached, "Timed out waiting for watermark to advance to 3");
            let done = watermark.done_until();
            assert_eq!(done.0, 3, "Watermark should advance to 3, got {}", done.0);
        });
    }

    /// Test fixture: builds an actor system and a watermark, runs `f` against the
    /// watermark, then shuts the system down, pausing briefly before and after shutdown.
    fn init_and_close<F>(f: F)
    where
        F: FnOnce(Arc<WaterMark>),
    {
        let system = ActorSystem::new(Pools::default(), Clock::Real);
        let watermark = Arc::new(WaterMark::new("watermark".into(), &system));
        f(watermark);
        sleep(Duration::from_millis(10));
        system.shutdown();
        sleep(Duration::from_millis(150));
    }
}