next_web_dev/manager/
job_scheduler_manager.rs1use std::sync::Arc;
2
3use flume::Sender;
4use hashbrown::HashSet;
5use tokio::sync::Mutex;
6use tokio_cron_scheduler::{Job, JobScheduler};
7use tracing::info;
8
9#[derive(Clone)]
10pub struct JobSchedulerManager {
11 ids: Arc<Mutex<HashSet<Vec<u8>>>>,
12 jobs: Arc<Mutex<Vec<Job>>>,
13 tx: Option<Sender<SchedulerEvent>>,
14}
15
16impl JobSchedulerManager {
17 pub fn new() -> Self {
18 Self {
19 ids: Arc::new(Mutex::new(HashSet::new())),
20 jobs: Arc::new(Mutex::new(Vec::new())),
21 tx: None,
22 }
23 }
24
25 pub async fn add_job(&self, job: Job) {
26 if let Some(sender) = &self.tx {
27 sender.send_async(SchedulerEvent::AddJob(job)).await.ok();
28 }
29 }
30
31 pub async fn remove_job(&self, guid: Vec<u8>) {
32 if let Some(sender) = &self.tx {
33 sender
34 .send_async(SchedulerEvent::RemoveJob(guid))
35 .await
36 .ok();
37 }
38 }
39
40 pub async fn check_job_exists(&self, guid: Vec<u8>) -> bool {
41 self.ids.lock().await.contains(&guid)
42 }
43
44 pub fn start(&mut self) {
45 let jobs = self.jobs.clone();
46 let ids = self.ids.clone();
47 let (tx, rx) = flume::unbounded();
48 self.tx = Some(tx);
49
50 tokio::spawn(async move {
51 let mut scheduler = JobScheduler::new().await.unwrap();
52 let jobs = jobs.lock().await;
53 for job in jobs.iter() {
54 let job_id = scheduler.add(job.clone()).await.unwrap();
55 let _ = ids
56 .try_lock()
57 .map(|mut ids| ids.insert(job_id.as_bytes().to_vec()))
58 .map(|_| info!("Job: {} added successfully!", job_id));
59 }
60
61 scheduler.set_shutdown_handler(Box::new(|| {
63 Box::pin(async move {
64 println!("Shut down done");
65 })
66 }));
67
68 scheduler.start().await.unwrap();
69 tokio::spawn(async move {
72 while let Ok(event) = rx.recv() {
73 match event {
74 SchedulerEvent::AddJob(job) => {
75 let job_id = scheduler.add(job).await.unwrap();
76 let _ = ids
77 .try_lock()
78 .map(|mut ids| ids.insert(job_id.as_bytes().to_vec()));
79 }
80 SchedulerEvent::RemoveJob(guid) => {
81 if let Ok(uuid) = guid.try_into() {
82 let _ = scheduler
83 .remove(&uuid)
84 .await
85 .map(|_| info!("Job removed successfully;"));
86 };
87 }
88 SchedulerEvent::Shutdown => {
89 scheduler.shutdown().await.unwrap();
90 }
91 }
92 }
93 });
94 });
95 }
96
97 pub async fn job_count(&self) -> usize {
98 self.ids.lock().await.len()
99 }
100}
101
102#[derive(Clone)]
103pub enum SchedulerEvent {
104 AddJob(Job),
105 RemoveJob(Vec<u8>),
106 Shutdown,
107}
108
109pub trait ApplicationJob: Send + Sync {
110 fn gen_job(&self) -> Job;
111}