extern crate chrono;
extern crate cron;
use chrono::DateTime;
use chrono::TimeZone;
use cron::Schedule;
use log::{debug, error, warn};
/// A single unit of work belonging to a [`Task`].
///
/// Steps are stored in `Task::steps` and invoked in order by `Task::run_task`.
pub struct TaskStep<'a> {
/// Boxed closure that performs the step; `Ok(())` means success and
/// `Err(())` means failure. It may borrow data for the lifetime `'a`.
pub(crate) function: Box<dyn (FnMut() -> Result<(), ()>) + 'a>,
/// Text used to identify the step in log messages (`"-"` when `Task::add_step`
/// is called without a description).
pub(crate) description: String,
}
/// Life-cycle state of a `Task`.
///
/// The enum carries no data, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    /// Freshly constructed; `Task::init` has not been called yet.
    Init,
    /// A next execution time is set and the task may be run.
    Scheduled,
    /// The last run stopped because one of the steps returned an error.
    Failed,
    /// The last run completed without any step failing.
    Executed,
    /// The configured number of repeats is used up; the task must be removed.
    Finished,
}
/// A cron-scheduled task made of an ordered list of [`TaskStep`]s.
///
/// `'a` bounds the data the step closures may borrow; `T` is the time zone
/// in which the cron schedule is evaluated.
pub struct Task<'a, T>
where
T: TimeZone,
{
/// Steps executed in order by `run_task`.
pub(crate) steps: Vec<TaskStep<'a>>,
/// Parsed cron schedule used to compute `next_exec`.
pub(crate) schedule: Schedule,
/// Remaining number of runs; `None` means the task is rescheduled without limit.
pub(crate) repeats: Option<usize>,
/// Text used to identify the task in log messages (`"-"` when none was given).
pub(crate) description: String,
/// Time zone handed to the schedule when looking up the next execution time.
pub(crate) timezone: T,
/// Identifier assigned by `init`; `0` until then.
pub(crate) task_id: usize,
/// Next execution time; `None` until `init` is called.
pub(crate) next_exec: Option<DateTime<T>>,
/// Current life-cycle state, see [`Status`].
pub(crate) status: Status,
}
impl<'a, T> Task<'a, T>
where
T: TimeZone,
{
pub fn new(
expression: &str,
description: Option<&str>,
repeats: Option<usize>,
timezone: T,
) -> Task<'a, T> {
Task {
steps: Vec::new(),
schedule: expression.parse().unwrap(),
description: match description {
Some(s) => s.to_string(),
None => "-".to_string(),
},
repeats,
timezone,
task_id: 0,
status: Status::Init,
next_exec: None,
}
}
pub(crate) fn add_step<F>(&mut self, description: Option<&str>, function: F) -> &mut Task<'a, T>
where
F: (FnMut() -> Result<(), ()>) + 'a,
{
self.steps.push(TaskStep {
function: Box::new(function),
description: match description {
Some(s) => s.to_string(),
None => "-".to_string(),
},
});
self
}
pub(crate) fn set_steps(&mut self, steps: Vec<TaskStep<'a>>) -> &mut Task<'a, T> {
self.steps = steps;
self
}
pub(crate) fn set_schedule(&mut self, schedule: Schedule) -> &mut Task<'a, T> {
self.schedule = schedule;
self
}
pub(crate) fn init(&mut self, id: usize) {
self.task_id = id;
self.next_exec = Some(
self.schedule
.upcoming(self.timezone.clone())
.next()
.unwrap(),
);
self.status = Status::Scheduled;
}
pub(crate) fn run_task(&mut self) {
match &self.status {
Status::Init => panic!("Task not initialized yet!"),
Status::Failed => panic!("Task must be rescheduled!"),
Status::Executed => panic!("Task already executed and must be rescheduled!"),
Status::Finished => panic!("Task has finished and must be removed!"),
Status::Scheduled => {
debug!(
"[Task {}] [{}] is been executed...",
self.task_id, self.description
);
let mut had_error: bool = false;
for (index, step) in self.steps.iter_mut().enumerate() {
if !had_error {
match (step.function)() {
Ok(_) => {
debug!(
"[Task {}-{}] [{}] Executed successfully.",
self.task_id, index, step.description,
);
self.status = Status::Executed
}
Err(_) => {
error!(
"[Task {}] [{}] Execution failed at step {}.",
self.task_id, index, step.description,
);
had_error = true;
self.status = Status::Failed
}
};
}
}
if self.steps.len() == 0 {
self.status = Status::Executed
}
self.repeats = match self.repeats {
Some(t) => Some(t - 1),
None => None,
};
}
}
}
pub(crate) fn reschedule(&mut self) {
match &self.status {
Status::Init => panic!("Task not initialized yet!"),
Status::Failed | Status::Executed => {
self.next_exec = Some(
self.schedule
.upcoming(self.timezone.clone())
.next()
.unwrap(),
);
self.status = match self.repeats {
Some(t) => {
if t > 0 {
debug!("[Task {}] Has been rescheduled.", self.task_id);
Status::Scheduled
} else {
warn!(
"[Task {}] Has finished it's execution cycle and will be removed.",
self.task_id
);
Status::Finished
}
}
None => Status::Scheduled,
}
}
Status::Finished => panic!("[Task {}] has finished and must be removed!", self.task_id),
Status::Scheduled => { }
}
}
}
#[cfg(test)]
mod test {
    use super::*;
    use chrono::prelude::*;

    /// Six-field cron expression used by the bounded-repeat flow tests.
    const SIX_FIELD_EXPR: &str = "* * * * * *";
    /// Seven-field cron expression used by the remaining tests.
    const SEVEN_FIELD_EXPR: &str = "* * * * * * *";

    /// Step body that always succeeds.
    fn succeed() -> Result<(), ()> {
        Ok(())
    }

    /// Step body that always fails.
    fn fail() -> Result<(), ()> {
        Err(())
    }

    /// Runs one execute/reschedule cycle and checks the status after each call.
    fn run_cycle<'a>(task: &mut Task<'a, Local>, after_run: Status, after_reschedule: Status) {
        task.run_task();
        assert_eq!(task.status, after_run);
        task.reschedule();
        assert_eq!(task.status, after_reschedule);
    }

    /// Initializes the task with id 0, checking the status before and after.
    fn init_checked<'a>(task: &mut Task<'a, Local>) {
        assert_eq!(task.status, Status::Init);
        task.init(0);
        assert_eq!(task.status, Status::Scheduled);
    }

    // A task limited to two repeats finishes after two successful cycles.
    #[test]
    fn normal_task_flow_test() {
        let mut task = Task::new(SIX_FIELD_EXPR, Some("Test task"), Some(2), Local);
        task.add_step(None, succeed);
        init_checked(&mut task);
        run_cycle(&mut task, Status::Executed, Status::Scheduled);
        run_cycle(&mut task, Status::Executed, Status::Finished);
    }

    // Replacing the schedule before `init` still lets the task be scheduled.
    #[test]
    fn test_task_set_schedule() {
        let replacement: Schedule = SEVEN_FIELD_EXPR.parse().unwrap();
        let mut task = Task::new(SEVEN_FIELD_EXPR, None, None, Local);
        task.set_schedule(replacement);
        task.add_step(None, succeed);
        init_checked(&mut task);
    }

    // Failed runs consume repeats just like successful ones.
    #[test]
    fn normal_task_error_flow_test() {
        let mut task = Task::new(SIX_FIELD_EXPR, Some("Test task"), Some(2), Local);
        task.add_step(None, fail);
        init_checked(&mut task);
        run_cycle(&mut task, Status::Failed, Status::Scheduled);
        run_cycle(&mut task, Status::Failed, Status::Finished);
    }

    // Without a repeat limit the task keeps returning to `Scheduled`.
    #[test]
    fn normal_task_no_fixed_repeats_test() {
        let mut task = Task::new(SEVEN_FIELD_EXPR, Some("Test task"), None, Local);
        task.add_step(None, succeed);
        init_checked(&mut task);
        for _ in 0..9 {
            run_cycle(&mut task, Status::Executed, Status::Scheduled);
        }
    }

    #[test]
    #[should_panic(expected = "Task not initialized yet!")]
    fn test_reschedule_init_panic() {
        Task::new(SEVEN_FIELD_EXPR, None, None, Local).reschedule();
    }

    #[test]
    #[should_panic(expected = "[Task 0] has finished and must be removed!")]
    fn test_reschedule_finished_panic() {
        let mut task = Task::new(SEVEN_FIELD_EXPR, None, Some(1), Local);
        task.init(0);
        task.run_task();
        // The first reschedule moves the task to `Finished`; the second must panic.
        task.reschedule();
        task.reschedule();
    }

    #[test]
    #[should_panic(expected = "Task not initialized yet!")]
    fn test_run_uninitialized_task() {
        Task::new(SEVEN_FIELD_EXPR, None, None, Local).run_task();
    }

    #[test]
    #[should_panic(expected = "Task must be rescheduled!")]
    fn test_run_failed_task() {
        let mut task = Task::new(SEVEN_FIELD_EXPR, None, None, Local);
        task.add_step(None, fail);
        task.init(0);
        task.run_task();
        // Running again without rescheduling must panic.
        task.run_task();
    }

    #[test]
    #[should_panic(expected = "Task already executed and must be rescheduled!")]
    fn test_run_executed_task() {
        let mut task = Task::new(SEVEN_FIELD_EXPR, None, None, Local);
        task.add_step(None, succeed);
        task.init(0);
        task.run_task();
        // Running again without rescheduling must panic.
        task.run_task();
    }

    #[test]
    #[should_panic(expected = "Task has finished and must be removed!")]
    fn test_run_finished_task() {
        let mut task = Task::new(SEVEN_FIELD_EXPR, None, Some(1), Local);
        task.init(0);
        task.run_task();
        task.reschedule();
        // The single repeat is used up, so the task is `Finished` here.
        task.run_task();
    }
}