mod config;
mod reactor;
mod task_actor;
mod task_manager;
mod time_wheel;
use crate::schedule::config::{
DEFAULT_ERROR_CODE_1000, DEFAULT_ERROR_CODE_1001, DEFAULT_ERROR_CODE_1002,
};
use crate::schedule::task_actor::{ITaskHandler, Task, TaskOrderType, TaskStatus};
#[cfg(feature = "schedule")]
use crate::schedule::task_manager::TaskManager;
use crate::schedule::{reactor::TaskReactor, time_wheel::TierTimeWheel};
use chrono::Local;
use std::error::Error;
use std::sync::Arc;
use std::{marker::PhantomData, pin::Pin, str::FromStr};
/// Repetition policy handed to `Task::new` when a task is pushed onto the scheduler.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names; the code that interprets them is not in this file — confirm
/// against the task implementation.
#[cfg(feature = "schedule")]
pub enum RepeatModel {
/// Presumably: run the task a single time.
Once,
/// Presumably: keep re-running the task on every cron occurrence.
Repetition,
/// Presumably: run the task the given number of times.
Times(usize),
}
/// Cron-driven task scheduler.
///
/// Bundles the three components that the scheduler's methods operate on.
#[cfg(feature = "schedule")]
pub struct Scheduler {
/// Time wheel that newly pushed tasks are placed on; a clone of this `Arc`
/// is handed to the reactor in `Scheduler::new`.
pub(crate) time_wheel: Arc<TierTimeWheel>,
/// Reactor started in `Scheduler::new` and awaited by `Scheduler::wait_all`.
pub(crate) task_reactor: TaskReactor,
/// Issues task ids, records each task's sender, and relays status and cron
/// updates to tasks by id.
pub(crate) task_manager: TaskManager,
}
// Gated like the `Scheduler` struct and the `TaskManager` import so that the
// impl does not outlive the type it extends when the feature is disabled.
#[cfg(feature = "schedule")]
impl Scheduler {
    /// Builds a scheduler and starts its reactor.
    ///
    /// A fresh time wheel, reactor and task manager are created; the reactor
    /// is then started with a shared handle to the time wheel and the task
    /// manager's notice list.
    pub fn new() -> Self {
        let time_wheel = Arc::new(TierTimeWheel::new());
        let mut task_reactor = TaskReactor::new();
        let task_manager = TaskManager::new();
        task_reactor.start(Arc::clone(&time_wheel), task_manager.get_notice_list());
        Self {
            time_wheel,
            task_reactor,
            task_manager,
        }
    }

    /// Awaits `TaskReactor::wait_all`.
    ///
    /// Whatever the reactor returns is discarded: this method returns `()`
    /// and therefore has no channel to report it.
    pub async fn wait_all(&mut self) {
        let _ = self.task_reactor.wait_all().await;
    }

    /// Schedules `handle` to run according to `cron`.
    ///
    /// This is a thin alias for [`Self::push_order_task`].
    ///
    /// # Errors
    ///
    /// See [`Self::push_order_task`].
    pub fn push_task(
        &mut self,
        cron: &str,
        handle: impl IntoSystem,
        repeat: RepeatModel,
    ) -> Result<usize, Box<dyn std::error::Error>> {
        self.push_order_task(cron, handle, repeat)
    }

    /// Schedules `handle` as a `TaskOrderType::Order` task and returns its id.
    ///
    /// # Errors
    ///
    /// * the parse error from `cron::Schedule::from_str` if `cron` is invalid;
    /// * a `SchedulerError` carrying `DEFAULT_ERROR_CODE_1001` if the schedule
    ///   has no upcoming occurrence;
    /// * a `SchedulerError` carrying `DEFAULT_ERROR_CODE_1000` if the task
    ///   manager cannot supply a new id.
    pub fn push_order_task(
        &mut self,
        cron: &str,
        handle: impl IntoSystem,
        repeat: RepeatModel,
    ) -> Result<usize, Box<dyn std::error::Error>> {
        self.push_task_with_order(cron, handle, repeat, TaskOrderType::Order)
    }

    /// Schedules `handle` as a `TaskOrderType::Disorder` task and returns its id.
    ///
    /// # Errors
    ///
    /// Same conditions as [`Self::push_order_task`].
    pub fn push_disorder_task(
        &mut self,
        cron: &str,
        handle: impl IntoSystem,
        repeat: RepeatModel,
    ) -> Result<usize, Box<dyn std::error::Error>> {
        self.push_task_with_order(cron, handle, repeat, TaskOrderType::Disorder)
    }

    /// Shared implementation behind [`Self::push_order_task`] and
    /// [`Self::push_disorder_task`]; the two differ only in `order`.
    ///
    /// The cron expression and its next occurrence are validated *before* a
    /// task id is requested from the task manager, so a rejected expression
    /// never consumes an id that is then left without a registered task.
    fn push_task_with_order(
        &mut self,
        cron: &str,
        handle: impl IntoSystem,
        repeat: RepeatModel,
        order: TaskOrderType,
    ) -> Result<usize, Box<dyn std::error::Error>> {
        let cron_schedule = cron::Schedule::from_str(cron)?;
        let next_time = cron_schedule
            .upcoming(Local)
            .next()
            .ok_or_else(|| SchedulerError(DEFAULT_ERROR_CODE_1001.to_string()))?;
        let task_id = self
            .task_manager
            .get_new_id()
            .ok_or_else(|| SchedulerError(DEFAULT_ERROR_CODE_1000.to_string()))?;

        // The clock is read again here, after `upcoming` picked `next_time`,
        // so the difference can come out slightly negative if that instant
        // has just passed. Clamp to zero rather than hand the time wheel a
        // negative delay.
        let milliseconds = next_time
            .signed_duration_since(Local::now())
            .num_milliseconds()
            .max(0);

        let task = Task::new(
            cron_schedule,
            Arc::new(Box::new(handle.to_system())),
            repeat,
            next_time,
            order,
            task_id,
        );
        // Register the task's sender first so that id-based updates can reach
        // the task as soon as it sits on the wheel.
        self.task_manager
            .insert_new_task(task_id, task.get_sender());
        self.time_wheel.push_T_to_time_wheel(task, milliseconds);
        Ok(task_id)
    }

    /// Asks the task manager to set task `task_id` to `TaskStatus::Pause`.
    ///
    /// Returns the flag reported by `TaskManager::update_task_status_by_id`
    /// (presumably `true` when the update was delivered — confirm there).
    pub async fn pause_task_by_id(&mut self, task_id: usize) -> bool {
        self.task_manager
            .update_task_status_by_id(task_id, TaskStatus::Pause)
            .await
    }

    /// Asks the task manager to set task `task_id` to `TaskStatus::Running`.
    ///
    /// Returns the flag reported by `TaskManager::update_task_status_by_id`
    /// (presumably `true` when the update was delivered — confirm there).
    pub async fn restart_task_by_id(&mut self, task_id: usize) -> bool {
        self.task_manager
            .update_task_status_by_id(task_id, TaskStatus::Running)
            .await
    }

    /// Asks the task manager to set task `task_id` to `TaskStatus::Destory`.
    ///
    /// The spelling "destory" matches the existing `TaskStatus` variant and is
    /// kept in the method name so current callers keep compiling.
    ///
    /// Returns the flag reported by `TaskManager::update_task_status_by_id`
    /// (presumably `true` when the update was delivered — confirm there).
    pub async fn destory_task_by_id(&mut self, task_id: usize) -> bool {
        self.task_manager
            .update_task_status_by_id(task_id, TaskStatus::Destory)
            .await
    }

    /// Replaces the cron schedule of task `task_id`.
    ///
    /// # Errors
    ///
    /// * the parse error from `cron::Schedule::from_str` if `cron` is invalid;
    /// * a `SchedulerError` carrying `DEFAULT_ERROR_CODE_1002` if the task
    ///   manager reports that the update did not succeed.
    pub async fn update_cron_by_id(
        &mut self,
        task_id: usize,
        cron: &str,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let cron_schedule = cron::Schedule::from_str(cron)?;
        let updated = self
            .task_manager
            .update_cron_by_id(task_id, cron_schedule)
            .await;
        if updated {
            Ok(())
        } else {
            Err(Box::new(SchedulerError(
                DEFAULT_ERROR_CODE_1002.to_string(),
            )))
        }
    }
}
/// Conversion of a value into a runnable task handler.
///
/// The scheduler's `push_*` methods accept `impl IntoSystem` and call
/// `to_system` to obtain the handler that is stored in the new task.
pub(crate) trait IntoSystem: Sized {
/// Handler type produced by the conversion.
type System: ITaskHandler + 'static;
/// Consumes `self` and returns the handler.
fn to_system(self) -> Self::System;
}
/// Every [`SystemParamFunction`] converts into a [`FunctionSystem`] wrapping it.
impl<F> IntoSystem for F
where
    F: SystemParamFunction,
{
    type System = FunctionSystem<F>;

    /// Moves `self` into a new `FunctionSystem`.
    fn to_system(self) -> Self::System {
        let func = self;
        FunctionSystem {
            func,
            _maker: PhantomData,
        }
    }
}
/// Adapter that stores a callable so it can be used as a task handler.
///
/// No trait bound is placed on the struct itself: the impls that need
/// `SystemParamFunction` state it themselves, so repeating it here (together
/// with a `'static` that `SystemParamFunction` already implies) only forced
/// every mention of the type to carry the bound.
#[derive(Clone)]
pub(crate) struct FunctionSystem<F> {
    /// The wrapped callable.
    func: F,
    /// Zero-sized placeholder; it is `PhantomData<()>` and carries no type
    /// information.
    _maker: PhantomData<()>,
}
/// Lets a wrapped [`SystemParamFunction`] be used wherever an `ITaskHandler`
/// is expected.
impl<F: SystemParamFunction> ITaskHandler for FunctionSystem<F> {
    /// Invokes the wrapped callable and returns the future it produces.
    ///
    /// `SystemParamFunction::run` already yields a pinned, boxed future of
    /// exactly this return type, so it is forwarded as-is; wrapping it in a
    /// second `Box::pin` would only add an allocation and a level of
    /// indirection on every run.
    fn run(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        self.func.run()
    }
}
/// A callable that can be invoked to obtain a task future.
///
/// Implementors must be `Send + Sync + 'static`. This file provides a blanket
/// implementation for zero-argument `Fn` values that return a suitable future.
pub(crate) trait SystemParamFunction: Send + Sync + 'static {
/// Invokes the callable and returns its future, pinned and boxed.
fn run(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
}
/// Blanket implementation for zero-argument callables.
///
/// Any `Fn()` that is `Send + Sync + 'static` and returns a `Send + 'static`
/// future with `()` output qualifies.
impl<Callable, Pending> SystemParamFunction for Callable
where
    Callable: Fn() -> Pending + Send + Sync + 'static,
    Pending: Future<Output = ()> + Send + 'static,
{
    /// Calls the function once and boxes the future it hands back.
    fn run(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        let pending = (self)();
        Box::pin(pending)
    }
}
/// Error returned by the scheduler for its own failure conditions.
///
/// Wraps the message text; the scheduler methods build it from the
/// `DEFAULT_ERROR_CODE_*` constants.
#[derive(Debug)]
struct SchedulerError(String);

impl std::fmt::Display for SchedulerError {
    /// Writes the message prefixed with `SchedulerError: `.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        write!(f, "SchedulerError: {}", message)
    }
}

impl Error for SchedulerError {}