use std::{pin::Pin, sync::Arc};
use chrono::{DateTime, TimeZone};
use core::fmt;
use futures::Future;
pub type TaskFuture = Box<dyn Future<Output = ()> + Send>;
pub trait TaskFuturePinned {
fn get_pinned(&self) -> Pin<TaskFuture>;
}
pub struct TaskWrapper<F, T>(F)
where
F: Fn() -> T,
T: Future;
impl<F, T> TaskWrapper<F, T>
where
F: Fn() -> T,
T: Future,
{
pub fn new(f: F) -> Self {
TaskWrapper(f)
}
}
impl<F, T> TaskFuturePinned for TaskWrapper<F, T>
where
F: Fn() -> T,
T: Future<Output = ()> + Send + 'static,
{
fn get_pinned(&self) -> Pin<TaskFuture> {
Box::pin(self.0())
}
}
#[derive(Clone)]
pub struct AsyncEntry<Z>
where
Z: Send + Sync + 'static,
Z: TimeZone,
{
pub id: usize,
pub schedule: Option<cron::Schedule>,
pub next: Option<DateTime<Z>>,
pub run: Arc<dyn TaskFuturePinned + Send + Sync>,
}
impl<Z> fmt::Debug for AsyncEntry<Z>
where
Z: TimeZone + Send + Sync + 'static,
Z::Offset: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AsyncEntry")
.field("id", &self.id)
.field("schedule", &self.schedule)
.field("next", &self.next)
.finish()
}
}
impl<Z> AsyncEntry<Z>
where
Z: Send + Sync + 'static,
Z: TimeZone,
{
pub fn get_next(&self, tz: Z) -> Option<DateTime<Z>> {
self.schedule.as_ref().and_then(|s| s.upcoming(tz).next())
}
pub fn is_once(&self) -> bool {
self.schedule.is_none()
}
}
#[cfg(all(test, feature = "async"))]
mod tests {
use super::*;
use chrono::Utc;
use std::sync::{Arc, Mutex};
#[tokio::test]
async fn test_async_entry_debug() {
let entry: AsyncEntry<Utc> = AsyncEntry {
id: 1,
next: None,
schedule: Some("* * * * * *".parse().unwrap()),
run: Arc::new(TaskWrapper::new(|| async { })),
};
let debug_str = format!("{:?}", entry);
assert!(debug_str.contains("AsyncEntry"));
assert!(debug_str.contains("id: 1"));
}
#[tokio::test]
async fn test_async_entry_get_next() {
let entry: AsyncEntry<Utc> = AsyncEntry {
id: 1,
next: None,
schedule: Some("* * * * * *".parse().unwrap()),
run: Arc::new(TaskWrapper::new(|| async { })),
};
let now = Utc::now();
let next = entry.get_next(Utc);
assert!(next.is_some());
assert!(next.unwrap() > now);
}
#[tokio::test]
async fn test_async_entry_once() {
let entry: AsyncEntry<Utc> = AsyncEntry {
id: 1,
next: Some(Utc::now()),
schedule: None,
run: Arc::new(TaskWrapper::new(|| async { })),
};
let next = entry.get_next(Utc);
assert!(next.is_none());
}
#[tokio::test]
async fn test_task_wrapper() {
let executed = Arc::new(Mutex::new(false));
let executed_clone = Arc::clone(&executed);
let wrapper = TaskWrapper::new(move || {
let executed = executed_clone.clone();
async move {
*executed.lock().unwrap() = true;
}
});
wrapper.get_pinned().await;
assert!(*executed.lock().unwrap());
}
}