use std::{
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
time::Duration,
};
use tokio::sync::watch;
#[derive(Debug)]
pub(crate) struct ActivityTracker {
epoch: AtomicU64,
is_active: AtomicBool,
last_commited_epoch: AtomicU64,
is_active_watch: watch::Sender<u64>,
epoch_interval: Duration,
}
impl ActivityTracker {
pub(crate) fn new(epoch_interval: Duration) -> Arc<Self> {
let tracker = Arc::new(Self {
epoch: AtomicU64::new(0), is_active: AtomicBool::new(false),
last_commited_epoch: AtomicU64::new(0),
is_active_watch: watch::Sender::new(0u64),
epoch_interval,
});
tracker.run_is_active_task();
tracker.run_epoch_task();
tracker
}
fn run_epoch_task(self: &Arc<Self>) {
self.epoch.fetch_add(1, Ordering::Relaxed);
let epoch_interval = self.epoch_interval;
let weak = Arc::downgrade(self);
tokio::spawn(async move {
loop {
tokio::time::sleep(epoch_interval).await;
let Some(tracker) = weak.upgrade() else {
break;
};
tracker.epoch.fetch_add(1, Ordering::Relaxed);
}
});
}
fn run_is_active_task(self: &Arc<Self>) {
let mut is_active_watch = self.subscribe();
let sleep_interval = self.epoch_interval / 2;
let weak = Arc::downgrade(self);
tokio::spawn(async move {
loop {
tokio::select! {
val = is_active_watch.changed() => {
if val.is_err() {
break;
}
let Some(tracker) = weak.upgrade() else {
break;
};
tracker.is_active.store(true, Ordering::Release);
}
_ = tokio::time::sleep(sleep_interval) => {
let Some(tracker) = weak.upgrade() else {
break;
};
tracker.is_active.store(false, Ordering::Release);
}
}
}
});
}
#[inline(always)]
pub(crate) fn signal(&self) {
let epoch = self.epoch.load(Ordering::Relaxed);
if self.last_commited_epoch.load(Ordering::Relaxed) < epoch {
let _ = self.is_active_watch.send(epoch);
self.last_commited_epoch.store(epoch, Ordering::Relaxed);
}
}
pub(crate) fn subscribe(&self) -> watch::Receiver<u64> {
self.is_active_watch.subscribe()
}
pub(crate) fn get_is_active(&self) -> bool {
self.is_active.load(Ordering::Acquire)
}
}