1#![cfg_attr(docsrs, feature(doc_auto_cfg))]
2#![doc = include_str!("../README.md")]
3#![deny(missing_docs)]
4
5use core::{future::Future, time::Duration};
6use std::sync::Arc;
7
8use tokio::sync::{mpsc, oneshot, Mutex};
9
10enum Closed {
11 NotClosed(Option<oneshot::Receiver<()>>),
12 Closed,
13}
14
15#[derive(Clone)]
17pub struct TaskHandle {
18 run_now: mpsc::Sender<()>,
19 close: mpsc::Sender<()>,
20 closed: Arc<Mutex<Closed>>,
21}
22pub struct Task {
24 run_now: mpsc::Receiver<()>,
25 close: mpsc::Receiver<()>,
26 closed: oneshot::Sender<()>,
27}
28
29impl Task {
30 pub fn new() -> (Self, TaskHandle) {
32 let (run_now_send, run_now_recv) = mpsc::channel(1);
35 let (close_send, close_recv) = mpsc::channel(1);
37 let (closed_send, closed_recv) = oneshot::channel();
38 (
39 Self { run_now: run_now_recv, close: close_recv, closed: closed_send },
40 TaskHandle {
41 run_now: run_now_send,
42 close: close_send,
43 closed: Arc::new(Mutex::new(Closed::NotClosed(Some(closed_recv)))),
44 },
45 )
46 }
47}
48
49impl TaskHandle {
50 pub fn run_now(&self) {
54 #[allow(clippy::match_same_arms)]
55 match self.run_now.try_send(()) {
56 Ok(()) => {}
57 Err(mpsc::error::TrySendError::Full(())) => {}
59 Err(mpsc::error::TrySendError::Closed(())) => {
60 panic!("task was unexpectedly closed when calling run_now")
61 }
62 }
63 }
64
65 pub async fn close(self) {
70 let _ = self.close.send(()).await;
72 let mut closed = self.closed.lock().await;
74 match &mut *closed {
75 Closed::NotClosed(ref mut recv) => {
76 assert_eq!(recv.take().unwrap().await, Ok(()), "continually ran task dropped itself?");
77 *closed = Closed::Closed;
78 }
79 Closed::Closed => {}
80 }
81 }
82}
83
84pub trait ContinuallyRan: Sized + Send {
86 const DELAY_BETWEEN_ITERATIONS: u64 = 5;
88 const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120;
92
93 fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>>;
98
99 fn continually_run(
101 mut self,
102 mut task: Task,
103 dependents: Vec<TaskHandle>,
104 ) -> impl Send + Future<Output = ()> {
105 async move {
106 let default_sleep_before_next_task = Self::DELAY_BETWEEN_ITERATIONS;
108 let mut current_sleep_before_next_task = default_sleep_before_next_task;
111 let increase_sleep_before_next_task = |current_sleep_before_next_task: &mut u64| {
112 let new_sleep = *current_sleep_before_next_task + default_sleep_before_next_task;
113 *current_sleep_before_next_task = new_sleep.max(Self::MAX_DELAY_BETWEEN_ITERATIONS);
115 };
116
117 loop {
118 {
120 let should_close = task.close.try_recv();
121 match should_close {
122 Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => break,
123 Err(mpsc::error::TryRecvError::Empty) => {}
124 }
125 }
126
127 match self.run_iteration().await {
128 Ok(run_dependents) => {
129 current_sleep_before_next_task = default_sleep_before_next_task;
131
132 if run_dependents {
133 for dependent in &dependents {
134 dependent.run_now();
135 }
136 }
137 }
138 Err(e) => {
139 log::warn!("{}", e);
140 increase_sleep_before_next_task(&mut current_sleep_before_next_task);
141 }
142 }
143
144 tokio::select! {
146 () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {},
147 msg = task.run_now.recv() => {
148 if msg.is_none() {
150 break;
151 }
152 },
153 }
154 }
155
156 task.closed.send(()).unwrap();
157 }
158 }
159}