serai_task/
lib.rs

1#![cfg_attr(docsrs, feature(doc_auto_cfg))]
2#![doc = include_str!("../README.md")]
3#![deny(missing_docs)]
4
5use core::{future::Future, time::Duration};
6use std::sync::Arc;
7
8use tokio::sync::{mpsc, oneshot, Mutex};
9
10enum Closed {
11  NotClosed(Option<oneshot::Receiver<()>>),
12  Closed,
13}
14
15/// A handle for a task.
16#[derive(Clone)]
17pub struct TaskHandle {
18  run_now: mpsc::Sender<()>,
19  close: mpsc::Sender<()>,
20  closed: Arc<Mutex<Closed>>,
21}
22/// A task's internal structures.
23pub struct Task {
24  run_now: mpsc::Receiver<()>,
25  close: mpsc::Receiver<()>,
26  closed: oneshot::Sender<()>,
27}
28
29impl Task {
30  /// Create a new task definition.
31  pub fn new() -> (Self, TaskHandle) {
32    // Uses a capacity of 1 as any call to run as soon as possible satisfies all calls to run as
33    // soon as possible
34    let (run_now_send, run_now_recv) = mpsc::channel(1);
35    // And any call to close satisfies all calls to close
36    let (close_send, close_recv) = mpsc::channel(1);
37    let (closed_send, closed_recv) = oneshot::channel();
38    (
39      Self { run_now: run_now_recv, close: close_recv, closed: closed_send },
40      TaskHandle {
41        run_now: run_now_send,
42        close: close_send,
43        closed: Arc::new(Mutex::new(Closed::NotClosed(Some(closed_recv)))),
44      },
45    )
46  }
47}
48
49impl TaskHandle {
50  /// Tell the task to run now (and not whenever its next iteration on a timer is).
51  ///
52  /// Panics if the task has been dropped.
53  pub fn run_now(&self) {
54    #[allow(clippy::match_same_arms)]
55    match self.run_now.try_send(()) {
56      Ok(()) => {}
57      // NOP on full, as this task will already be ran as soon as possible
58      Err(mpsc::error::TrySendError::Full(())) => {}
59      Err(mpsc::error::TrySendError::Closed(())) => {
60        panic!("task was unexpectedly closed when calling run_now")
61      }
62    }
63  }
64
65  /// Close the task.
66  ///
67  /// Returns once the task shuts down after it finishes its current iteration (which may be of
68  /// unbounded time).
69  pub async fn close(self) {
70    // If another instance of the handle called tfhis, don't error
71    let _ = self.close.send(()).await;
72    // Wait until we receive the closed message
73    let mut closed = self.closed.lock().await;
74    match &mut *closed {
75      Closed::NotClosed(ref mut recv) => {
76        assert_eq!(recv.take().unwrap().await, Ok(()), "continually ran task dropped itself?");
77        *closed = Closed::Closed;
78      }
79      Closed::Closed => {}
80    }
81  }
82}
83
84/// A task to be continually ran.
85pub trait ContinuallyRan: Sized + Send {
86  /// The amount of seconds before this task should be polled again.
87  const DELAY_BETWEEN_ITERATIONS: u64 = 5;
88  /// The maximum amount of seconds before this task should be run again.
89  ///
90  /// Upon error, the amount of time waited will be linearly increased until this limit.
91  const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120;
92
93  /// Run an iteration of the task.
94  ///
95  /// If this returns `true`, all dependents of the task will immediately have a new iteration ran
96  /// (without waiting for whatever timer they were already on).
97  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>>;
98
99  /// Continually run the task.
100  fn continually_run(
101    mut self,
102    mut task: Task,
103    dependents: Vec<TaskHandle>,
104  ) -> impl Send + Future<Output = ()> {
105    async move {
106      // The default number of seconds to sleep before running the task again
107      let default_sleep_before_next_task = Self::DELAY_BETWEEN_ITERATIONS;
108      // The current number of seconds to sleep before running the task again
109      // We increment this upon errors in order to not flood the logs with errors
110      let mut current_sleep_before_next_task = default_sleep_before_next_task;
111      let increase_sleep_before_next_task = |current_sleep_before_next_task: &mut u64| {
112        let new_sleep = *current_sleep_before_next_task + default_sleep_before_next_task;
113        // Set a limit of sleeping for two minutes
114        *current_sleep_before_next_task = new_sleep.max(Self::MAX_DELAY_BETWEEN_ITERATIONS);
115      };
116
117      loop {
118        // If we were told to close/all handles were dropped, drop it
119        {
120          let should_close = task.close.try_recv();
121          match should_close {
122            Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => break,
123            Err(mpsc::error::TryRecvError::Empty) => {}
124          }
125        }
126
127        match self.run_iteration().await {
128          Ok(run_dependents) => {
129            // Upon a successful (error-free) loop iteration, reset the amount of time we sleep
130            current_sleep_before_next_task = default_sleep_before_next_task;
131
132            if run_dependents {
133              for dependent in &dependents {
134                dependent.run_now();
135              }
136            }
137          }
138          Err(e) => {
139            log::warn!("{}", e);
140            increase_sleep_before_next_task(&mut current_sleep_before_next_task);
141          }
142        }
143
144        // Don't run the task again for another few seconds UNLESS told to run now
145        tokio::select! {
146          () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {},
147          msg = task.run_now.recv() => {
148            // Check if this is firing because the handle was dropped
149            if msg.is_none() {
150              break;
151            }
152          },
153        }
154      }
155
156      task.closed.send(()).unwrap();
157    }
158  }
159}