1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
use std::{future::Future, marker::PhantomData, pin::Pin, task::Poll};

use crate::AsyncJob;
use crate::Interval;
use crate::{
    async_job::JobFuture,
    timeprovider::{ChronoTimeProvider, TimeProvider},
    Job,
};

/// An asynchronous job scheduler, for use with `Future`s.
///
/// The asynchronous scheduler works almost identically to the [synchronous one](crate::Scheduler), except that
/// instead of taking functions or closures returning `()`, it takes functions or closures returning values implementing `Future<Output = ()>`.
///
/// Unlike the synchronous version, there is no [`watch_thread`](crate::Scheduler::watch_thread) method, as it would tie
/// this crate to a specific runtime, and also because it's trivial to implement by hand. For example, using tokio:
///
/// ```no_run
/// # use clokwerk::*;
/// # use std::time::Duration;
/// # let mut scheduler = AsyncScheduler::new();
/// tokio::spawn(async move {
///   loop {
///     scheduler.run_pending().await;
///     tokio::time::sleep(Duration::from_millis(100)).await;
///   }
/// });
/// ```
/// For async_std:
/// ```no_run
/// # use clokwerk::*;
/// # use std::time::Duration;
/// # let mut scheduler = AsyncScheduler::new();
/// async_std::task::spawn(async move {
///   loop {
///     scheduler.run_pending().await;
///     async_std::task::sleep(Duration::from_millis(100)).await;
///   }
/// });
/// ```
/// ### Usage examples
/// The examples below are intended to demonstrate how to work with various types of Future.
/// See [synchronous examples](crate::Scheduler) for more examples of how to schedule tasks.
///
/// ```rust
/// // Scheduler, trait for .seconds(), .minutes(), etc., and trait with job scheduling methods
/// use clokwerk::{AsyncScheduler, TimeUnits, Job};
/// // Import week days and WeekDay
/// use clokwerk::Interval::*;
/// use std::time::Duration;
/// # use std::future::Future;
/// # use std::pin::Pin;
/// # async fn some_async_fn() {}
/// # fn returns_boxed_future() -> Box<dyn Future<Output=()> + Send> { Box::new(some_async_fn()) }
/// # fn returns_pinned_boxed_future() -> Pin<Box<dyn Future<Output=()> + Send>> { Box::pin(some_async_fn()) }
///
/// // Create a new scheduler
/// let mut scheduler = AsyncScheduler::new();
/// // Add some tasks to it
/// scheduler
///     .every(10.minutes())
///         .plus(30.seconds())
///     .run(|| async { println!("Simplest is just using an async block"); });
/// scheduler
///     .every(1.day())
///         .at("3:20 pm")
///     .run(|| some_async_fn());
/// scheduler
///     .every(Wednesday)
///         .at("14:20:17")
///     .run(some_async_fn);
/// scheduler
///     .every(Tuesday)
///         .at("14:20:17")
///     .and_every(Thursday)
///         .at("15:00")
///     .run(|| std::pin::Pin::from(returns_boxed_future()));
/// scheduler
///     .every(Weekday)
///     .run(|| returns_pinned_boxed_future());
/// scheduler
///     .every(1.day())
///         .at("3:20 pm")
///     .run(returns_pinned_boxed_future).once();
/// # tokio_test::block_on(async move {
/// // Manually run the scheduler forever
/// loop {
///     scheduler.run_pending().await;
///     tokio::time::sleep(Duration::from_millis(10)).await;
///     # break;
/// }
///
/// // Or spawn a task to run it forever
/// tokio::spawn(async move {
///   loop {
///     scheduler.run_pending().await;
///     tokio::time::sleep(Duration::from_millis(100)).await;
///   }
/// });
/// # });
/// ```
#[derive(Debug)]
pub struct AsyncScheduler<Tz = chrono::Local, Tp = ChronoTimeProvider>
where
    Tz: chrono::TimeZone,
    Tp: TimeProvider,
{
    jobs: Vec<AsyncJob<Tz, Tp>>,
    tz: Tz,
    _tp: PhantomData<Tp>,
}

impl Default for AsyncScheduler {
    fn default() -> AsyncScheduler {
        AsyncScheduler::<chrono::Local> {
            jobs: vec![],
            tz: chrono::Local,
            _tp: PhantomData,
        }
    }
}

impl AsyncScheduler {
    /// Create a new scheduler. Dates and times will be interpretted using the local timezone
    pub fn new() -> Self {
        Self::default()
    }

    /// Create a new scheduler. Dates and times will be interpretted using the specified timezone.
    pub fn with_tz<Tz: chrono::TimeZone>(tz: Tz) -> AsyncScheduler<Tz> {
        AsyncScheduler {
            jobs: vec![],
            tz,
            _tp: PhantomData,
        }
    }

    /// Create a new scheduler. Dates and times will be interpretted using the specified timezone.
    /// In addition, you can provide an alternate time provider. This is mostly useful for writing
    /// tests.
    pub fn with_tz_and_provider<Tz: chrono::TimeZone, Tp: TimeProvider>(
        tz: Tz,
    ) -> AsyncScheduler<Tz, Tp> {
        AsyncScheduler {
            jobs: vec![],
            tz,
            _tp: PhantomData,
        }
    }
}

impl<Tz, Tp> AsyncScheduler<Tz, Tp>
where
    Tz: chrono::TimeZone + Sync + Send,
    Tp: TimeProvider,
{
    /// Add a new job to the scheduler to be run on the given interval
    /// ```rust
    /// # use clokwerk::*;
    /// # use clokwerk::Interval::*;
    /// # use std::future::Future;
    /// # use std::pin::Pin;
    /// # async fn some_async_fn() {}
    /// # fn returns_boxed_future() -> Box<dyn Future<Output=()> + Send> { Box::new(some_async_fn()) }
    /// # fn returns_pinned_boxed_future() -> Pin<Box<dyn Future<Output=()> + Send>> { Box::pin(some_async_fn()) }
    /// let mut scheduler = AsyncScheduler::new();
    /// scheduler.every(10.minutes()).plus(30.seconds()).run(|| async { println!("Periodic task") });
    /// scheduler.every(1.day()).at("3:20 pm").run(|| some_async_fn());
    /// scheduler.every(Wednesday).at("14:20:17").run(|| Pin::from(returns_boxed_future()));
    /// scheduler.every(Weekday).run(|| returns_pinned_boxed_future());
    /// ```
    pub fn every(&mut self, ival: Interval) -> &mut AsyncJob<Tz, Tp> {
        let job = AsyncJob::<Tz, Tp>::new(ival, self.tz.clone());
        self.jobs.push(job);
        let last_index = self.jobs.len() - 1;
        &mut self.jobs[last_index]
    }

    /// Run all jobs that should run at this time.
    ///
    /// This method returns a future that will poll each of the tasks until they are completed.
    /// ```no_run
    /// # use clokwerk::*;
    /// # use clokwerk::Interval::*;
    /// use std::time::Duration;
    /// # let mut scheduler = AsyncScheduler::new();
    /// # async {
    /// loop {
    ///     scheduler.run_pending().await;
    ///     tokio::time::sleep(Duration::from_millis(100)).await;
    ///     # break
    /// }
    /// # };
    /// ```
    /// Note that while all pending jobs will run asynchronously, a long-running task can still
    /// block future executions if you `await` the future returned by this method.
    /// If you are concerned that a task might run for a long time, there are several possible approaches:
    ///
    /// 1. Pass the result of `scheduler.run_pending()` to your runtime's `spawn` function. This might
    ///    result in multiple invocations of the same task running concurrently.
    /// 2. Use `spawn` or `spawn_blocking` in your task itself. This has the same concurrent execution risk
    ///    as approach 1, but limited to that specific task.
    /// 3. Use `tokio::time::timeout` or equivalent to prevent `scheduler.run_pending()` or the task itself
    ///    from running more than an expected amount of time. E.g.
    /// ```no_run
    /// # use clokwerk::*;
    /// # use clokwerk::Interval::*;
    /// # async fn scrape_pages() {}
    /// use std::time::Duration;
    /// let mut scheduler = AsyncScheduler::new();
    /// scheduler.every(10.minutes()).run(|| async {
    ///   if let Err(_) = tokio::time::timeout(Duration::from_secs(10 * 60), scrape_pages()).await {
    ///     eprintln!("Timed out scraping pages")
    ///   }
    /// });
    /// ```
    pub fn run_pending(&mut self) -> AsyncSchedulerFuture {
        let now = Tp::now(&self.tz);
        let mut futures = vec![];
        for job in &mut self.jobs {
            if job.is_pending(&now) {
                if let Some(future) = job.execute(&now) {
                    futures.push(Some(future.into()));
                }
            }
        }
        AsyncSchedulerFuture { futures }
    }
}

pub struct AsyncSchedulerFuture {
    futures: Vec<Option<Pin<JobFuture>>>,
}

impl Future for AsyncSchedulerFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let mut all_done = true;

        for future in &mut self.get_mut().futures {
            if let Some(this_future) = future {
                if this_future.as_mut().poll(cx) == Poll::Ready(()) {
                    future.take();
                } else {
                    all_done = false;
                }
            }
        }
        if all_done {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}