1use std::{fmt::Display, sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use derive_new::new;
5use ic_bn_lib_common::traits::Run;
6use tokio_util::{sync::CancellationToken, task::TaskTracker};
7use tracing::{error, warn};
8
9#[derive(Clone)]
10struct Task(String, Arc<dyn Run>);
11
12impl Display for Task {
13 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
14 write!(f, "{}", self.0)
15 }
16}
17
18struct IntervalRunner(Duration, Task);
20
21#[async_trait]
22impl Run for IntervalRunner {
23 async fn run(&self, token: CancellationToken) -> Result<(), anyhow::Error> {
24 warn!(
25 "Task '{}': running with interval {}s",
26 self.1,
27 self.0.as_secs()
28 );
29
30 let mut interval = tokio::time::interval(self.0);
31 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
32
33 loop {
34 tokio::select! {
35 biased;
36
37 () = token.cancelled() => {
38 warn!("Task '{}': stopped", self.1);
39 return Ok(());
40 },
41
42 _ = interval.tick() => {
43 if let Err(e) = self.1.1.run(token.child_token()).await {
44 warn!("Task '{}': {e:#}", self.1);
45 }
46 }
47 }
48 }
49 }
50}
51
52#[derive(new)]
54pub struct TaskManager {
55 #[new(default)]
56 tracker: TaskTracker,
57 #[new(default)]
58 tasks: Vec<Task>,
59 #[new(default)]
60 token: CancellationToken,
61}
62
63impl TaskManager {
64 pub fn add(&mut self, name: &str, task: Arc<dyn Run>) {
67 self.tasks.push(Task(name.into(), task));
68 }
69
70 pub fn add_interval(&mut self, name: &str, task: Arc<dyn Run>, interval: Duration) {
73 let runner = IntervalRunner(interval, Task(name.into(), task));
74 self.tasks.push(Task(name.into(), Arc::new(runner)));
75 }
76
77 pub fn start(&self) {
79 warn!("TaskManager: starting {} tasks", self.tasks.len());
80
81 for task in self.tasks.clone() {
82 let token = self.token.child_token();
83 self.tracker.spawn(async move {
84 if let Err(e) = task.1.run(token).await {
85 error!("TaskManager: task '{}' exited with an error: {e:#}", task.0);
86 }
87 });
88 }
89 }
90
91 pub async fn stop(&self) {
94 warn!("TaskManager: stopping {} tasks", self.tasks.len());
95 self.token.cancel();
96 self.tracker.close();
97 self.tracker.wait().await;
98 }
99
100 pub fn token(&self) -> CancellationToken {
102 self.token.child_token()
103 }
104}