1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#![crate_name = "apalis_cron"]
#![warn(
    missing_debug_implementations,
    missing_docs,
    rust_2018_idioms,
    unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]

//! # apalis-cron
//! A simple yet extensible library for cron-like job scheduling for rust.
//! Since `apalis-cron` is build on top of `apalis` which supports tower middlerware, you should be able to easily
//! add middleware such as tracing, retries, load shed, concurrency etc.
//!
//! ## Example
//!
//! ```rust,no_run
//! # use apalis_core::context::JobContext;
//! # use apalis_core::layers::retry::RetryLayer;
//! # use apalis_core::layers::retry::DefaultRetryPolicy;
//! # use apalis_core::layers::extensions::Extension;
//! # use apalis_core::job_fn::job_fn;
//! # use apalis_core::job::Job;
//! use tower::ServiceBuilder;
//! use apalis_cron::Schedule;
//! use std::str::FromStr;
//! # use apalis_core::monitor::Monitor;
//! # use apalis_core::builder::WorkerBuilder;
//! # use apalis_core::builder::WorkerFactory;
//! use apalis_cron::CronStream;
//! use chrono::{DateTime, Utc};
//!
//! #[derive(Clone)]
//! struct FakeService;
//! impl FakeService {
//!     fn execute(&self, item: Reminder){}
//! }
//!
//! #[derive(Default, Debug, Clone)]
//! struct Reminder(DateTime<Utc>);
//! impl From<DateTime<Utc>> for Reminder {
//!    fn from(t: DateTime<Utc>) -> Self {
//!        Reminder(t)
//!    }
//! }
//! impl Job for Reminder {
//!     const NAME: &'static str = "reminder::DailyReminder";
//! }
//! async fn send_reminder(job: Reminder, ctx: JobContext) {
//!     let svc = ctx.data_opt::<FakeService>().unwrap();
//!     svc.execute(job);
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let schedule = Schedule::from_str("@daily").unwrap();
//!     let service = ServiceBuilder::new()
//!         .layer(RetryLayer::new(DefaultRetryPolicy))
//!         .layer(Extension(FakeService))
//!         .service(job_fn(send_reminder));
//!     let worker = WorkerBuilder::new("morning-cereal")
//!         .stream(CronStream::new(schedule).to_stream())
//!         .build(service);
//!     Monitor::new()
//!         .register(worker)
//!         .run()
//!         .await
//!         .unwrap();
//! }
//! ```

use apalis_core::job::Job;
use apalis_core::utils::Timer;
use apalis_core::{error::JobError, request::JobRequest};
use chrono::{DateTime, Utc};
pub use cron::Schedule;
use futures::stream::BoxStream;
use futures::StreamExt;
use std::marker::PhantomData;

/// Represents a stream from a cron schedule
#[derive(Clone, Debug)]
pub struct CronStream<J, T>(Schedule, PhantomData<J>, T);

impl<J: From<DateTime<Utc>> + Job + Send + 'static> CronStream<J, ()> {
    /// Build a new cron stream from a schedule
    pub fn new(schedule: Schedule) -> Self {
        Self(schedule, PhantomData, ())
    }

    /// Set a custom timer
    pub fn timer<NT: Timer>(self, timer: NT) -> CronStream<J, NT> {
        CronStream(self.0, PhantomData, timer)
    }
}

impl<J: From<DateTime<Utc>> + Job + Send + 'static, T: Timer + Sync + Send + 'static>
    CronStream<J, T>
{
    /// Convert to consumable
    pub fn to_stream(self) -> BoxStream<'static, Result<Option<JobRequest<J>>, JobError>> {
        let stream = async_stream::stream! {
            let mut schedule = self.0.upcoming_owned(Utc);
            loop {
                let next = schedule.next();
                match next {
                    Some(next) => {
                        let to_sleep = next - chrono::Utc::now();
                        let to_sleep = to_sleep.to_std().map_err(|e| JobError::Failed(e.into()))?;
                        self.2.sleep(to_sleep).await;
                        yield Ok(Some(JobRequest::new(J::from(chrono::Utc::now()))));
                    },
                    None => {
                        yield Ok(None);
                    }
                }

            }
        };
        stream.boxed()
    }
}