1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
//! [spring-job](https://spring-rs.github.io/docs/plugins/spring-job/)

pub mod extractor;
pub mod handler;
pub mod job;

/////////////////job-macros/////////////////////
/// To use these Procedural Macros, you need to add `spring-job` dependency
pub use spring_macros::cron;
pub use spring_macros::fix_delay;
pub use spring_macros::fix_rate;
pub use spring_macros::one_shot;

use anyhow::Context;
use job::Job;
use spring::async_trait;
use spring::error::Result;
use spring::{
    app::{App, AppBuilder},
    plugin::Plugin,
};
use std::ops::Deref;
use std::sync::Arc;
use uuid::Uuid;

#[derive(Clone, Default)]
pub struct Jobs(Vec<Job>);

impl Jobs {
    pub fn new() -> Self {
        Self::default()
    }
    fn single(job: Job) -> Self {
        Self(vec![job])
    }

    pub fn add_job(mut self, job: Job) -> Self {
        self.0.push(job);
        self
    }

    pub fn add_jobs(mut self, jobs: Jobs) -> Self {
        for job in jobs.0 {
            self.0.push(job);
        }
        self
    }

    fn merge(&mut self, jobs: Jobs) {
        for job in jobs.0 {
            self.0.push(job);
        }
    }
}

impl Deref for Jobs {
    type Target = Vec<Job>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

pub type JobId = Uuid;
pub type JobScheduler = tokio_cron_scheduler::JobScheduler;

pub trait JobConfigurator {
    fn add_job(&mut self, job: Job) -> &mut Self;
    fn add_jobs(&mut self, job: Jobs) -> &mut Self;
}

impl JobConfigurator for AppBuilder {
    fn add_job(&mut self, job: Job) -> &mut Self {
        if let Some(jobs) = self.get_component::<Jobs>() {
            unsafe {
                let raw_ptr = Arc::into_raw(jobs);
                let jobs = &mut *(raw_ptr as *mut Vec<Job>);
                jobs.push(job);
            }
            self
        } else {
            self.add_component(Jobs::single(job))
        }
    }

    fn add_jobs(&mut self, new_jobs: Jobs) -> &mut Self {
        if let Some(jobs) = self.get_component::<Jobs>() {
            unsafe {
                let raw_ptr = Arc::into_raw(jobs);
                let jobs = &mut *(raw_ptr as *mut Jobs);
                jobs.merge(new_jobs);
            }
            self
        } else {
            self.add_component(new_jobs)
        }
    }
}

pub struct JobPlugin;

#[async_trait]
impl Plugin for JobPlugin {
    async fn build(&self, app: &mut AppBuilder) {
        app.add_scheduler(|app: Arc<App>| Box::new(Self::schedule(app)));
    }
}

impl JobPlugin {
    async fn schedule(app: Arc<App>) -> Result<String> {
        let jobs = app.get_component::<Jobs>();

        let jobs = match jobs {
            None => {
                let msg = "No tasks are registered, so the task scheduler does not start.";
                tracing::info!(msg);
                return Ok(msg.to_string());
            }
            Some(jobs) => jobs,
        };

        let mut sched = JobScheduler::new().await.context("job init failed")?;

        for job in jobs.deref().iter() {
            sched
                .add(job.to_owned().build(app.clone()))
                .await
                .context("add job failed")?;
        }

        sched.shutdown_on_ctrl_c();

        // Add code to be run during/after shutdown
        sched.set_shutdown_handler(Box::new(|| {
            Box::pin(async move {
                tracing::info!("Shut down done");
            })
        }));

        // Start the scheduler
        sched.start().await.context("job scheduler start failed")?;

        Ok("job schedule finished".to_string())
    }
}