endpoint_libs/libs/
scheduler.rs1use eyre::*;
2use futures::future::BoxFuture;
3use std::future::Future;
4use std::sync::Arc;
5use std::sync::RwLock;
6use std::time::Duration;
7use tokio_cron_scheduler::{Job, JobScheduler};
8
9pub struct AdaptiveJob {
10 duration: Arc<RwLock<Duration>>,
11 task: Box<dyn Fn() -> BoxFuture<'static, ()> + Send + Sync>,
12}
13
14impl AdaptiveJob {
15 pub fn new<F>(duration: Duration, task: F) -> Self
16 where
17 F: Fn() -> BoxFuture<'static, ()> + Send + Sync + 'static,
18 {
19 Self {
20 duration: Arc::new(RwLock::new(duration)),
21 task: Box::new(task),
22 }
23 }
24 pub fn set_duration(&self, duration: Duration) {
25 *self.duration.write().unwrap() = duration;
26 }
27 pub fn get_trigger(&self) -> JobTrigger {
28 JobTrigger {
29 duration: self.duration.clone(),
30 }
31 }
32 pub async fn run(self) {
33 loop {
34 let duration = *self.duration.read().unwrap();
35 tokio::time::sleep(duration).await;
36 let task = (self.task)();
37 tokio::spawn(task);
38 }
39 }
40}
/// Cloneable handle onto a shared, lock-protected interval.
///
/// [`AdaptiveJob::get_trigger`] hands these out so the interval of a job can
/// be changed after the job itself has been moved elsewhere. All clones refer
/// to the same underlying value.
#[derive(Clone)]
pub struct JobTrigger {
    /// The shared interval this handle writes to.
    duration: Arc<RwLock<Duration>>,
}

impl JobTrigger {
    /// Wraps an existing shared interval in a trigger handle.
    pub fn new(duration: Arc<RwLock<Duration>>) -> Self {
        JobTrigger { duration }
    }

    /// Overwrites the shared interval with `duration`.
    ///
    /// # Panics
    ///
    /// Panics if the interval lock is poisoned.
    pub fn set_duration(&self, duration: Duration) {
        let mut interval = self.duration.write().unwrap();
        *interval = duration;
    }
}
53pub struct Scheduler {
54 scheduler: JobScheduler,
55 pending_jobs: Vec<AdaptiveJob>,
56}
57
58impl Scheduler {
59 pub async fn new() -> Self {
60 Self {
61 scheduler: JobScheduler::new().await.unwrap(),
62 pending_jobs: vec![],
63 }
64 }
65 pub async fn add_job<F, Fut>(&mut self, duration: Duration, f: F) -> Result<()>
66 where
67 F: Fn() -> Fut + Send + Sync + 'static,
68 Fut: Future + Send + 'static,
69 {
70 let job = Job::new_repeated_async(duration, move |_, _| {
71 let fut = f();
72 Box::pin(async move {
73 fut.await;
74 })
75 })
76 .unwrap();
77
78 self.scheduler
79 .add(job)
80 .await
81 .map_err(|x| eyre!("{:?}", x))?;
82 Ok(())
83 }
84 pub fn add_adaptive_job<F, Fut>(&mut self, duration: Duration, f: F) -> Result<JobTrigger>
85 where
86 F: Fn() -> Fut + Send + Sync + 'static,
87 Fut: Future + Send + 'static,
88 {
89 let job = AdaptiveJob::new(duration, move || {
90 let fut = f();
91 Box::pin(async move {
92 fut.await;
93 })
94 });
95 let trigger = job.get_trigger();
96 self.pending_jobs.push(job);
97 Ok(trigger)
98 }
99 pub async fn spawn(mut self) {
100 for job in self.pending_jobs.drain(..) {
101 tokio::task::spawn(job.run());
102 }
103 self.scheduler.start().await.unwrap();
104 }
105}