use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::Notify;
// Process-wide state for a single cooperative "checkpoint".

// True while the checkpoint is armed; `wait_at_checkpoint` returns immediately when this is false.
static CHECKPOINT_ENABLED: AtomicBool = AtomicBool::new(false);
// Set to true by a task that has arrived at the checkpoint; polled by `wait_for_emit_at_checkpoint`.
static EMIT_AT_CHECKPOINT: AtomicBool = AtomicBool::new(false);
// Incremented on every `enable_checkpoint` call, so a task released from an earlier arming
// can tell that the checkpoint has been re-armed and must not clear the newer arming's flag.
static CHECKPOINT_GENERATION: AtomicUsize = AtomicUsize::new(0);
// Signalled with `notify_one` when a task arrives at the checkpoint.
// NOTE(review): no waiter on this `Notify` is visible in this section — confirm where it is consumed.
static REACHED_CHECKPOINT: Notify = Notify::const_new();
// Awaited by tasks parked in `wait_at_checkpoint`; `release_checkpoint` wakes them via `notify_waiters`.
static ALLOW_CONTINUE: Notify = Notify::const_new();
/// Arms the checkpoint so that later calls to `wait_at_checkpoint` park.
///
/// Starts a new generation and clears the "reached" flag, then publishes the
/// enabled flag as the very last step.
pub fn enable_checkpoint() {
    // Reset per-arming state *before* making the checkpoint observable as
    // enabled. If the enabled flag were published first, a task could enter
    // `wait_at_checkpoint`, record the old generation and raise
    // `EMIT_AT_CHECKPOINT`, only for the reset below to wipe that flag —
    // leaving anyone polling for it spinning forever.
    CHECKPOINT_GENERATION.fetch_add(1, Ordering::SeqCst);
    EMIT_AT_CHECKPOINT.store(false, Ordering::SeqCst);
    CHECKPOINT_ENABLED.store(true, Ordering::SeqCst);
}
/// Disarms the checkpoint so that later calls to `wait_at_checkpoint` return
/// immediately.
///
/// Only the enabled flag is cleared: nothing is signalled here, so a task that
/// is already parked at the checkpoint stays parked until
/// `release_checkpoint` is called.
pub fn disable_checkpoint() {
    CHECKPOINT_ENABLED.store(false, Ordering::SeqCst);
}
/// Parks the calling task at the checkpoint until `release_checkpoint` is
/// called.
///
/// Returns immediately when the checkpoint is not enabled. Otherwise the task
/// raises `EMIT_AT_CHECKPOINT`, signals `REACHED_CHECKPOINT`, and waits on
/// `ALLOW_CONTINUE`. After being released it clears `EMIT_AT_CHECKPOINT`, but
/// only if the checkpoint has not been re-armed in the meantime.
pub async fn wait_at_checkpoint() {
    if !CHECKPOINT_ENABLED.load(Ordering::SeqCst) {
        return;
    }

    // Remember which arming we belong to so we never clear the flag of a
    // later one.
    let my_gen = CHECKPOINT_GENERATION.load(Ordering::SeqCst);

    // Create the `Notified` future *before* announcing arrival. A `Notified`
    // future receives `notify_waiters()` wakeups from the moment it is
    // created, whereas `notify_waiters()` stores no permit for futures created
    // afterwards. If arrival were announced first, an observer could see the
    // flag and release the checkpoint before this future existed, and this
    // task would then wait forever.
    let released = ALLOW_CONTINUE.notified();

    EMIT_AT_CHECKPOINT.store(true, Ordering::SeqCst);
    REACHED_CHECKPOINT.notify_one();

    released.await;

    if CHECKPOINT_GENERATION.load(Ordering::SeqCst) == my_gen {
        EMIT_AT_CHECKPOINT.store(false, Ordering::SeqCst);
    }
}
/// Waits until some task has raised `EMIT_AT_CHECKPOINT`.
///
/// This is a cooperative polling loop: the flag is re-checked each time the
/// task is resumed, and the task yields back to the scheduler in between so
/// that other tasks can make progress.
pub async fn wait_for_emit_at_checkpoint() {
    while !EMIT_AT_CHECKPOINT.load(Ordering::SeqCst) {
        tokio::task::yield_now().await;
    }
}
/// Disarms the checkpoint and wakes every task currently waiting on
/// `ALLOW_CONTINUE`.
///
/// `notify_waiters` stores no permit, so only waits that already exist at the
/// time of this call are woken.
pub fn release_checkpoint() {
    // Disarm first, so a task calling `wait_at_checkpoint` after this point
    // returns immediately instead of parking behind a release that has
    // already happened.
    CHECKPOINT_ENABLED.store(false, Ordering::SeqCst);

    ALLOW_CONTINUE.notify_waiters();
}