next_web_dev/manager/
job_scheduler_manager.rs

1use std::sync::Arc;
2
3use flume::Sender;
4use hashbrown::HashSet;
5use tokio::sync::Mutex;
6use tokio_cron_scheduler::{Job, JobScheduler};
7use tracing::info;
8
9#[derive(Clone)]
10pub struct JobSchedulerManager {
11    ids: Arc<Mutex<HashSet<Vec<u8>>>>,
12    jobs: Arc<Mutex<Vec<Job>>>,
13    tx: Option<Sender<SchedulerEvent>>,
14}
15
16impl JobSchedulerManager {
17    pub fn new() -> Self {
18        Self {
19            ids: Arc::new(Mutex::new(HashSet::new())),
20            jobs: Arc::new(Mutex::new(Vec::new())),
21            tx: None,
22        }
23    }
24
25    pub async fn add_job(&self, job: Job) {
26        if let Some(sender) = &self.tx {
27            sender.send_async(SchedulerEvent::AddJob(job)).await.ok();
28        }
29    }
30
31    pub async fn remove_job(&self, guid: Vec<u8>) {
32        if let Some(sender) = &self.tx {
33            sender
34                .send_async(SchedulerEvent::RemoveJob(guid))
35                .await
36                .ok();
37        }
38    }
39
40    pub async fn check_job_exists(&self, guid: Vec<u8>) -> bool {
41        self.ids.lock().await.contains(&guid)
42    }
43
44    pub fn start(&mut self) {
45        let jobs = self.jobs.clone();
46        let ids = self.ids.clone();
47        let (tx, rx) = flume::unbounded();
48        self.tx = Some(tx);
49
50        tokio::spawn(async move {
51            let mut scheduler = JobScheduler::new().await.unwrap();
52            let jobs = jobs.lock().await;
53            for job in jobs.iter() {
54                let job_id = scheduler.add(job.clone()).await.unwrap();
55                let _ = ids
56                    .try_lock()
57                    .map(|mut ids| ids.insert(job_id.as_bytes().to_vec()))
58                    .map(|_| info!("Job: {} added successfully!", job_id));
59            }
60
61            // Add code to be run during/after shutdown
62            scheduler.set_shutdown_handler(Box::new(|| {
63                Box::pin(async move {
64                    println!("Shut down done");
65                })
66            }));
67
68            scheduler.start().await.unwrap();
69            // spawn a task to listen for job removal
70
71            tokio::spawn(async move {
72                while let Ok(event) = rx.recv() {
73                    match event {
74                        SchedulerEvent::AddJob(job) => {
75                            let job_id = scheduler.add(job).await.unwrap();
76                            let _ = ids
77                                .try_lock()
78                                .map(|mut ids| ids.insert(job_id.as_bytes().to_vec()));
79                        }
80                        SchedulerEvent::RemoveJob(guid) => {
81                            if let Ok(uuid) = guid.try_into() {
82                                let _ = scheduler
83                                    .remove(&uuid)
84                                    .await
85                                    .map(|_| info!("Job removed successfully;"));
86                            };
87                        }
88                        SchedulerEvent::Shutdown => {
89                            scheduler.shutdown().await.unwrap();
90                        }
91                    }
92                }
93            });
94        });
95    }
96
97    pub async fn job_count(&self) -> usize {
98        self.ids.lock().await.len()
99    }
100}
101
102#[derive(Clone)]
103pub enum SchedulerEvent {
104    AddJob(Job),
105    RemoveJob(Vec<u8>),
106    Shutdown,
107}
108
109pub trait ApplicationJob: Send + Sync {
110    fn gen_job(&self) -> Job;
111}