use std::{
future::Future,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
sync::mpsc::{channel, Sender},
time,
};
/// Control messages sent from a `Timeout` handle to its spawned timer task.
#[derive(Debug)]
enum Msg {
/// Replace the current timeout with the given duration; the task then
/// starts a fresh sleep of that length on its next loop iteration.
UpdateTerm(Duration),
/// Make the timer task leave its loop without awaiting the future.
Stop,
}
/// Handle to a timer task spawned by `Timeout::set`.
///
/// Cloning the handle clones the sender and shares the same `running` flag.
/// A default-constructed handle has no sender and a `false` flag, so
/// `finished()` reports `true` for it.
#[derive(Debug, Clone, Default)]
pub struct Timeout {
/// Sender for control messages to the timer task; `None` when no task was
/// ever attached to this handle (e.g. `Timeout::default()`).
tx: Option<Sender<Msg>>,
/// Flag shared with the timer task: created as `true` when the task is
/// spawned, stored as `false` once the timer fires or `stop()` is called.
running: Arc<AtomicBool>,
}
impl Timeout {
pub async fn set<T>(
mut timeout: Duration,
f: impl Future<Output = T> + Send + 'static,
) -> Self {
let (tx, mut rx) = channel(1);
let running = Arc::new(AtomicBool::new(true));
let running_inner = Arc::clone(&running);
tokio::spawn(async move {
loop {
tokio::select! {
_ = time::sleep(timeout) => {
f.await;
running_inner.store(false, Ordering::Release);
break;
},
Some(msg) = rx.recv() => {
match msg {
Msg::UpdateTerm(duration) => {
timeout = duration;
continue;
},
Msg::Stop => break,
}
},
else => {
running_inner.store(false, Ordering::Release);
break
},
}
}
});
Timeout {
tx: Some(tx),
running,
}
}
pub async fn reset(&self, timeout: Duration) -> std::io::Result<()> {
if let Some(timer) = self.tx.as_ref() {
if let false = self.finished() {
let _ = timer.send(Msg::UpdateTerm(timeout)).await;
return Ok(());
}
}
Err(std::io::Error::other(
"Could not reset timeout: it is not running",
))
}
pub async fn restart<T>(
&mut self,
timeout: Duration,
f: impl Future<Output = T> + Send + 'static,
) {
self.stop().await;
self.tx = Timeout::set(timeout, f).await.tx;
self.running.store(true, Ordering::Release);
}
pub async fn stop(&mut self) {
if let Some(timer) = self.tx.as_ref() {
let _ = timer.send(Msg::Stop).await;
self.running.store(false, Ordering::Release);
}
}
pub fn finished(&self) -> bool {
match self.tx.as_ref().is_none() {
true => true,
false => match self.running.load(Ordering::Acquire) {
true => false,
false => true,
},
}
}
}