#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![doc = include_str!("../README.md")]
#![deny(missing_docs)]
use core::{future::Future, time::Duration};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex};
/// Shared record of whether a handle has already observed the task shutting down.
enum Closed {
/// No handle has observed the shutdown yet.
///
/// Holds the receiving end of the one-shot channel on which the task signals that its loop has
/// ended.
NotClosed(Option<oneshot::Receiver<()>>),
/// A handle has already observed the shutdown signal; there is nothing left to wait on.
Closed,
}
/// A handle for a task.
///
/// Cloning the handle yields another handle to the same task: the senders are cloned and the
/// closed-state is shared behind an `Arc`.
#[derive(Clone)]
pub struct TaskHandle {
// Signals the task to run an iteration without waiting for its timer
run_now: mpsc::Sender<()>,
// Signals the task to stop looping
close: mpsc::Sender<()>,
// Shared across all clones so that only one of them awaits the task's shutdown signal
closed: Arc<Mutex<Closed>>,
}
/// A task's internal structures.
///
/// This is the counterpart to [`TaskHandle`], holding the receiving ends of the channels the
/// handle signals on and the sending end of the channel used to report the shutdown.
pub struct Task {
// Receives requests to run an iteration immediately
run_now: mpsc::Receiver<()>,
// Receives requests to stop looping
close: mpsc::Receiver<()>,
// Used to signal to the handle(s) that the loop has ended
closed: oneshot::Sender<()>,
}
impl Task {
    /// Create a new task definition.
    ///
    /// Returns the task's internal structures, to be passed to whatever runs the task, along
    /// with the [`TaskHandle`] used to signal it.
    pub fn new() -> (Self, TaskHandle) {
        // The signals carry no data, so a single queued message is all either channel ever needs
        // to buffer
        let (run_now_tx, run_now_rx) = mpsc::channel(1);
        let (close_tx, close_rx) = mpsc::channel(1);
        // Fired by the task once its loop has ended
        let (closed_tx, closed_rx) = oneshot::channel();

        let handle = TaskHandle {
            run_now: run_now_tx,
            close: close_tx,
            closed: Arc::new(Mutex::new(Closed::NotClosed(Some(closed_rx)))),
        };
        let task = Self { run_now: run_now_rx, close: close_rx, closed: closed_tx };

        (task, handle)
    }
}
impl TaskHandle {
pub fn run_now(&self) {
#[allow(clippy::match_same_arms)]
match self.run_now.try_send(()) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(())) => {}
Err(mpsc::error::TrySendError::Closed(())) => {
panic!("task was unexpectedly closed when calling run_now")
}
}
}
pub async fn close(self) {
let _ = self.close.send(()).await;
let mut closed = self.closed.lock().await;
match &mut *closed {
Closed::NotClosed(ref mut recv) => {
assert_eq!(recv.take().unwrap().await, Ok(()), "continually ran task dropped itself?");
*closed = Closed::Closed;
}
Closed::Closed => {}
}
}
}
/// A task to be continually ran.
pub trait ContinuallyRan: Sized + Send {
    /// The amount of seconds to wait between iterations while iterations succeed.
    const DELAY_BETWEEN_ITERATIONS: u64 = 5;
    /// The maximum amount of seconds to wait between iterations.
    ///
    /// Every failed iteration lengthens the wait by `DELAY_BETWEEN_ITERATIONS`, up to this limit.
    /// The wait is reset to `DELAY_BETWEEN_ITERATIONS` by the next successful iteration.
    const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120;

    /// Run an iteration of the task.
    ///
    /// If this returns `Ok(true)`, all dependents of the task are told to run now instead of
    /// waiting on their own timers. If this returns `Err`, the error is logged as a warning and
    /// the delay before the next iteration is increased.
    fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>>;

    /// Continually run the task.
    ///
    /// Iterations are run until the task is told to close or every [`TaskHandle`] for it has
    /// been dropped. Between iterations, this waits for either the current delay to elapse or
    /// for a handle to request an immediate run, whichever happens first.
    ///
    /// # Panics
    ///
    /// Panics if a dependent is told to run after the dependent's task was closed or dropped.
    fn continually_run(
        mut self,
        mut task: Task,
        dependents: Vec<TaskHandle>,
    ) -> impl Send + Future<Output = ()> {
        async move {
            // The amount of seconds to sleep after a successful iteration
            let default_sleep = Self::DELAY_BETWEEN_ITERATIONS;
            // The cap on the back-off. This is never below the default so a configuration whose
            // default exceeds the maximum doesn't cause errors to *shorten* the delay.
            let max_sleep = Self::MAX_DELAY_BETWEEN_ITERATIONS.max(default_sleep);
            // The amount of seconds to sleep before the next iteration, increased upon errors so
            // a persistently failing task doesn't flood the logs
            let mut current_sleep = default_sleep;

            loop {
                // Stop if we were told to close or if every handle was dropped
                match task.close.try_recv() {
                    Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => break,
                    Err(mpsc::error::TryRecvError::Empty) => {}
                }

                match self.run_iteration().await {
                    Ok(run_dependents) => {
                        // Success resets any back-off accumulated by prior errors
                        current_sleep = default_sleep;
                        if run_dependents {
                            for dependent in &dependents {
                                dependent.run_now();
                            }
                        }
                    }
                    Err(e) => {
                        log::warn!("{}", e);
                        // Linearly back off, clamped to the maximum delay
                        current_sleep = current_sleep.saturating_add(default_sleep).min(max_sleep);
                    }
                }

                // Wait for the delay to elapse, unless we're told to run now
                tokio::select! {
                    () = tokio::time::sleep(Duration::from_secs(current_sleep)) => {},
                    msg = task.run_now.recv() => {
                        // `None` means every handle was dropped, leaving no one to run this for
                        if msg.is_none() {
                            break;
                        }
                    },
                }
            }

            // Report that the loop has ended. If every handle was dropped, which is one of the
            // reasons the loop ends, no one is listening and the send fails. That is expected
            // and must not panic.
            let _ = task.closed.send(());
        }
    }
}