endpoint_libs/libs/
scheduler.rs

1use eyre::*;
2use futures::future::BoxFuture;
3use std::future::Future;
4use std::sync::Arc;
5use std::sync::RwLock;
6use std::time::Duration;
7use tokio_cron_scheduler::{Job, JobScheduler};
8
9pub struct AdaptiveJob {
10    duration: Arc<RwLock<Duration>>,
11    task: Box<dyn Fn() -> BoxFuture<'static, ()> + Send + Sync>,
12}
13
14impl AdaptiveJob {
15    pub fn new<F>(duration: Duration, task: F) -> Self
16    where
17        F: Fn() -> BoxFuture<'static, ()> + Send + Sync + 'static,
18    {
19        Self {
20            duration: Arc::new(RwLock::new(duration)),
21            task: Box::new(task),
22        }
23    }
24    pub fn set_duration(&self, duration: Duration) {
25        *self.duration.write().unwrap() = duration;
26    }
27    pub fn get_trigger(&self) -> JobTrigger {
28        JobTrigger {
29            duration: self.duration.clone(),
30        }
31    }
32    pub async fn run(self) {
33        loop {
34            let duration = *self.duration.read().unwrap();
35            tokio::time::sleep(duration).await;
36            let task = (self.task)();
37            tokio::spawn(task);
38        }
39    }
40}
/// Cloneable handle onto the interval of an adaptive job.
///
/// Every clone points at the same shared interval, so an update made
/// through one handle is visible through all of them.
#[derive(Clone, Debug)]
pub struct JobTrigger {
    /// Interval shared with the job this handle was created from.
    duration: Arc<RwLock<Duration>>,
}
45impl JobTrigger {
46    pub fn new(duration: Arc<RwLock<Duration>>) -> Self {
47        Self { duration }
48    }
49    pub fn set_duration(&self, duration: Duration) {
50        *self.duration.write().unwrap() = duration;
51    }
52}
53pub struct Scheduler {
54    scheduler: JobScheduler,
55    pending_jobs: Vec<AdaptiveJob>,
56}
57
58impl Scheduler {
59    pub async fn new() -> Self {
60        Self {
61            scheduler: JobScheduler::new().await.unwrap(),
62            pending_jobs: vec![],
63        }
64    }
65    pub async fn add_job<F, Fut>(&mut self, duration: Duration, f: F) -> Result<()>
66    where
67        F: Fn() -> Fut + Send + Sync + 'static,
68        Fut: Future + Send + 'static,
69    {
70        let job = Job::new_repeated_async(duration, move |_, _| {
71            let fut = f();
72            Box::pin(async move {
73                fut.await;
74            })
75        })
76        .unwrap();
77
78        self.scheduler
79            .add(job)
80            .await
81            .map_err(|x| eyre!("{:?}", x))?;
82        Ok(())
83    }
84    pub fn add_adaptive_job<F, Fut>(&mut self, duration: Duration, f: F) -> Result<JobTrigger>
85    where
86        F: Fn() -> Fut + Send + Sync + 'static,
87        Fut: Future + Send + 'static,
88    {
89        let job = AdaptiveJob::new(duration, move || {
90            let fut = f();
91            Box::pin(async move {
92                fut.await;
93            })
94        });
95        let trigger = job.get_trigger();
96        self.pending_jobs.push(job);
97        Ok(trigger)
98    }
99    pub async fn spawn(mut self) {
100        for job in self.pending_jobs.drain(..) {
101            tokio::task::spawn(job.run());
102        }
103        self.scheduler.start().await.unwrap();
104    }
105}