ic_bn_lib/tasks/
mod.rs

1use std::{fmt::Display, sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use derive_new::new;
5use ic_bn_lib_common::traits::Run;
6use tokio_util::{sync::CancellationToken, task::TaskTracker};
7use tracing::{error, warn};
8
9#[derive(Clone)]
10struct Task(String, Arc<dyn Run>);
11
12impl Display for Task {
13    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
14        write!(f, "{}", self.0)
15    }
16}
17
18/// Runs given task periodically
19struct IntervalRunner(Duration, Task);
20
21#[async_trait]
22impl Run for IntervalRunner {
23    async fn run(&self, token: CancellationToken) -> Result<(), anyhow::Error> {
24        warn!(
25            "Task '{}': running with interval {}s",
26            self.1,
27            self.0.as_secs()
28        );
29
30        let mut interval = tokio::time::interval(self.0);
31        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
32
33        loop {
34            tokio::select! {
35                biased;
36
37                () = token.cancelled() => {
38                    warn!("Task '{}': stopped", self.1);
39                    return Ok(());
40                },
41
42                _ = interval.tick() => {
43                    if let Err(e) = self.1.1.run(token.child_token()).await {
44                        warn!("Task '{}': {e:#}", self.1);
45                    }
46                }
47            }
48        }
49    }
50}
51
52/// Starts & tracks Tasks that implement Run
53#[derive(new)]
54pub struct TaskManager {
55    #[new(default)]
56    tracker: TaskTracker,
57    #[new(default)]
58    tasks: Vec<Task>,
59    #[new(default)]
60    token: CancellationToken,
61}
62
63impl TaskManager {
64    /// Add a task to run only once.
65    /// It needs to implement its own internal repeat logic if need be.
66    pub fn add(&mut self, name: &str, task: Arc<dyn Run>) {
67        self.tasks.push(Task(name.into(), task));
68    }
69
70    /// Add a task to run with a given interval.
71    /// Errors are printed with a WARN level and then ignored.
72    pub fn add_interval(&mut self, name: &str, task: Arc<dyn Run>, interval: Duration) {
73        let runner = IntervalRunner(interval, Task(name.into(), task));
74        self.tasks.push(Task(name.into(), Arc::new(runner)));
75    }
76
77    /// Start the tasks
78    pub fn start(&self) {
79        warn!("TaskManager: starting {} tasks", self.tasks.len());
80
81        for task in self.tasks.clone() {
82            let token = self.token.child_token();
83            self.tracker.spawn(async move {
84                if let Err(e) = task.1.run(token).await {
85                    error!("TaskManager: task '{}' exited with an error: {e:#}", task.0);
86                }
87            });
88        }
89    }
90
91    /// Signal the tasks to stop and wait until they do.
92    /// If one or more tasks aren't acting on the token cancellation signal then this will hang forever.
93    pub async fn stop(&self) {
94        warn!("TaskManager: stopping {} tasks", self.tasks.len());
95        self.token.cancel();
96        self.tracker.close();
97        self.tracker.wait().await;
98    }
99
100    /// Return a cancellation token that can be used to signal external tasks when `TaskManager` is stopping.
101    pub fn token(&self) -> CancellationToken {
102        self.token.child_token()
103    }
104}