use async_trait::async_trait;
use itertools::Itertools;
use std::{
collections::{BTreeMap, HashMap},
fmt::Debug,
sync::{Arc, Mutex, RwLock},
};
use chrono::{DateTime, Utc};
use croner::Cron;
use std::str::FromStr;
use crate::prelude::*;
use cloudillo_types::{lock, meta_adapter};
/// Identifier assigned to a task by its [`TaskStore`].
pub type TaskId = u64;

/// Broad classification of a task's recurrence.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskType {
    /// The task recurs on a schedule.
    Periodic,
    /// The task runs a single time.
    Once,
}
#[derive(Debug, Clone)]
pub struct CronSchedule {
expr: Box<str>,
cron: Cron,
}
impl CronSchedule {
pub fn parse(expr: &str) -> ClResult<Self> {
let cron = Cron::from_str(expr)
.map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
Ok(Self { expr: expr.into(), cron })
}
pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
self.cron
.find_next_occurrence(&dt, false)
.map(|next| Timestamp(next.timestamp()))
.map_err(|e| {
tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
Error::ValidationError(format!("cron next_execution failed: {}", e))
})
}
pub fn to_cron_string(&self) -> String {
self.expr.to_string()
}
}
impl PartialEq for CronSchedule {
fn eq(&self, other: &Self) -> bool {
self.expr == other.expr
}
}
impl Eq for CronSchedule {}
/// A unit of work that the scheduler can persist, rebuild and execute against shared state `S`.
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
    /// Stable kind name under which the builder for this type is registered.
    fn kind() -> &'static str
    where
        Self: Sized;

    /// Same kind name as [`Task::kind`], but callable on a trait object.
    fn kind_of(&self) -> &'static str;

    /// Reconstructs a task from its id and the text previously produced by [`Task::serialize`].
    fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
    where
        Self: Sized;

    /// Encodes the task's input; the scheduler stores this text and hands it back to `build`.
    fn serialize(&self) -> String;

    /// Executes the task.
    async fn run(&self, state: &S) -> ClResult<()>;

    /// Hook invoked by the scheduler once a run has failed for good.
    ///
    /// `_attempts` is the retry count reached (0 when no retry policy is attached) and
    /// `_last_error` the text of the final error. The default implementation does nothing.
    async fn on_failed(&self, _state: &S, _attempts: u16, _last_error: &str) {}
}
/// Persisted lifecycle status of a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    /// Not finished yet; re-queued when the scheduler loads tasks from its store.
    Pending,
    /// Finished.
    Completed,
    /// Any other stored status.
    Failed,
}
/// Raw task record as returned by a [`TaskStore`], before it is rebuilt into a [`TaskMeta`].
#[derive(Debug)]
pub struct TaskData {
    /// Store-assigned task id.
    id: TaskId,
    /// Task kind, used to select the registered builder.
    kind: Box<str>,
    /// Stored status; only `Pending` records are re-queued on load.
    status: TaskStatus,
    /// Serialized task input handed to the builder.
    input: Box<str>,
    /// Ids of tasks that must finish before this one.
    deps: Box<[TaskId]>,
    /// Encoded retry state `"count,min,max,times"`, when a retry policy is attached.
    retry_data: Option<Box<str>>,
    /// Cron expression for recurring tasks.
    cron_data: Option<Box<str>>,
    /// Stored next execution time, if any.
    next_at: Option<Timestamp>,
}
/// Persistence backend through which the scheduler records tasks and their outcomes.
#[async_trait]
pub trait TaskStore<S: Clone>: Send + Sync {
    /// Returns every stored task; the scheduler re-queues the pending ones when it starts.
    async fn load(&self) -> ClResult<Vec<TaskData>>;

    /// Looks up the task stored under `key`, returning its id together with its data.
    async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;

    /// Stores a new task, optionally under a lookup `key`, and returns the assigned id.
    async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;

    /// Overwrites the stored scheduling data of task `id` from `task`.
    async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;

    /// Records that task `id` finished, along with its `output`.
    async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;

    /// Records a failed run of `task_id`; `next_at` is the retry time, or `None` if no retry follows.
    async fn update_task_error(
        &self,
        task_id: TaskId,
        output: &str,
        next_at: Option<Timestamp>,
    ) -> ClResult<()>;

    /// Returns those ids from `deps` that the store already regards as completed.
    async fn find_completed_deps(&self, deps: &[TaskId]) -> ClResult<Vec<TaskId>>;
}
/// Non-persistent [`TaskStore`]: it only hands out increasing ids and records nothing else.
pub struct InMemoryTaskStore {
    /// Last id handed out; the first task receives id 1.
    last_id: Mutex<TaskId>,
}

impl InMemoryTaskStore {
    /// Creates an empty store, ready to be shared.
    pub fn new() -> Arc<Self> {
        let last_id = Mutex::new(0);
        Arc::new(Self { last_id })
    }
}

#[async_trait]
impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
    async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
        let mut counter = lock!(self.last_id)?;
        *counter += 1;
        let assigned = *counter;
        Ok(assigned)
    }

    async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
        Ok(())
    }

    // Nothing is persisted, so there is never anything to reload.
    async fn load(&self) -> ClResult<Vec<TaskData>> {
        Ok(Vec::new())
    }

    async fn update_task_error(
        &self,
        _task_id: TaskId,
        _output: &str,
        _next_at: Option<Timestamp>,
    ) -> ClResult<()> {
        Ok(())
    }

    async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
        Ok(None)
    }

    async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
        Ok(())
    }

    async fn find_completed_deps(&self, _deps: &[TaskId]) -> ClResult<Vec<TaskId>> {
        Ok(Vec::new())
    }
}
pub struct MetaAdapterTaskStore {
meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
}
impl MetaAdapterTaskStore {
pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
Arc::new(Self { meta_adapter })
}
}
#[async_trait]
impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
let id = self
.meta_adapter
.create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
.await?;
if let Some(cron) = &task.cron {
self.meta_adapter
.update_task(
id,
&meta_adapter::TaskPatch {
cron: Patch::Value(cron.to_cron_string()),
..Default::default()
},
)
.await?;
}
Ok(id)
}
async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
self.meta_adapter.update_task_finished(id, output).await
}
async fn load(&self) -> ClResult<Vec<TaskData>> {
let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
let tasks = tasks
.into_iter()
.map(|t| TaskData {
id: t.task_id,
kind: t.kind,
status: match t.status {
'P' => TaskStatus::Pending,
'F' => TaskStatus::Completed,
_ => TaskStatus::Failed,
},
input: t.input,
deps: t.deps,
retry_data: t.retry,
cron_data: t.cron,
next_at: t.next_at,
})
.collect();
Ok(tasks)
}
async fn update_task_error(
&self,
task_id: TaskId,
output: &str,
next_at: Option<Timestamp>,
) -> ClResult<()> {
self.meta_adapter.update_task_error(task_id, output, next_at).await
}
async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
let task_opt = self.meta_adapter.find_task_by_key(key).await?;
match task_opt {
Some(t) => Ok(Some((
t.task_id,
TaskData {
id: t.task_id,
kind: t.kind,
status: match t.status {
'P' => TaskStatus::Pending,
'F' => TaskStatus::Completed,
_ => TaskStatus::Failed,
},
input: t.input,
deps: t.deps,
retry_data: t.retry,
cron_data: t.cron,
next_at: t.next_at,
},
))),
None => Ok(None),
}
}
async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
use cloudillo_types::types::Patch;
let mut patch = meta_adapter::TaskPatch {
input: Patch::Value(task.task.serialize()),
next_at: match task.next_at {
Some(ts) => Patch::Value(ts),
None => Patch::Null,
},
..Default::default()
};
if !task.deps.is_empty() {
patch.deps = Patch::Value(task.deps.clone());
}
if let Some(ref retry) = task.retry {
let retry_str = format!(
"{},{},{},{}",
task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
);
patch.retry = Patch::Value(retry_str);
}
if let Some(ref cron) = task.cron {
patch.cron = Patch::Value(cron.to_cron_string());
}
self.meta_adapter.update_task(id, &patch).await
}
async fn find_completed_deps(&self, deps: &[TaskId]) -> ClResult<Vec<TaskId>> {
self.meta_adapter.find_completed_deps(deps).await
}
}
/// Constructor registered per task kind: rebuilds a task from its id and serialized input.
type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
/// Exponential-backoff retry policy.
///
/// Backoff starts at `wait_min_max.0` seconds, doubles with every attempt and is capped at
/// `wait_min_max.1` seconds; at most `times` retries are attempted.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    wait_min_max: (u64, u64),
    times: u16,
}

impl Default for RetryPolicy {
    /// 60 s initial backoff, capped at one hour, up to 10 retries.
    fn default() -> Self {
        Self { wait_min_max: (60, 3600), times: 10 }
    }
}

impl RetryPolicy {
    /// Creates a policy with the given `(min, max)` backoff in seconds and retry limit.
    pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
        Self { wait_min_max, times }
    }

    /// Returns the delay in seconds before retry number `attempt_count` (0-based):
    /// `min * 2^attempt_count`, capped at `max`.
    ///
    /// The doubling saturates instead of overflowing, so large attempt counts or large `min`
    /// values yield `max` rather than panicking (debug) or wrapping to a tiny delay (release).
    pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
        let (min, max) = self.wait_min_max;
        // checked_shl fails for shifts >= 64 bits; treat that as "unbounded".
        let factor = 1u64.checked_shl(u32::from(attempt_count)).unwrap_or(u64::MAX);
        min.saturating_mul(factor).min(max)
    }

    /// Returns `true` while fewer than `times` retries have been performed.
    pub fn should_retry(&self, attempt_count: u16) -> bool {
        attempt_count < self.times
    }
}
pub struct TaskSchedulerBuilder<'a, S: Clone> {
scheduler: &'a Scheduler<S>,
task: Arc<dyn Task<S>>,
key: Option<String>,
next_at: Option<Timestamp>,
deps: Vec<TaskId>,
retry: Option<RetryPolicy>,
cron: Option<CronSchedule>,
run_on_startup: bool,
}
impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
Self {
scheduler,
task,
key: None,
next_at: None,
deps: Vec::new(),
retry: None,
cron: None,
run_on_startup: false,
}
}
pub fn key(mut self, key: impl Into<String>) -> Self {
self.key = Some(key.into());
self
}
pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
self.next_at = Some(timestamp);
self
}
pub fn schedule_after(mut self, seconds: i64) -> Self {
self.next_at = Some(Timestamp::from_now(seconds));
self
}
pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
self.deps = deps;
self
}
pub fn depends_on(mut self, dep: TaskId) -> Self {
self.deps.push(dep);
self
}
pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
self.retry = Some(policy);
self
}
pub fn cron(mut self, expr: impl Into<String>) -> Self {
if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
self.cron = Some(cron_schedule);
}
self
}
pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
if hour <= 23 && minute <= 59 {
let expr = format!("{} {} * * *", minute, hour);
if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
self.cron = Some(cron_schedule);
}
}
self
}
pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
if weekday <= 6 && hour <= 23 && minute <= 59 {
let expr = format!("{} {} * * {}", minute, hour, weekday);
if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
self.cron = Some(cron_schedule);
}
}
self
}
pub fn run_on_startup(mut self) -> Self {
self.run_on_startup = true;
self
}
pub async fn now(self) -> ClResult<TaskId> {
self.schedule().await
}
pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
self.next_at = Some(ts);
self.schedule().await
}
pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
self.next_at = Some(Timestamp::from_now(seconds));
self.schedule().await
}
pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
self.deps.push(dep);
self.schedule().await
}
pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
self.retry = Some(RetryPolicy::default());
self.schedule().await
}
pub async fn schedule(self) -> ClResult<TaskId> {
self.scheduler
.schedule_task_impl(
self.task,
self.key.as_deref(),
self.next_at,
if self.deps.is_empty() { None } else { Some(self.deps) },
self.retry,
self.cron,
self.run_on_startup,
)
.await
}
}
/// In-memory scheduling record: the task object plus how and when it should run.
#[derive(Debug, Clone)]
pub struct TaskMeta<S: Clone> {
/// The task to execute.
pub task: Arc<dyn Task<S>>,
/// Earliest run time; `None` or a past time means "as soon as dependencies allow".
pub next_at: Option<Timestamp>,
/// Ids of tasks that must complete before this one runs.
pub deps: Vec<TaskId>,
/// Retries already performed; incremented each time a failed run is re-queued.
retry_count: u16,
/// Retry policy applied when a run fails with a retryable error.
pub retry: Option<RetryPolicy>,
/// Recurrence; when set, the task is re-queued at its next occurrence after each completion.
pub cron: Option<CronSchedule>,
}
/// Task kind name -> builder used to reconstruct persisted tasks of that kind.
type TaskBuilderRegistry<S> = HashMap<&'static str, Box<TaskBuilder<S>>>;
/// Tasks ordered by (due time, id); `Timestamp(0)` is used for "run as soon as possible".
type ScheduledTaskMap<S> = BTreeMap<(Timestamp, TaskId), TaskMeta<S>>;
/// Task scheduler holding its queues in memory and persisting tasks through a [`TaskStore`].
///
/// All shared state sits behind `Arc`s (plus flume channel handles), so clones share the same
/// queues; the background workers spawned by `start` each hold such a clone.
#[derive(Clone)]
pub struct Scheduler<S: Clone> {
/// Task kind -> builder, consulted by `load` to rebuild persisted tasks.
task_builders: Arc<RwLock<TaskBuilderRegistry<S>>>,
/// Persistence backend.
store: Arc<dyn TaskStore<S>>,
/// Tasks currently executing, keyed by id.
tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
/// Tasks blocked on unfinished dependencies, keyed by id.
tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
/// Dependency id -> ids of waiting tasks that depend on it.
task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
/// Runnable tasks ordered by due time.
tasks_scheduled: Arc<Mutex<ScheduledTaskMap<S>>>,
/// Completion channel; spawned tasks send their id here when they are done.
tx_finish: flume::Sender<TaskId>,
/// Receiving side of the completion channel, drained by the handler spawned in `start`.
rx_finish: flume::Receiver<TaskId>,
/// Wakes the dispatcher loop after entries are added to `tasks_scheduled`.
notify_schedule: Arc<tokio::sync::Notify>,
}
impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
/// Creates a scheduler with empty queues that persists tasks through `store`.
///
/// No background work happens until `start` is called.
pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
    let (tx_finish, rx_finish) = flume::unbounded();
    Arc::new(Self {
        store,
        task_builders: Arc::default(),
        tasks_running: Arc::default(),
        tasks_waiting: Arc::default(),
        task_dependents: Arc::default(),
        tasks_scheduled: Arc::default(),
        notify_schedule: Arc::new(tokio::sync::Notify::new()),
        tx_finish,
        rx_finish,
    })
}
pub fn start(&self, state: S) {
let schedule = self.clone();
let stat = state.clone();
let rx_finish = self.rx_finish.clone();
tokio::spawn(async move {
while let Ok(id) = rx_finish.recv_async().await {
debug!("Completed task {} (notified)", id);
let task_meta_opt = {
let tasks_running = match schedule.tasks_running.lock() {
Ok(guard) => guard,
Err(poisoned) => {
error!("Mutex poisoned: tasks_running (recovering)");
poisoned.into_inner()
}
};
tasks_running.get(&id).cloned()
};
if let Some(task_meta) = task_meta_opt {
let mut transition_ok = false;
if let Some(ref cron) = task_meta.cron {
let next_at = match cron.next_execution(Timestamp::now()) {
Ok(ts) => ts,
Err(e) => {
error!(
"Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
id, e
);
if let Err(e) = schedule.store.finished(id, "").await {
error!("Failed to mark task {} as finished: {}", id, e);
}
continue;
}
};
info!(
"Recurring task {} completed, scheduling next execution at {}",
id, next_at
);
let mut updated_meta = task_meta.clone();
updated_meta.next_at = Some(next_at);
if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
error!("Failed to update recurring task {} next_at: {}", id, e);
}
match schedule.tasks_running.lock() {
Ok(mut tasks_running) => {
tasks_running.remove(&id);
}
Err(poisoned) => {
error!("Mutex poisoned: tasks_running (recovering)");
poisoned.into_inner().remove(&id);
}
}
match schedule.add_queue(id, updated_meta).await {
Ok(_) => transition_ok = true,
Err(e) => {
error!(
"Failed to reschedule recurring task {}: {} - task lost!",
id, e
);
}
}
} else {
match schedule.store.finished(id, "").await {
Ok(()) => transition_ok = true,
Err(e) => {
error!(
"Failed to mark task {} as finished: {} - task remains in running queue",
id, e
);
}
}
}
if transition_ok {
match schedule.tasks_running.lock() {
Ok(mut tasks_running) => {
tasks_running.remove(&id);
}
Err(poisoned) => {
error!("Mutex poisoned: tasks_running (recovering)");
poisoned.into_inner().remove(&id);
}
}
}
match schedule.release_dependents(id) {
Ok(ready_to_spawn) => {
for (dep_id, dep_task_meta) in ready_to_spawn {
match schedule.tasks_running.lock() {
Ok(mut tasks_running) => {
tasks_running.insert(dep_id, dep_task_meta.clone());
}
Err(poisoned) => {
error!("Mutex poisoned: tasks_running (recovering)");
poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
}
}
schedule.spawn_task(
stat.clone(),
dep_task_meta.task.clone(),
dep_id,
dep_task_meta,
);
}
}
Err(e) => {
error!("Failed to release dependents of task {}: {}", id, e);
}
}
} else {
warn!("Completed task {} not found in running queue", id);
}
}
});
let schedule = self.clone();
tokio::spawn(async move {
loop {
let is_empty = match schedule.tasks_scheduled.lock() {
Ok(guard) => guard.is_empty(),
Err(poisoned) => {
error!("Mutex poisoned: tasks_scheduled (recovering)");
poisoned.into_inner().is_empty()
}
};
if is_empty {
schedule.notify_schedule.notified().await;
}
let time = Timestamp::now();
if let Some((timestamp, _id)) = loop {
let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
Ok(guard) => guard,
Err(poisoned) => {
error!("Mutex poisoned: tasks_scheduled (recovering)");
poisoned.into_inner()
}
};
if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
let (timestamp, id) = (timestamp, id);
if timestamp <= Timestamp::now() {
debug!("Spawning task id {} (from schedule)", id);
if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
let mut tasks_running = match schedule.tasks_running.lock() {
Ok(guard) => guard,
Err(poisoned) => {
error!("Mutex poisoned: tasks_running (recovering)");
poisoned.into_inner()
}
};
tasks_running.insert(id, task.clone());
schedule.spawn_task(state.clone(), task.task.clone(), id, task);
} else {
error!("Task disappeared while being removed from schedule");
break None;
}
} else {
break Some((timestamp, id));
}
} else {
break None;
}
} {
let diff = timestamp.0 - time.0;
let wait =
tokio::time::Duration::from_secs(u64::try_from(diff).unwrap_or_default());
tokio::select! {
() = tokio::time::sleep(wait) => (), () = schedule.notify_schedule.notified() => ()
};
}
}
});
let schedule = self.clone();
tokio::spawn(async move {
let _ignore_err = schedule.load().await;
});
}
fn register_builder(
&self,
name: &'static str,
builder: &'static TaskBuilder<S>,
) -> ClResult<&Self> {
let mut task_builders = self
.task_builders
.write()
.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
task_builders.insert(name, Box::new(builder));
Ok(self)
}
pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
info!("Registering task type {}", T::kind());
self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
Ok(self)
}
pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
TaskSchedulerBuilder::new(self, task)
}
/// Persists and queues a task configured through [`TaskSchedulerBuilder`].
///
/// When `key` matches an already stored task, that task is updated in the store and
/// re-queued under its existing id, instead of a new task being created. If its serialized
/// input changed, it is first taken out of the in-memory queues.
///
/// With `run_on_startup` on a recurring task, the first run is forced to "now" unless the
/// stored `next_at` of the existing task lies in the future.
#[allow(clippy::too_many_arguments)]
async fn schedule_task_impl(
    &self,
    task: Arc<dyn Task<S>>,
    key: Option<&str>,
    next_at: Option<Timestamp>,
    deps: Option<Vec<TaskId>>,
    retry: Option<RetryPolicy>,
    cron: Option<CronSchedule>,
    run_on_startup: bool,
) -> ClResult<TaskId> {
    let existing = match key {
        Some(k) => self.store.find_by_key(k).await?,
        None => None,
    };
    let effective_next_at = if run_on_startup && cron.is_some() {
        match &existing {
            Some((_existing_id, existing_data)) => match existing_data.next_at {
                Some(persisted) if persisted > Timestamp::now() => next_at,
                _ => Some(Timestamp::now()),
            },
            None => Some(Timestamp::now()),
        }
    } else {
        next_at
    };
    let task_meta = TaskMeta {
        task: task.clone(),
        next_at: effective_next_at,
        // `deps` is owned and used only here; no clone needed.
        deps: deps.unwrap_or_default(),
        retry_count: 0,
        retry,
        cron,
    };
    if let Some(key) = key
        && let Some((existing_id, existing_data)) = existing
    {
        let new_serialized = task.serialize();
        let existing_serialized = existing_data.input.as_ref();
        if new_serialized == existing_serialized {
            info!(
                "Recurring task '{}' already exists with identical parameters (id={})",
                key, existing_id
            );
        } else {
            info!("Updating recurring task '{}' (id={}) - parameters changed", key, existing_id);
            debug!(" Old params: {}", existing_serialized);
            debug!(" New params: {}", new_serialized);
            self.remove_from_queues(existing_id)?;
        }
        self.store.update_task(existing_id, &task_meta).await?;
        self.add_queue(existing_id, task_meta).await?;
        return Ok(existing_id);
    }
    let id = self.store.add(&task_meta, key).await?;
    self.add_queue(id, task_meta).await
}
/// Schedules `task` to run as soon as possible with default options, returning its id.
pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
    let builder = self.task(task);
    builder.now().await
}
/// Places task `id` in the in-memory queue matching its state and returns `id`.
///
/// - Already running: only its metadata is replaced; it is re-queued on completion.
/// - Any existing scheduled or waiting entry for `id` is dropped first.
/// - Has dependencies: goes to the waiting queue (a `next_at` is ignored with a warning).
/// - No dependencies, `next_at` absent or in the past: scheduled for immediate dispatch.
/// - No dependencies, `next_at` now or in the future: scheduled for that time.
///
/// # Errors
/// Propagates lock failures and errors from resolving already-completed dependencies.
pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
    {
        let mut running = lock!(self.tasks_running, "tasks_running")?;
        if let Some(existing_meta) = running.get_mut(&id) {
            debug!(
                "Task {} is already running, updating metadata (will reschedule on completion)",
                id
            );
            *existing_meta = task_meta;
            return Ok(id);
        }
    }
    {
        let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
        if let Some(key) = scheduled
            .iter()
            .find(|((_, tid), _)| *tid == id)
            .map(|((ts, tid), _)| (*ts, *tid))
        {
            scheduled.remove(&key);
            debug!("Removed existing scheduled entry for task {} before re-queueing", id);
        }
    }
    {
        let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
        if waiting.remove(&id).is_some() {
            debug!("Removed existing waiting entry for task {} before re-queueing", id);
        }
    }
    if !task_meta.deps.is_empty() {
        if task_meta.next_at.is_some() {
            warn!(
                "Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue",
                id
            );
        }
        return self.enqueue_waiting(id, task_meta).await;
    }
    let queue_key = match task_meta.next_at {
        Some(next_at) if next_at >= Timestamp::now() => {
            debug!("Scheduling task {} for {}", id, next_at);
            (next_at, id)
        }
        // No time or a time already past: dispatch as soon as possible.
        _ => {
            debug!("Spawning task {}", id);
            (Timestamp(0), id)
        }
    };
    lock!(self.tasks_scheduled, "tasks_scheduled")?.insert(queue_key, task_meta);
    self.notify_schedule.notify_one();
    Ok(id)
}

/// Parks task `id` in the waiting queue and indexes it under each of its dependencies,
/// then resolves any dependencies the store already reports as completed.
async fn enqueue_waiting(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
    let deps = task_meta.deps.clone();
    lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
    debug!("Task {} is waiting for {:?}", id, &deps);
    {
        let mut dependents = lock!(self.task_dependents, "task_dependents")?;
        for dep in &deps {
            // Re-queueing a waiting task must not register it twice under the same dependency.
            let list = dependents.entry(*dep).or_default();
            if !list.contains(&id) {
                list.push(id);
            }
        }
    }
    self.check_and_resolve_completed_deps(id, &deps).await?;
    Ok(id)
}
/// Drops from waiting task `id` every dependency the store already reports as completed.
///
/// If no dependency remains, the task is moved from the waiting queue to the scheduled queue
/// for immediate dispatch. In either case `id` is unlinked from the `task_dependents` entries
/// that no longer block it, so completed dependencies do not leave stale entries behind.
/// Nothing happens if `id` is not in the waiting queue.
///
/// # Errors
/// Propagates store errors and lock failures.
async fn check_and_resolve_completed_deps(&self, id: TaskId, deps: &[TaskId]) -> ClResult<()> {
    let completed_deps = self.store.find_completed_deps(deps).await?;
    if completed_deps.is_empty() {
        return Ok(());
    }
    let ready_task = {
        let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
        let Some(task_meta) = waiting.get_mut(&id) else {
            return Ok(());
        };
        task_meta.deps.retain(|d| !completed_deps.contains(d));
        if task_meta.deps.is_empty() { waiting.remove(&id) } else { None }
    };
    // Fully released: unlink from every original dependency; otherwise only from the
    // dependencies that are already complete.
    let unlinked: &[TaskId] =
        if ready_task.is_some() { deps } else { completed_deps.as_slice() };
    {
        let mut dependents = lock!(self.task_dependents, "task_dependents")?;
        for dep in unlinked {
            if let Some(dep_list) = dependents.get_mut(dep) {
                dep_list.retain(|d| *d != id);
                if dep_list.is_empty() {
                    dependents.remove(dep);
                }
            }
        }
    }
    if let Some(ready_task) = ready_task {
        debug!("Task {} deps already completed, scheduling immediately", id);
        lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), ready_task);
        self.notify_schedule.notify_one();
    }
    Ok(())
}
fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
debug!("Removed task {} from waiting queue for update", task_id);
return Ok(Some(task_meta));
}
{
let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
if let Some(key) = scheduled
.iter()
.find(|((_, id), _)| *id == task_id)
.map(|((ts, id), _)| (*ts, *id))
&& let Some(task_meta) = scheduled.remove(&key)
{
debug!("Removed task {} from scheduled queue for update", task_id);
return Ok(Some(task_meta));
}
}
if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
warn!("Removed task {} from running queue during update", task_id);
return Ok(Some(task_meta));
}
Ok(None)
}
/// Detaches `completed_task_id` from every task registered as depending on it.
///
/// Waiting dependents whose dependency list becomes empty are removed from the waiting queue
/// and returned; the caller is responsible for starting them. Dependents found in the
/// scheduled queue only have the dependency dropped from their metadata. Dependents found in
/// neither queue are logged.
///
/// # Errors
/// Propagates lock failures from the `lock!` macro.
fn release_dependents(
    &self,
    completed_task_id: TaskId,
) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
    let dependent_ids = lock!(self.task_dependents, "task_dependents")?
        .remove(&completed_task_id)
        .unwrap_or_default();
    if dependent_ids.is_empty() {
        return Ok(Vec::new());
    }
    debug!("Releasing {} dependents of completed task {}", dependent_ids.len(), completed_task_id);

    let mut released = Vec::new();
    'dependents: for dependent_id in dependent_ids {
        {
            let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
            if let Some(meta) = waiting.get_mut(&dependent_id) {
                meta.deps.retain(|x| *x != completed_task_id);
                let remaining = meta.deps.len();
                if remaining > 0 {
                    debug!(
                        "Task {} still has {} remaining dependencies",
                        dependent_id, remaining
                    );
                } else if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
                    debug!(
                        "Dependent task {} ready to spawn (all dependencies cleared)",
                        dependent_id
                    );
                    released.push((dependent_id, task_to_spawn));
                }
                continue 'dependents;
            }
        }
        {
            let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
            if let Some((_, meta)) =
                scheduled.iter_mut().find(|((_, id), _)| *id == dependent_id)
            {
                meta.deps.retain(|x| *x != completed_task_id);
                let remaining = meta.deps.len();
                if remaining == 0 {
                    debug!(
                        "Task {} in scheduled queue has no remaining dependencies",
                        dependent_id
                    );
                } else {
                    debug!(
                        "Task {} in scheduled queue has {} remaining dependencies",
                        dependent_id, remaining
                    );
                }
                continue 'dependents;
            }
        }
        warn!(
            "Dependent task {} of completed task {} not found in any queue",
            dependent_id, completed_task_id
        );
    }
    Ok(released)
}
async fn load(&self) -> ClResult<()> {
let tasks = self.store.load().await?;
debug!("Loaded {} tasks from store", tasks.len());
for t in tasks {
if let TaskStatus::Pending = t.status {
debug!("Loading task {} {}", t.id, t.kind);
let task = {
let builder_map = self
.task_builders
.read()
.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
format!("task builder not registered: {}", t.kind),
))?;
builder(t.id, &t.input)?
};
let (retry_count, retry) = match t.retry_data {
Some(retry_str) => {
let (retry_count, retry_min, retry_max, retry_times) = retry_str
.split(',')
.collect_tuple()
.ok_or(Error::Internal("invalid retry policy format".into()))?;
let retry_count: u16 = retry_count
.parse()
.map_err(|_| Error::Internal("retry count must be u16".into()))?;
let retry = RetryPolicy {
wait_min_max: (
retry_min
.parse()
.map_err(|_| Error::Internal("retry_min must be u64".into()))?,
retry_max
.parse()
.map_err(|_| Error::Internal("retry_max must be u64".into()))?,
),
times: retry_times
.parse()
.map_err(|_| Error::Internal("retry times must be u64".into()))?,
};
debug!("Loaded retry policy: {:?}", retry);
(retry_count, Some(retry))
}
_ => (0, None),
};
let cron =
t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
let task_meta = TaskMeta {
task,
next_at: t.next_at,
deps: t.deps.into(),
retry_count,
retry,
cron,
};
self.add_queue(t.id, task_meta).await?;
}
}
Ok(())
}
fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
let tx_finish = self.tx_finish.clone();
let store = self.store.clone();
let scheduler = self.clone();
tokio::spawn(async move {
match task.run(&state).await {
Ok(()) => {
debug!("Task {} completed successfully", id);
tx_finish.send(id).unwrap_or(());
}
Err(e) => {
let is_retryable = e.is_retryable();
if let Some(retry_policy) = &task_meta.retry {
if is_retryable && retry_policy.should_retry(task_meta.retry_count) {
let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
let next_at = Timestamp::from_now(backoff.cast_signed());
info!(
"Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
id,
task_meta.retry_count + 1,
retry_policy.times,
backoff,
e
);
store
.update_task_error(id, &e.to_string(), Some(next_at))
.await
.unwrap_or(());
match scheduler.tasks_running.lock() {
Ok(mut tasks_running) => {
tasks_running.remove(&id);
}
Err(poisoned) => {
error!("Mutex poisoned: tasks_running (recovering)");
poisoned.into_inner().remove(&id);
}
}
let mut retry_meta = task_meta.clone();
retry_meta.retry_count += 1;
retry_meta.next_at = Some(next_at);
scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
} else {
if is_retryable {
error!(
"Task {} failed after {} retries: {}",
id, task_meta.retry_count, e
);
} else {
error!("Task {} failed permanently (non-retryable): {}", id, e);
}
store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
task.on_failed(&state, task_meta.retry_count, &e.to_string()).await;
tx_finish.send(id).unwrap_or(());
}
} else {
error!("Task {} failed: {}", id, e);
store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
task.on_failed(&state, 0, &e.to_string()).await;
tx_finish.send(id).unwrap_or(());
}
}
}
});
}
pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
let running_count = lock!(self.tasks_running, "tasks_running")?.len();
let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
let mut stuck_tasks = Vec::new();
let mut tasks_with_missing_deps = Vec::new();
{
let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
let _deps_map = lock!(self.task_dependents, "task_dependents")?;
for (id, task_meta) in waiting.iter() {
if task_meta.deps.is_empty() {
stuck_tasks.push(*id);
warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
} else {
for dep in &task_meta.deps {
let dep_exists = waiting.contains_key(dep)
|| self.tasks_running.lock().ok().is_some_and(|r| r.contains_key(dep))
|| self
.tasks_scheduled
.lock()
.ok()
.is_some_and(|s| s.iter().any(|((_, task_id), _)| task_id == dep));
if !dep_exists {
tasks_with_missing_deps.push((*id, *dep));
warn!(
"SCHEDULER HEALTH: Task {} depends on non-existent task {}",
id, dep
);
}
}
}
}
}
Ok(SchedulerHealth {
waiting: waiting_count,
scheduled: scheduled_count,
running: running_count,
dependents: dependents_count,
stuck_tasks,
tasks_with_missing_deps,
})
}
}
/// Snapshot of scheduler state produced by `Scheduler::health_check`.
#[derive(Debug, Clone)]
pub struct SchedulerHealth {
/// Number of tasks in the waiting (dependency-blocked) queue.
pub waiting: usize,
/// Number of entries in the scheduled queue.
pub scheduled: usize,
/// Number of tasks currently running.
pub running: usize,
/// Number of dependency ids that have registered dependents.
pub dependents: usize,
/// Waiting tasks whose dependency list is empty.
pub stuck_tasks: Vec<TaskId>,
/// `(task, dependency)` pairs whose dependency was found in no queue.
pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
}
#[cfg(test)]
mod tests {
use super::*;
use serde::{Deserialize, Serialize};
type State = Arc<Mutex<Vec<u8>>>;
#[derive(Debug, Serialize, Deserialize)]
struct TestTask {
num: u8,
}
impl TestTask {
pub fn new(num: u8) -> Arc<Self> {
Arc::new(Self { num })
}
}
#[async_trait]
impl Task<State> for TestTask {
fn kind() -> &'static str {
"test"
}
fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
let num: u8 = ctx
.parse()
.map_err(|_| Error::Internal("test task context must be u8".into()))?;
let task = TestTask::new(num);
Ok(task)
}
fn serialize(&self) -> String {
self.num.to_string()
}
fn kind_of(&self) -> &'static str {
"test"
}
async fn run(&self, state: &State) -> ClResult<()> {
info!("Running task {}", self.num);
tokio::time::sleep(std::time::Duration::from_millis(200 * u64::from(self.num))).await;
info!("Completed task {}", self.num);
state.lock().unwrap().push(self.num);
Ok(())
}
}
#[derive(Debug, Clone)]
struct FailingTask {
id: u8,
fail_count: u8,
attempt: Arc<Mutex<u8>>,
}
impl FailingTask {
pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
}
}
#[async_trait]
impl Task<State> for FailingTask {
fn kind() -> &'static str {
"failing"
}
fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
let parts: Vec<&str> = ctx.split(',').collect();
if parts.len() != 2 {
return Err(Error::Internal("failing task context must have 2 parts".into()));
}
let id: u8 = parts[0]
.parse()
.map_err(|_| Error::Internal("failing task id must be u8".into()))?;
let fail_count: u8 = parts[1]
.parse()
.map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
Ok(FailingTask::new(id, fail_count))
}
fn serialize(&self) -> String {
format!("{},{}", self.id, self.fail_count)
}
fn kind_of(&self) -> &'static str {
"failing"
}
async fn run(&self, state: &State) -> ClResult<()> {
let mut attempt = self.attempt.lock().unwrap();
*attempt += 1;
let current_attempt = *attempt;
info!("FailingTask {} - attempt {}/{}", self.id, current_attempt, self.fail_count + 1);
if current_attempt <= self.fail_count {
error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
}
info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
state.lock().unwrap().push(self.id);
Ok(())
}
}
#[tokio::test]
pub async fn test_scheduler() {
let _ = tracing_subscriber::fmt().try_init();
let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
let state: State = Arc::new(Mutex::new(Vec::new()));
let scheduler = Scheduler::new(task_store);
scheduler.start(state.clone());
scheduler.register::<TestTask>().unwrap();
let _task1 = TestTask::new(1);
let task2 = TestTask::new(1);
let task3 = TestTask::new(1);
let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
let task3_id = scheduler.add(task3).await.unwrap();
scheduler
.task(TestTask::new(1))
.depend_on(vec![task2_id, task3_id])
.schedule()
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(4)).await;
let task4 = TestTask::new(1);
let task5 = TestTask::new(1);
scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
scheduler.task(task5).schedule_after(1).schedule().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
let st = state.lock().unwrap();
info!("res: {}", st.len());
let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
assert_eq!(str_vec.join(":"), "1:1:1:1:1");
}
#[tokio::test]
/// A `FailingTask` whose first two attempts fail must succeed through its
/// retry policy and record its id (42) exactly once.
pub async fn test_retry_with_backoff() {
    use std::time::Duration;

    let _ = tracing_subscriber::fmt().try_init();
    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<FailingTask>().unwrap();

    let flaky = FailingTask::new(42, 2);
    let policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
    scheduler
        .task(flaky)
        .with_retry(policy)
        .schedule()
        .await
        .unwrap();

    // Leave enough time for the failed attempts plus their backoff waits.
    tokio::time::sleep(Duration::from_secs(6)).await;

    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 1, "Task should have succeeded after retries");
    assert_eq!(executed[0], 42);
}
#[tokio::test]
/// `now()` schedules a task for immediate execution and returns its id.
pub async fn test_builder_simple_schedule() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let task_id = scheduler.task(TestTask::new(1)).now().await.unwrap();
    assert!(task_id > 0, "Task ID should be positive");

    tokio::time::sleep(Duration::from_millis(500)).await;

    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 1, "Task should have executed");
    assert_eq!(executed[0], 1);
}
#[tokio::test]
/// Attaching a `key` to the builder does not prevent immediate execution.
pub async fn test_builder_with_key() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let keyed = scheduler.task(TestTask::new(1)).key("my-task-key");
    let _id = keyed.now().await.unwrap();

    tokio::time::sleep(Duration::from_millis(500)).await;

    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 1);
    assert_eq!(executed[0], 1);
}
#[tokio::test]
/// `after(1)` defers execution: nothing has run after ~500 ms, and the task
/// has run after ~1.3 s. The delay unit is presumably seconds.
pub async fn test_builder_with_delay() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let _id = scheduler.task(TestTask::new(1)).after(1).await.unwrap();

    tokio::time::sleep(Duration::from_millis(500)).await;
    // The lock guard is a temporary, released at the end of this statement.
    assert_eq!(state.lock().unwrap().len(), 0, "Task should not execute yet");

    tokio::time::sleep(Duration::from_millis(800)).await;
    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 1, "Task should have executed");
    assert_eq!(executed[0], 1);
}
#[tokio::test]
/// A task that depends on two immediately-scheduled tasks also runs,
/// for three recorded executions in total.
pub async fn test_builder_with_dependencies() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let first = scheduler.task(TestTask::new(1)).now().await.unwrap();
    let second = scheduler.task(TestTask::new(1)).now().await.unwrap();
    let dependent = TestTask::new(1);
    let prerequisites = vec![first, second];
    let _joined_id = scheduler
        .task(dependent)
        .depend_on(prerequisites)
        .schedule()
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(1500)).await;

    let executed = state.lock().unwrap();
    let rendered: Vec<String> = executed.iter().map(ToString::to_string).collect();
    assert_eq!(rendered.join(":"), "1:1:1");
}
#[tokio::test]
/// A task whose first attempt fails succeeds on retry under an explicit
/// `RetryPolicy`, recording id 55 once.
pub async fn test_builder_with_retry() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<FailingTask>().unwrap();

    let fails_once = FailingTask::new(55, 1);
    let policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
    let _id = scheduler
        .task(fails_once)
        .with_retry(policy)
        .schedule()
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_secs(3)).await;

    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 1);
    assert_eq!(executed[0], 55);
}
#[tokio::test]
/// `with_automatic_retry()` schedules a task (here one that fails its first
/// attempt) under the builder's built-in retry policy.
///
/// That policy's backoff is not visible from this test, so the test does not
/// assert completion. It checks invariants that hold whether or not the retry
/// has fired within the wait window.
pub async fn test_builder_with_automatic_retry() {
    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(state.clone());
    scheduler.register::<FailingTask>().unwrap();
    let failing_task = FailingTask::new(66, 1);
    let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();
    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
    let st = state.lock().unwrap();
    // FailingTask pushes its id only after a successful attempt. A completed
    // task should not be run again, so there is at most one entry, and any
    // entry must be this task's id.
    assert!(st.len() <= 1, "Task should record at most one successful run");
    assert!(st.iter().all(|&id| id == 66), "Only the failing task's id may be recorded");
}
#[tokio::test]
/// Every builder option (key, delay, dependencies, retry) can be chained on
/// one builder before `schedule()`; all three tasks must run.
pub async fn test_builder_fluent_chaining() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let first_dep = scheduler.task(TestTask::new(1)).now().await.unwrap();
    let second_dep = scheduler.task(TestTask::new(1)).now().await.unwrap();
    let policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

    let builder = scheduler
        .task(TestTask::new(1))
        .key("complex-task")
        .schedule_after(0)
        .depend_on(vec![first_dep, second_dep])
        .with_retry(policy);
    let _id = builder.schedule().await.unwrap();

    tokio::time::sleep(Duration::from_millis(800)).await;

    let executed = state.lock().unwrap();
    let joined = executed.iter().map(ToString::to_string).collect::<Vec<_>>().join(":");
    assert_eq!(joined, "1:1:1");
}
#[tokio::test]
/// The older `scheduler.add(..)` entry point and the builder's `now()` can be
/// used side by side, and both tasks run.
pub async fn test_builder_backward_compatibility() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let _via_add = scheduler.add(TestTask::new(1)).await.unwrap();
    let _via_builder = scheduler.task(TestTask::new(1)).now().await.unwrap();

    tokio::time::sleep(Duration::from_millis(800)).await;

    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 2);
    let joined = executed.iter().map(ToString::to_string).collect::<Vec<_>>().join(":");
    assert_eq!(joined, "1:1");
}
#[tokio::test]
/// A three-stage pipeline built with `after_task`: each stage is keyed and
/// scheduled after the previous one, and all three run.
pub async fn test_builder_pipeline_scenario() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let stage_one = scheduler
        .task(TestTask::new(1))
        .key("stage-1")
        .now()
        .await
        .unwrap();
    let stage_two = scheduler
        .task(TestTask::new(1))
        .key("stage-2")
        .after_task(stage_one)
        .await
        .unwrap();
    let _stage_three = scheduler
        .task(TestTask::new(1))
        .key("stage-3")
        .after_task(stage_two)
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(1200)).await;

    let executed = state.lock().unwrap();
    let rendered: Vec<String> = executed.iter().map(ToString::to_string).collect();
    assert_eq!(rendered.join(":"), "1:1:1");
}
#[tokio::test]
/// Fan-in: one task joins on two independent tasks, and all three run.
pub async fn test_builder_multi_dependency_join() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let left = scheduler.task(TestTask::new(1)).now().await.unwrap();
    let right = scheduler.task(TestTask::new(1)).now().await.unwrap();
    let join_builder = scheduler.task(TestTask::new(1)).depend_on(vec![left, right]);
    let _join_id = join_builder.schedule().await.unwrap();

    tokio::time::sleep(Duration::from_secs(1)).await;

    let executed = state.lock().unwrap();
    let joined = executed.iter().map(ToString::to_string).collect::<Vec<_>>().join(":");
    assert_eq!(joined, "1:1:1");
}
#[tokio::test]
/// `schedule_at` combined with `depend_on`: the dependency runs right away,
/// while the dependent waits for its timestamp (`Timestamp::from_now(1)`).
pub async fn test_builder_scheduled_task_with_dependencies() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();
    let run_at = Timestamp::from_now(1);
    let _task_id = scheduler
        .task(TestTask::new(1))
        .schedule_at(run_at)
        .depend_on(vec![dep_id])
        .schedule()
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(300)).await;
    // Only the dependency should have run so far; the guard is a temporary.
    assert_eq!(state.lock().unwrap().len(), 1);

    tokio::time::sleep(Duration::from_millis(800)).await;
    let executed = state.lock().unwrap();
    let joined = executed.iter().map(ToString::to_string).collect::<Vec<_>>().join(":");
    assert_eq!(joined, "1:1");
}
#[tokio::test]
/// Builders for different task types (`TestTask`, `FailingTask`) and options
/// (key, delay, dependencies, retry) share one scheduler; all three tasks run.
pub async fn test_builder_mixed_features() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();
    scheduler.register::<FailingTask>().unwrap();

    let head = scheduler.task(TestTask::new(1)).now().await.unwrap();
    let _critical = scheduler
        .task(TestTask::new(1))
        .key("critical-task")
        .schedule_after(0)
        .depend_on(vec![head])
        .schedule()
        .await
        .unwrap();

    // fail_count of 0: this FailingTask succeeds on its first attempt.
    let never_fails = FailingTask::new(1, 0);
    let policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
    let _retryable = scheduler
        .task(never_fails)
        .key("retryable-task")
        .with_retry(policy)
        .schedule()
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(1200)).await;

    let executed = state.lock().unwrap();
    let rendered: Vec<String> = executed.iter().map(ToString::to_string).collect();
    assert_eq!(rendered.join(":"), "1:1:1");
}
#[tokio::test]
/// API-shape check for the builder.
///
/// The builder returned by `scheduler.task(..)` is bound to a variable and
/// then finished with `now()`. The test name indicates that the builder cannot
/// be used again afterwards, presumably because `now()` takes it by value. The
/// body only has to compile and run without panicking.
///
/// The scheduler is never started here, and no task type is registered.
pub async fn test_builder_builder_reuse_not_possible() {
    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let scheduler = Scheduler::new(task_store);
    let task = TestTask::new(1);
    let builder = scheduler.task(task);
    let _id = builder.now().await.unwrap();
}
#[tokio::test]
/// Tasks of two registered types, keyed and unkeyed, all run immediately via
/// `now()`.
pub async fn test_builder_different_task_types() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();
    scheduler.register::<FailingTask>().unwrap();

    let _keyed_test = scheduler
        .task(TestTask::new(1))
        .key("test-task")
        .now()
        .await
        .unwrap();
    // fail_count of 0: succeeds on its first attempt.
    let _keyed_failing = scheduler
        .task(FailingTask::new(1, 0))
        .key("failing-task")
        .now()
        .await
        .unwrap();
    let _plain = scheduler.task(TestTask::new(1)).now().await.unwrap();

    tokio::time::sleep(Duration::from_secs(1)).await;

    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 3);
    let joined = executed.iter().map(ToString::to_string).collect::<Vec<_>>().join(":");
    assert_eq!(joined, "1:1:1");
}
#[tokio::test]
/// A cron-scheduled task (`0 9 * * *`) is accepted by the builder. Its next
/// occurrence lies in the future, so nothing runs within the 500 ms window.
pub async fn test_builder_cron_placeholder_syntax() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let cron_builder = scheduler.task(TestTask::new(1)).key("cron-task").cron("0 9 * * *");
    let _id = cron_builder.schedule().await.unwrap();

    tokio::time::sleep(Duration::from_millis(500)).await;

    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 0);
}
#[tokio::test]
/// `daily_at(14, 30)` (presumably hour, minute) schedules a recurring task
/// that does not run within the 500 ms window.
pub async fn test_builder_daily_at_placeholder() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let daily = scheduler.task(TestTask::new(1)).key("daily-task").daily_at(14, 30);
    let _id = daily.schedule().await.unwrap();

    tokio::time::sleep(Duration::from_millis(500)).await;

    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 0);
}
#[tokio::test]
/// `weekly_at(1, 9, 0)` (presumably weekday, hour, minute) schedules a
/// recurring task that does not run within the 500 ms window.
pub async fn test_builder_weekly_at_placeholder() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let weekly = scheduler.task(TestTask::new(1)).key("weekly-task").weekly_at(1, 9, 0);
    let _id = weekly.schedule().await.unwrap();

    tokio::time::sleep(Duration::from_millis(500)).await;

    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 0);
}
#[tokio::test]
/// A daily schedule combined with a retry policy is accepted by the builder.
/// The task is not due, so nothing runs within the 500 ms window.
pub async fn test_builder_cron_with_retry() {
    use std::time::Duration;

    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(Arc::clone(&state));
    scheduler.register::<TestTask>().unwrap();

    let policy = RetryPolicy { wait_min_max: (60, 3600), times: 5 };
    let _id = scheduler
        .task(TestTask::new(1))
        .key("reliable-scheduled-task")
        .daily_at(2, 0)
        .with_retry(policy)
        .schedule()
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(500)).await;

    let executed = state.lock().unwrap();
    assert_eq!(executed.len(), 0);
}
#[test]
/// Round-trip: `to_cron_string` returns exactly the expression given to
/// `CronSchedule::parse`.
fn test_cron_to_string() {
    let expr = "*/5 * * * *";
    let schedule = CronSchedule::parse(expr).unwrap();
    assert_eq!(schedule.to_cron_string(), expr);
}
#[tokio::test]
/// Calling `add_queue` for a task id that is currently running must not place
/// it in the scheduled queue as well; the task executes only once.
pub async fn test_running_task_not_double_scheduled() {
    let _ = tracing_subscriber::fmt().try_init();
    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(state.clone());
    scheduler.register::<TestTask>().unwrap();

    // NOTE(review): relies on this TestTask still running ~100 ms after being
    // added. Confirm TestTask's run duration.
    let task = TestTask::new(5);
    let task_id = scheduler.add(task.clone()).await.unwrap();
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    {
        let running = scheduler.tasks_running.lock().unwrap();
        assert!(running.contains_key(&task_id), "Task should be in running queue");
    }

    // Re-queue the same id while it is still running. `task` is not used
    // again, so it is moved rather than cloned.
    let task_meta = TaskMeta {
        task,
        next_at: Some(Timestamp::now()),
        deps: vec![],
        retry_count: 0,
        retry: None,
        cron: None,
    };
    // `expect` reports the actual error on failure; `assert!(is_ok())` would
    // discard it.
    scheduler
        .add_queue(task_id, task_meta)
        .await
        .expect("add_queue should succeed");
    {
        let sched_queue = scheduler.tasks_scheduled.lock().unwrap();
        let in_scheduled = sched_queue.iter().any(|((_, id), _)| *id == task_id);
        assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
    }

    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    let st = state.lock().unwrap();
    assert_eq!(st.len(), 1, "Only one task execution should have occurred");
    assert_eq!(st[0], 5);
}
#[tokio::test]
/// Calling `add_queue` for a running task id updates the metadata stored in
/// `tasks_running`; here a cron schedule is attached mid-run.
pub async fn test_running_task_metadata_updated() {
    let _ = tracing_subscriber::fmt().try_init();
    let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
    let state: State = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Scheduler::new(task_store);
    scheduler.start(state.clone());
    scheduler.register::<TestTask>().unwrap();

    // NOTE(review): relies on this TestTask still running ~100 ms after being
    // added. Confirm TestTask's run duration.
    let task = TestTask::new(5);
    let task_id = scheduler.add(task.clone()).await.unwrap();
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    {
        let running = scheduler.tasks_running.lock().unwrap();
        let meta = running.get(&task_id).expect("Task should be running");
        assert!(meta.cron.is_none(), "Task should have no cron initially");
    }

    // `task` and `cron` are not used again, so they are moved into the
    // metadata rather than cloned.
    let cron = CronSchedule::parse("*/5 * * * *").unwrap();
    let task_meta_with_cron = TaskMeta {
        task,
        next_at: Some(Timestamp::now()),
        deps: vec![],
        retry_count: 0,
        retry: None,
        cron: Some(cron),
    };
    // `expect` reports the actual error on failure; `assert!(is_ok())` would
    // discard it.
    scheduler
        .add_queue(task_id, task_meta_with_cron)
        .await
        .expect("add_queue should succeed");
    {
        let running = scheduler.tasks_running.lock().unwrap();
        let meta = running.get(&task_id).expect("Task should still be running");
        assert!(meta.cron.is_some(), "Task should now have cron after update");
    }

    // Presumably gives the running task time to finish before the test returns.
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}