Skip to main content

cloudillo_core/
scheduler.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! Scheduler subsystem. Handles async tasks, dependencies, fallbacks, repetitions, persistence..
5
6use async_trait::async_trait;
7use itertools::Itertools;
8use std::{
9	collections::{BTreeMap, HashMap},
10	fmt::Debug,
11	sync::{Arc, Mutex, RwLock},
12};
13
14use chrono::{DateTime, Utc};
15use croner::Cron;
16use std::str::FromStr;
17
18use crate::prelude::*;
19use cloudillo_types::{lock, meta_adapter};
20
21pub type TaskId = u64;
22
23pub enum TaskType {
24	Periodic,
25	Once,
26}
27
28/// Cron schedule wrapper using the croner crate
29/// Stores the expression string for serialization
30#[derive(Debug, Clone)]
31pub struct CronSchedule {
32	/// The original cron expression string
33	expr: Box<str>,
34	/// Parsed cron object
35	cron: Cron,
36}
37
38impl CronSchedule {
39	/// Parse a cron expression (5 fields: minute hour day month weekday)
40	pub fn parse(expr: &str) -> ClResult<Self> {
41		let cron = Cron::from_str(expr)
42			.map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
43		Ok(Self { expr: expr.into(), cron })
44	}
45
46	/// Calculate the next execution time after the given timestamp
47	///
48	/// Returns an error if no next occurrence can be found (should be rare
49	/// for valid expressions within reasonable time bounds).
50	pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
51		let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
52
53		self.cron
54			.find_next_occurrence(&dt, false)
55			.map(|next| Timestamp(next.timestamp()))
56			.map_err(|e| {
57				tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
58				Error::ValidationError(format!("cron next_execution failed: {}", e))
59			})
60	}
61
62	/// Convert back to cron expression string
63	pub fn to_cron_string(&self) -> String {
64		self.expr.to_string()
65	}
66}
67
68impl PartialEq for CronSchedule {
69	fn eq(&self, other: &Self) -> bool {
70		self.expr == other.expr
71	}
72}
73
74impl Eq for CronSchedule {}
75
76#[async_trait]
77pub trait Task<S: Clone>: Send + Sync + Debug {
78	fn kind() -> &'static str
79	where
80		Self: Sized;
81	fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
82	where
83		Self: Sized;
84	fn serialize(&self) -> String;
85	async fn run(&self, state: &S) -> ClResult<()>;
86
87	fn kind_of(&self) -> &'static str;
88
89	/// Called when the task transitions to `Failed` after exhausting retries
90	/// (or on the very first failure when no retry policy is set). Lets the
91	/// task perform irreversible cleanup — e.g. mark a related domain row as
92	/// permanently failed — that should not happen on retryable failures.
93	/// Default: no-op.
94	async fn on_failed(&self, _state: &S, _attempts: u16, _last_error: &str) {}
95}
96
97#[derive(Debug)]
98pub enum TaskStatus {
99	Pending,
100	Completed,
101	Failed,
102}
103
104pub struct TaskData {
105	id: TaskId,
106	kind: Box<str>,
107	status: TaskStatus,
108	input: Box<str>,
109	deps: Box<[TaskId]>,
110	retry_data: Option<Box<str>>,
111	cron_data: Option<Box<str>>,
112	next_at: Option<Timestamp>,
113}
114
115#[async_trait]
116pub trait TaskStore<S: Clone>: Send + Sync {
117	async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;
118	async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;
119	async fn load(&self) -> ClResult<Vec<TaskData>>;
120	async fn update_task_error(
121		&self,
122		task_id: TaskId,
123		output: &str,
124		next_at: Option<Timestamp>,
125	) -> ClResult<()>;
126	async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;
127	async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;
128	async fn find_completed_deps(&self, deps: &[TaskId]) -> ClResult<Vec<TaskId>>;
129}
130
131// InMemoryTaskStore
132//*******************
133pub struct InMemoryTaskStore {
134	last_id: Mutex<TaskId>,
135}
136
137impl InMemoryTaskStore {
138	pub fn new() -> Arc<Self> {
139		Arc::new(Self { last_id: Mutex::new(0) })
140	}
141}
142
143#[async_trait]
144impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
145	async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
146		let mut last_id = lock!(self.last_id)?;
147		*last_id += 1;
148		Ok(*last_id)
149	}
150
151	async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
152		Ok(())
153	}
154
155	async fn load(&self) -> ClResult<Vec<TaskData>> {
156		Ok(vec![])
157	}
158
159	async fn update_task_error(
160		&self,
161		_task_id: TaskId,
162		_output: &str,
163		_next_at: Option<Timestamp>,
164	) -> ClResult<()> {
165		Ok(())
166	}
167
168	async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
169		// In-memory store doesn't support persistence or keys
170		Ok(None)
171	}
172
173	async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
174		// In-memory store doesn't support persistence
175		Ok(())
176	}
177
178	async fn find_completed_deps(&self, _deps: &[TaskId]) -> ClResult<Vec<TaskId>> {
179		Ok(vec![])
180	}
181}
182
183// MetaAdapterTaskStore
184//**********************
185pub struct MetaAdapterTaskStore {
186	meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
187}
188
189impl MetaAdapterTaskStore {
190	pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
191		Arc::new(Self { meta_adapter })
192	}
193}
194
195#[async_trait]
196impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
197	async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
198		let id = self
199			.meta_adapter
200			.create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
201			.await?;
202
203		// Store cron schedule if present
204		if let Some(cron) = &task.cron {
205			self.meta_adapter
206				.update_task(
207					id,
208					&meta_adapter::TaskPatch {
209						cron: Patch::Value(cron.to_cron_string()),
210						..Default::default()
211					},
212				)
213				.await?;
214		}
215
216		Ok(id)
217	}
218
219	async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
220		self.meta_adapter.update_task_finished(id, output).await
221	}
222
223	async fn load(&self) -> ClResult<Vec<TaskData>> {
224		let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
225		let tasks = tasks
226			.into_iter()
227			.map(|t| TaskData {
228				id: t.task_id,
229				kind: t.kind,
230				status: match t.status {
231					'P' => TaskStatus::Pending,
232					'F' => TaskStatus::Completed,
233					// 'E' or unknown status = Failed
234					_ => TaskStatus::Failed,
235				},
236				input: t.input,
237				deps: t.deps,
238				retry_data: t.retry,
239				cron_data: t.cron,
240				next_at: t.next_at,
241			})
242			.collect();
243		Ok(tasks)
244	}
245
246	async fn update_task_error(
247		&self,
248		task_id: TaskId,
249		output: &str,
250		next_at: Option<Timestamp>,
251	) -> ClResult<()> {
252		self.meta_adapter.update_task_error(task_id, output, next_at).await
253	}
254
255	async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
256		let task_opt = self.meta_adapter.find_task_by_key(key).await?;
257
258		match task_opt {
259			Some(t) => Ok(Some((
260				t.task_id,
261				TaskData {
262					id: t.task_id,
263					kind: t.kind,
264					status: match t.status {
265						'P' => TaskStatus::Pending,
266						'F' => TaskStatus::Completed,
267						// 'E' or unknown status = Failed
268						_ => TaskStatus::Failed,
269					},
270					input: t.input,
271					deps: t.deps,
272					retry_data: t.retry,
273					cron_data: t.cron,
274					next_at: t.next_at,
275				},
276			))),
277			None => Ok(None),
278		}
279	}
280
281	async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
282		use cloudillo_types::types::Patch;
283
284		// Build TaskPatch from TaskMeta
285		let mut patch = meta_adapter::TaskPatch {
286			input: Patch::Value(task.task.serialize()),
287			next_at: match task.next_at {
288				Some(ts) => Patch::Value(ts),
289				None => Patch::Null,
290			},
291			..Default::default()
292		};
293
294		// Update deps
295		if !task.deps.is_empty() {
296			patch.deps = Patch::Value(task.deps.clone());
297		}
298
299		// Update retry policy
300		if let Some(ref retry) = task.retry {
301			let retry_str = format!(
302				"{},{},{},{}",
303				task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
304			);
305			patch.retry = Patch::Value(retry_str);
306		}
307
308		// Update cron schedule
309		if let Some(ref cron) = task.cron {
310			patch.cron = Patch::Value(cron.to_cron_string());
311		}
312
313		self.meta_adapter.update_task(id, &patch).await
314	}
315
316	async fn find_completed_deps(&self, deps: &[TaskId]) -> ClResult<Vec<TaskId>> {
317		self.meta_adapter.find_completed_deps(deps).await
318	}
319}
320
321// Task metadata
322type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
323
324#[derive(Debug, Clone)]
325pub struct RetryPolicy {
326	wait_min_max: (u64, u64),
327	times: u16,
328}
329
330impl Default for RetryPolicy {
331	fn default() -> Self {
332		Self { wait_min_max: (60, 3600), times: 10 }
333	}
334}
335
336impl RetryPolicy {
337	/// Create a new RetryPolicy with custom min/max backoff and number of retries
338	pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
339		Self { wait_min_max, times }
340	}
341
342	/// Calculate exponential backoff in seconds: min * (2^attempt), capped at max
343	pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
344		let (min, max) = self.wait_min_max;
345		let backoff = min * (1u64 << u64::from(attempt_count));
346		backoff.min(max)
347	}
348
349	/// Check if we should continue retrying
350	pub fn should_retry(&self, attempt_count: u16) -> bool {
351		attempt_count < self.times
352	}
353}
354
355// TaskSchedulerBuilder - Fluent API for task scheduling
356//************************************************************
357pub struct TaskSchedulerBuilder<'a, S: Clone> {
358	scheduler: &'a Scheduler<S>,
359	task: Arc<dyn Task<S>>,
360	key: Option<String>,
361	next_at: Option<Timestamp>,
362	deps: Vec<TaskId>,
363	retry: Option<RetryPolicy>,
364	cron: Option<CronSchedule>,
365}
366
367impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
368	/// Create a new builder for scheduling a task
369	fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
370		Self {
371			scheduler,
372			task,
373			key: None,
374			next_at: None,
375			deps: Vec::new(),
376			retry: None,
377			cron: None,
378		}
379	}
380
381	/// Set a string key for task identification
382	pub fn key(mut self, key: impl Into<String>) -> Self {
383		self.key = Some(key.into());
384		self
385	}
386
387	/// Schedule for a specific absolute timestamp
388	pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
389		self.next_at = Some(timestamp);
390		self
391	}
392
393	/// Schedule after a relative delay (in seconds)
394	pub fn schedule_after(mut self, seconds: i64) -> Self {
395		self.next_at = Some(Timestamp::from_now(seconds));
396		self
397	}
398
399	/// Add task dependencies - task waits for all of these to complete
400	pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
401		self.deps = deps;
402		self
403	}
404
405	/// Add a single task dependency
406	pub fn depends_on(mut self, dep: TaskId) -> Self {
407		self.deps.push(dep);
408		self
409	}
410
411	/// Enable automatic retry with exponential backoff
412	pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
413		self.retry = Some(policy);
414		self
415	}
416
417	// ===== Cron Scheduling Methods =====
418
419	/// Schedule task with cron expression
420	/// Example: `.cron("0 9 * * *")` for 9 AM daily
421	pub fn cron(mut self, expr: impl Into<String>) -> Self {
422		if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
423			// Calculate initial next_at from cron schedule
424			// Use .ok() - cron was just parsed successfully, should never fail
425			self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
426			self.cron = Some(cron_schedule);
427		}
428		self
429	}
430
431	/// Schedule task daily at specified time
432	/// Example: `.daily_at(2, 30)` for 2:30 AM daily
433	pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
434		if hour <= 23 && minute <= 59 {
435			let expr = format!("{} {} * * *", minute, hour);
436			if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
437				// Calculate initial next_at from cron schedule
438				// Use .ok() - cron was just parsed successfully, should never fail
439				self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
440				self.cron = Some(cron_schedule);
441			}
442		}
443		self
444	}
445
446	/// Schedule task weekly at specified day and time
447	/// Example: `.weekly_at(1, 14, 30)` for Mondays at 2:30 PM
448	/// weekday: 0=Sunday, 1=Monday, ..., 6=Saturday
449	pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
450		if weekday <= 6 && hour <= 23 && minute <= 59 {
451			let expr = format!("{} {} * * {}", minute, hour, weekday);
452			if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
453				// Calculate initial next_at from cron schedule
454				// Use .ok() - cron was just parsed successfully, should never fail
455				self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
456				self.cron = Some(cron_schedule);
457			}
458		}
459		self
460	}
461
462	/// Execute the scheduled task immediately
463	pub async fn now(self) -> ClResult<TaskId> {
464		self.schedule().await
465	}
466
467	/// Execute the scheduled task at a specific timestamp
468	pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
469		self.next_at = Some(ts);
470		self.schedule().await
471	}
472
473	/// Execute the scheduled task after a delay (in seconds)
474	pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
475		self.next_at = Some(Timestamp::from_now(seconds));
476		self.schedule().await
477	}
478
479	/// Execute the scheduled task after another task completes
480	pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
481		self.deps.push(dep);
482		self.schedule().await
483	}
484
485	/// Execute the scheduled task with automatic retry using default policy
486	pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
487		self.retry = Some(RetryPolicy::default());
488		self.schedule().await
489	}
490
491	/// Execute the task with all configured options - main terminal method
492	pub async fn schedule(self) -> ClResult<TaskId> {
493		self.scheduler
494			.schedule_task_impl(
495				self.task,
496				self.key.as_deref(),
497				self.next_at,
498				if self.deps.is_empty() { None } else { Some(self.deps) },
499				self.retry,
500				self.cron,
501			)
502			.await
503	}
504}
505
506#[derive(Debug, Clone)]
507pub struct TaskMeta<S: Clone> {
508	pub task: Arc<dyn Task<S>>,
509	pub next_at: Option<Timestamp>,
510	pub deps: Vec<TaskId>,
511	retry_count: u16,
512	pub retry: Option<RetryPolicy>,
513	pub cron: Option<CronSchedule>,
514}
515
516type TaskBuilderRegistry<S> = HashMap<&'static str, Box<TaskBuilder<S>>>;
517type ScheduledTaskMap<S> = BTreeMap<(Timestamp, TaskId), TaskMeta<S>>;
518
519// Scheduler
520#[derive(Clone)]
521pub struct Scheduler<S: Clone> {
522	task_builders: Arc<RwLock<TaskBuilderRegistry<S>>>,
523	store: Arc<dyn TaskStore<S>>,
524	tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
525	tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
526	task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
527	tasks_scheduled: Arc<Mutex<ScheduledTaskMap<S>>>,
528	tx_finish: flume::Sender<TaskId>,
529	rx_finish: flume::Receiver<TaskId>,
530	notify_schedule: Arc<tokio::sync::Notify>,
531}
532
533impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
534	pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
535		let (tx_finish, rx_finish) = flume::unbounded();
536
537		let scheduler = Self {
538			task_builders: Arc::new(RwLock::new(HashMap::new())),
539			store,
540			tasks_running: Arc::new(Mutex::new(HashMap::new())),
541			tasks_waiting: Arc::new(Mutex::new(HashMap::new())),
542			task_dependents: Arc::new(Mutex::new(HashMap::new())),
543			tasks_scheduled: Arc::new(Mutex::new(BTreeMap::new())),
544			tx_finish,
545			rx_finish,
546			notify_schedule: Arc::new(tokio::sync::Notify::new()),
547		};
548
549		//scheduler.run(rx_finish)?;
550
551		Arc::new(scheduler)
552	}
553
554	pub fn start(&self, state: S) {
555		// Handle finished tasks and dependencies
556		let schedule = self.clone();
557		let stat = state.clone();
558		let rx_finish = self.rx_finish.clone();
559
560		tokio::spawn(async move {
561			while let Ok(id) = rx_finish.recv_async().await {
562				debug!("Completed task {} (notified)", id);
563
564				// Get task metadata WITHOUT removing - we only remove after successful transition
565				let task_meta_opt = {
566					let tasks_running = match schedule.tasks_running.lock() {
567						Ok(guard) => guard,
568						Err(poisoned) => {
569							error!("Mutex poisoned: tasks_running (recovering)");
570							poisoned.into_inner()
571						}
572					};
573					tasks_running.get(&id).cloned()
574				};
575
576				if let Some(task_meta) = task_meta_opt {
577					// Track if transition was successful
578					let mut transition_ok = false;
579
580					// Check if this is a recurring task with cron schedule
581					if let Some(ref cron) = task_meta.cron {
582						// Calculate next execution time
583						let next_at = match cron.next_execution(Timestamp::now()) {
584							Ok(ts) => ts,
585							Err(e) => {
586								error!(
587									"Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
588									id, e
589								);
590								// Mark as finished since we can't reschedule
591								if let Err(e) = schedule.store.finished(id, "").await {
592									error!("Failed to mark task {} as finished: {}", id, e);
593								}
594								continue;
595							}
596						};
597						info!(
598							"Recurring task {} completed, scheduling next execution at {}",
599							id, next_at
600						);
601
602						// Update task with new next_at
603						let mut updated_meta = task_meta.clone();
604						updated_meta.next_at = Some(next_at);
605
606						// Update database with new next_at (keep status as Pending)
607						if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
608							error!("Failed to update recurring task {} next_at: {}", id, e);
609						}
610
611						// Remove from running BEFORE add_queue (so add_queue doesn't see it as running)
612						match schedule.tasks_running.lock() {
613							Ok(mut tasks_running) => {
614								tasks_running.remove(&id);
615							}
616							Err(poisoned) => {
617								error!("Mutex poisoned: tasks_running (recovering)");
618								poisoned.into_inner().remove(&id);
619							}
620						}
621
622						// Re-add to scheduler with new execution time
623						match schedule.add_queue(id, updated_meta).await {
624							Ok(_) => transition_ok = true,
625							Err(e) => {
626								error!(
627									"Failed to reschedule recurring task {}: {} - task lost!",
628									id, e
629								);
630							}
631						}
632					} else {
633						// One-time task - mark as finished
634						match schedule.store.finished(id, "").await {
635							Ok(()) => transition_ok = true,
636							Err(e) => {
637								error!(
638									"Failed to mark task {} as finished: {} - task remains in running queue",
639									id, e
640								);
641							}
642						}
643					}
644
645					// Only remove from running queue after successful transition
646					if transition_ok {
647						match schedule.tasks_running.lock() {
648							Ok(mut tasks_running) => {
649								tasks_running.remove(&id);
650							}
651							Err(poisoned) => {
652								error!("Mutex poisoned: tasks_running (recovering)");
653								poisoned.into_inner().remove(&id);
654							}
655						}
656					}
657
658					// Handle dependencies of finished task using atomic release method
659					match schedule.release_dependents(id) {
660						Ok(ready_to_spawn) => {
661							for (dep_id, dep_task_meta) in ready_to_spawn {
662								// Add to running queue before spawning
663								match schedule.tasks_running.lock() {
664									Ok(mut tasks_running) => {
665										tasks_running.insert(dep_id, dep_task_meta.clone());
666									}
667									Err(poisoned) => {
668										error!("Mutex poisoned: tasks_running (recovering)");
669										poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
670									}
671								}
672								schedule.spawn_task(
673									stat.clone(),
674									dep_task_meta.task.clone(),
675									dep_id,
676									dep_task_meta,
677								);
678							}
679						}
680						Err(e) => {
681							error!("Failed to release dependents of task {}: {}", id, e);
682						}
683					}
684				} else {
685					warn!("Completed task {} not found in running queue", id);
686				}
687			}
688		});
689
690		// Handle scheduled tasks
691		let schedule = self.clone();
692		tokio::spawn(async move {
693			loop {
694				let is_empty = match schedule.tasks_scheduled.lock() {
695					Ok(guard) => guard.is_empty(),
696					Err(poisoned) => {
697						error!("Mutex poisoned: tasks_scheduled (recovering)");
698						poisoned.into_inner().is_empty()
699					}
700				};
701				if is_empty {
702					schedule.notify_schedule.notified().await;
703				}
704				let time = Timestamp::now();
705				if let Some((timestamp, _id)) = loop {
706					let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
707						Ok(guard) => guard,
708						Err(poisoned) => {
709							error!("Mutex poisoned: tasks_scheduled (recovering)");
710							poisoned.into_inner()
711						}
712					};
713					if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
714						let (timestamp, id) = (timestamp, id);
715						if timestamp <= Timestamp::now() {
716							debug!("Spawning task id {} (from schedule)", id);
717							if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
718								let mut tasks_running = match schedule.tasks_running.lock() {
719									Ok(guard) => guard,
720									Err(poisoned) => {
721										error!("Mutex poisoned: tasks_running (recovering)");
722										poisoned.into_inner()
723									}
724								};
725								tasks_running.insert(id, task.clone());
726								schedule.spawn_task(state.clone(), task.task.clone(), id, task);
727							} else {
728								error!("Task disappeared while being removed from schedule");
729								break None;
730							}
731						} else {
732							break Some((timestamp, id));
733						}
734					} else {
735						break None;
736					}
737				} {
738					let diff = timestamp.0 - time.0;
739					let wait =
740						tokio::time::Duration::from_secs(u64::try_from(diff).unwrap_or_default());
741					tokio::select! {
742						() = tokio::time::sleep(wait) => (), () = schedule.notify_schedule.notified() => ()
743					};
744				}
745			}
746		});
747
748		let schedule = self.clone();
749		tokio::spawn(async move {
750			let _ignore_err = schedule.load().await;
751		});
752	}
753
754	fn register_builder(
755		&self,
756		name: &'static str,
757		builder: &'static TaskBuilder<S>,
758	) -> ClResult<&Self> {
759		let mut task_builders = self
760			.task_builders
761			.write()
762			.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
763		task_builders.insert(name, Box::new(builder));
764		Ok(self)
765	}
766
767	pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
768		info!("Registering task type {}", T::kind());
769		self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
770		Ok(self)
771	}
772
773	/// Create a builder for scheduling a task using the fluent API
774	pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
775		TaskSchedulerBuilder::new(self, task)
776	}
777
778	/// Internal method to schedule a task with all options
779	/// This is the core implementation used by the builder pattern
780	async fn schedule_task_impl(
781		&self,
782		task: Arc<dyn Task<S>>,
783		key: Option<&str>,
784		next_at: Option<Timestamp>,
785		deps: Option<Vec<TaskId>>,
786		retry: Option<RetryPolicy>,
787		cron: Option<CronSchedule>,
788	) -> ClResult<TaskId> {
789		let task_meta = TaskMeta {
790			task: task.clone(),
791			next_at,
792			deps: deps.clone().unwrap_or_default(),
793			retry_count: 0,
794			retry,
795			cron,
796		};
797
798		// Check if a task with this key already exists (key-based deduplication)
799		if let Some(key) = key
800			&& let Some((existing_id, existing_data)) = self.store.find_by_key(key).await?
801		{
802			let new_serialized = task.serialize();
803			let existing_serialized = existing_data.input.as_ref();
804
805			// Compare serialized parameters
806			if new_serialized == existing_serialized {
807				info!(
808					"Recurring task '{}' already exists with identical parameters (id={})",
809					key, existing_id
810				);
811				// Update DB with current cron/next_at (may differ from what's stored)
812				self.store.update_task(existing_id, &task_meta).await?;
813				// Ensure the existing task is queued (may be loaded from DB but not yet in queue)
814				self.add_queue(existing_id, task_meta).await?;
815				return Ok(existing_id);
816			}
817			info!("Updating recurring task '{}' (id={}) - parameters changed", key, existing_id);
818			debug!("  Old params: {}", existing_serialized);
819			debug!("  New params: {}", new_serialized);
820
821			// Remove from all queues (if present)
822			self.remove_from_queues(existing_id)?;
823
824			// Update the task in database with new parameters
825			self.store.update_task(existing_id, &task_meta).await?;
826
827			// Re-add to appropriate queue with updated parameters
828			self.add_queue(existing_id, task_meta).await?;
829
830			return Ok(existing_id);
831		}
832
833		// No existing task - create new one
834		let id = self.store.add(&task_meta, key).await?;
835		self.add_queue(id, task_meta).await
836	}
837
838	pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
839		self.task(task).now().await
840	}
841
842	pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
843		// If task is already running, update its metadata (especially for cron updates)
844		// but don't add to scheduled queue (it will reschedule on completion)
845		{
846			let mut running = lock!(self.tasks_running, "tasks_running")?;
847			if let Some(existing_meta) = running.get_mut(&id) {
848				debug!(
849					"Task {} is already running, updating metadata (will reschedule on completion)",
850					id
851				);
852				// Update the running task's metadata so it has the latest cron schedule
853				*existing_meta = task_meta;
854				return Ok(id);
855			}
856		}
857
858		// Remove from other queues if present (prevents duplicate entries with different timestamps)
859		{
860			let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
861			if let Some(key) = scheduled
862				.iter()
863				.find(|((_, tid), _)| *tid == id)
864				.map(|((ts, tid), _)| (*ts, *tid))
865			{
866				scheduled.remove(&key);
867				debug!("Removed existing scheduled entry for task {} before re-queueing", id);
868			}
869		}
870		{
871			let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
872			if waiting.remove(&id).is_some() {
873				debug!("Removed existing waiting entry for task {} before re-queueing", id);
874			}
875		}
876
877		let deps = task_meta.deps.clone();
878
879		// VALIDATION: Tasks with dependencies should NEVER be in tasks_scheduled
880		if !deps.is_empty() && task_meta.next_at.is_some() {
881			warn!(
882				"Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue",
883				id
884			);
885			// Force to tasks_waiting instead
886			lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
887			debug!("Task {} is waiting for {:?}", id, &deps);
888			for dep in &deps {
889				lock!(self.task_dependents, "task_dependents")?
890					.entry(*dep)
891					.or_default()
892					.push(id);
893			}
894
895			self.check_and_resolve_completed_deps(id, &deps).await?;
896			return Ok(id);
897		}
898
899		if deps.is_empty() && task_meta.next_at.unwrap_or(Timestamp(0)) < Timestamp::now() {
900			debug!("Spawning task {}", id);
901			lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), task_meta);
902			self.notify_schedule.notify_one();
903		} else if let Some(next_at) = task_meta.next_at {
904			debug!("Scheduling task {} for {}", id, next_at);
905			lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((next_at, id), task_meta);
906			self.notify_schedule.notify_one();
907		} else {
908			lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
909			debug!("Task {} is waiting for {:?}", id, &deps);
910			for dep in &deps {
911				lock!(self.task_dependents, "task_dependents")?
912					.entry(*dep)
913					.or_default()
914					.push(id);
915			}
916
917			self.check_and_resolve_completed_deps(id, &deps).await?;
918		}
919		Ok(id)
920	}
921
922	/// After registering deps, check if any completed in the meantime.
923	/// If all deps are satisfied, move the task from waiting → scheduled.
924	async fn check_and_resolve_completed_deps(&self, id: TaskId, deps: &[TaskId]) -> ClResult<()> {
925		let completed_deps = self.store.find_completed_deps(deps).await?;
926		if completed_deps.is_empty() {
927			return Ok(());
928		}
929		let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
930		if let Some(task_meta) = waiting.get_mut(&id) {
931			for dep in &completed_deps {
932				task_meta.deps.retain(|d| *d != *dep);
933			}
934			if task_meta.deps.is_empty()
935				&& let Some(ready_task) = waiting.remove(&id)
936			{
937				drop(waiting);
938				let mut dependents = lock!(self.task_dependents, "task_dependents")?;
939				for dep in deps {
940					if let Some(dep_list) = dependents.get_mut(dep) {
941						dep_list.retain(|d| *d != id);
942						if dep_list.is_empty() {
943							dependents.remove(dep);
944						}
945					}
946				}
947				drop(dependents);
948				debug!("Task {} deps already completed, scheduling immediately", id);
949				lock!(self.tasks_scheduled, "tasks_scheduled")?
950					.insert((Timestamp(0), id), ready_task);
951				self.notify_schedule.notify_one();
952			}
953		}
954		Ok(())
955	}
956
957	/// Remove a task from all internal queues (waiting, scheduled, running)
958	/// Returns the removed TaskMeta if found
959	fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
960		// Try tasks_waiting
961		if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
962			debug!("Removed task {} from waiting queue for update", task_id);
963			return Ok(Some(task_meta));
964		}
965
966		// Try tasks_scheduled (need to find by task_id in BTreeMap)
967		{
968			let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
969			if let Some(key) = scheduled
970				.iter()
971				.find(|((_, id), _)| *id == task_id)
972				.map(|((ts, id), _)| (*ts, *id))
973				&& let Some(task_meta) = scheduled.remove(&key)
974			{
975				debug!("Removed task {} from scheduled queue for update", task_id);
976				return Ok(Some(task_meta));
977			}
978		}
979
980		// Try tasks_running (should rarely happen, but handle it)
981		if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
982			warn!("Removed task {} from running queue during update", task_id);
983			return Ok(Some(task_meta));
984		}
985
986		Ok(None)
987	}
988
989	/// Release all dependent tasks of a completed task
990	/// This method safely handles dependency cleanup and spawning
991	fn release_dependents(
992		&self,
993		completed_task_id: TaskId,
994	) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
995		// Get list of dependents (atomic removal to prevent re-processing)
996		let dependents = {
997			let mut deps_map = lock!(self.task_dependents, "task_dependents")?;
998			deps_map.remove(&completed_task_id).unwrap_or_default()
999		};
1000
1001		if dependents.is_empty() {
1002			return Ok(Vec::new()); // No dependents to release
1003		}
1004
1005		debug!("Releasing {} dependents of completed task {}", dependents.len(), completed_task_id);
1006
1007		let mut ready_to_spawn = Vec::new();
1008
1009		// For each dependent, check and remove dependency
1010		for dependent_id in dependents {
1011			// Try tasks_waiting first (most common case for dependent tasks)
1012			{
1013				let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1014				if let Some(task_meta) = waiting.get_mut(&dependent_id) {
1015					// Remove the completed task from dependencies
1016					task_meta.deps.retain(|x| *x != completed_task_id);
1017
1018					// If all dependencies are cleared, remove and queue for spawning
1019					if task_meta.deps.is_empty() {
1020						if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
1021							debug!(
1022								"Dependent task {} ready to spawn (all dependencies cleared)",
1023								dependent_id
1024							);
1025							ready_to_spawn.push((dependent_id, task_to_spawn));
1026						}
1027					} else {
1028						debug!(
1029							"Task {} still has {} remaining dependencies",
1030							dependent_id,
1031							task_meta.deps.len()
1032						);
1033					}
1034					continue;
1035				}
1036			}
1037
1038			// Try tasks_scheduled if not in waiting (shouldn't happen with validation, but be defensive)
1039			{
1040				let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
1041				if let Some(scheduled_key) = scheduled
1042					.iter()
1043					.find(|((_, id), _)| *id == dependent_id)
1044					.map(|((ts, id), _)| (*ts, *id))
1045				{
1046					if let Some(task_meta) = scheduled.get_mut(&scheduled_key) {
1047						task_meta.deps.retain(|x| *x != completed_task_id);
1048						let remaining = task_meta.deps.len();
1049						if remaining == 0 {
1050							debug!(
1051								"Task {} in scheduled queue has no remaining dependencies",
1052								dependent_id
1053							);
1054						} else {
1055							debug!(
1056								"Task {} in scheduled queue has {} remaining dependencies",
1057								dependent_id, remaining
1058							);
1059						}
1060					}
1061					continue;
1062				}
1063			}
1064
1065			// Task not found in any queue
1066			warn!(
1067				"Dependent task {} of completed task {} not found in any queue",
1068				dependent_id, completed_task_id
1069			);
1070		}
1071
1072		Ok(ready_to_spawn)
1073	}
1074
1075	async fn load(&self) -> ClResult<()> {
1076		let tasks = self.store.load().await?;
1077		debug!("Loaded {} tasks from store", tasks.len());
1078		for t in tasks {
1079			if let TaskStatus::Pending = t.status {
1080				debug!("Loading task {} {}", t.id, t.kind);
1081				let task = {
1082					let builder_map = self
1083						.task_builders
1084						.read()
1085						.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
1086					let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
1087						format!("task builder not registered: {}", t.kind),
1088					))?;
1089					builder(t.id, &t.input)?
1090				};
1091				let (retry_count, retry) = match t.retry_data {
1092					Some(retry_str) => {
1093						let (retry_count, retry_min, retry_max, retry_times) = retry_str
1094							.split(',')
1095							.collect_tuple()
1096							.ok_or(Error::Internal("invalid retry policy format".into()))?;
1097						let retry_count: u16 = retry_count
1098							.parse()
1099							.map_err(|_| Error::Internal("retry count must be u16".into()))?;
1100						let retry = RetryPolicy {
1101							wait_min_max: (
1102								retry_min
1103									.parse()
1104									.map_err(|_| Error::Internal("retry_min must be u64".into()))?,
1105								retry_max
1106									.parse()
1107									.map_err(|_| Error::Internal("retry_max must be u64".into()))?,
1108							),
1109							times: retry_times
1110								.parse()
1111								.map_err(|_| Error::Internal("retry times must be u64".into()))?,
1112						};
1113						debug!("Loaded retry policy: {:?}", retry);
1114						(retry_count, Some(retry))
1115					}
1116					_ => (0, None),
1117				};
1118				// Parse cron data if present
1119				let cron =
1120					t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
1121
1122				let task_meta = TaskMeta {
1123					task,
1124					next_at: t.next_at,
1125					deps: t.deps.into(),
1126					retry_count,
1127					retry,
1128					cron,
1129				};
1130				self.add_queue(t.id, task_meta).await?;
1131			}
1132		}
1133		Ok(())
1134	}
1135
1136	fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
1137		let tx_finish = self.tx_finish.clone();
1138		let store = self.store.clone();
1139		let scheduler = self.clone();
1140		//let state = self.state.clone();
1141		tokio::spawn(async move {
1142			match task.run(&state).await {
1143				Ok(()) => {
1144					debug!("Task {} completed successfully", id);
1145					tx_finish.send(id).unwrap_or(());
1146				}
1147				Err(e) => {
1148					if let Some(retry_policy) = &task_meta.retry {
1149						if retry_policy.should_retry(task_meta.retry_count) {
1150							let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
1151							let next_at = Timestamp::from_now(backoff.cast_signed());
1152
1153							info!(
1154								"Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
1155								id,
1156								task_meta.retry_count + 1,
1157								retry_policy.times,
1158								backoff,
1159								e
1160							);
1161
1162							// Update database with error and reschedule
1163							store
1164								.update_task_error(id, &e.to_string(), Some(next_at))
1165								.await
1166								.unwrap_or(());
1167
1168							// Remove from running tasks (we're not sending finish event)
1169							match scheduler.tasks_running.lock() {
1170								Ok(mut tasks_running) => {
1171									tasks_running.remove(&id);
1172								}
1173								Err(poisoned) => {
1174									error!("Mutex poisoned: tasks_running (recovering)");
1175									poisoned.into_inner().remove(&id);
1176								}
1177							}
1178
1179							// Re-queue task with incremented retry count
1180							let mut retry_meta = task_meta.clone();
1181							retry_meta.retry_count += 1;
1182							retry_meta.next_at = Some(next_at);
1183							scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
1184						} else {
1185							// Max retries exhausted
1186							error!(
1187								"Task {} failed after {} retries: {}",
1188								id, task_meta.retry_count, e
1189							);
1190							store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1191							task.on_failed(&state, task_meta.retry_count, &e.to_string()).await;
1192							tx_finish.send(id).unwrap_or(());
1193						}
1194					} else {
1195						// No retry policy - fail immediately
1196						error!("Task {} failed: {}", id, e);
1197						store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1198						task.on_failed(&state, 0, &e.to_string()).await;
1199						tx_finish.send(id).unwrap_or(());
1200					}
1201				}
1202			}
1203		});
1204	}
1205
1206	/// Get health status of the scheduler
1207	/// Returns information about tasks in each queue and detects anomalies
1208	pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
1209		let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
1210		let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
1211		let running_count = lock!(self.tasks_running, "tasks_running")?.len();
1212		let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
1213
1214		// Check for anomalies
1215		let mut stuck_tasks = Vec::new();
1216		let mut tasks_with_missing_deps = Vec::new();
1217
1218		// Check tasks_waiting for tasks with no dependencies (stuck)
1219		{
1220			let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1221			let _deps_map = lock!(self.task_dependents, "task_dependents")?;
1222
1223			for (id, task_meta) in waiting.iter() {
1224				if task_meta.deps.is_empty() {
1225					stuck_tasks.push(*id);
1226					warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
1227				} else {
1228					// Check if all dependencies still exist
1229					for dep in &task_meta.deps {
1230						let dep_exists = waiting.contains_key(dep)
1231							|| self.tasks_running.lock().ok().is_some_and(|r| r.contains_key(dep))
1232							|| self
1233								.tasks_scheduled
1234								.lock()
1235								.ok()
1236								.is_some_and(|s| s.iter().any(|((_, task_id), _)| task_id == dep));
1237
1238						if !dep_exists {
1239							tasks_with_missing_deps.push((*id, *dep));
1240							warn!(
1241								"SCHEDULER HEALTH: Task {} depends on non-existent task {}",
1242								id, dep
1243							);
1244						}
1245					}
1246				}
1247			}
1248		}
1249
1250		Ok(SchedulerHealth {
1251			waiting: waiting_count,
1252			scheduled: scheduled_count,
1253			running: running_count,
1254			dependents: dependents_count,
1255			stuck_tasks,
1256			tasks_with_missing_deps,
1257		})
1258	}
1259}
1260
1261/// Health status of the scheduler
1262#[derive(Debug, Clone)]
1263pub struct SchedulerHealth {
1264	/// Number of tasks waiting for dependencies
1265	pub waiting: usize,
1266	/// Number of tasks scheduled for future execution
1267	pub scheduled: usize,
1268	/// Number of tasks currently running
1269	pub running: usize,
1270	/// Number of task entries in dependents map
1271	pub dependents: usize,
1272	/// IDs of tasks with no dependencies but still in waiting queue
1273	pub stuck_tasks: Vec<TaskId>,
1274	/// Pairs of (task_id, missing_dependency_id) where dependency doesn't exist
1275	pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
1276}
1277
1278#[cfg(test)]
1279mod tests {
1280	use super::*;
1281	use serde::{Deserialize, Serialize};
1282
1283	type State = Arc<Mutex<Vec<u8>>>;
1284
1285	#[derive(Debug, Serialize, Deserialize)]
1286	struct TestTask {
1287		num: u8,
1288	}
1289
1290	impl TestTask {
1291		pub fn new(num: u8) -> Arc<Self> {
1292			Arc::new(Self { num })
1293		}
1294	}
1295
1296	#[async_trait]
1297	impl Task<State> for TestTask {
1298		fn kind() -> &'static str {
1299			"test"
1300		}
1301
1302		fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1303			let num: u8 = ctx
1304				.parse()
1305				.map_err(|_| Error::Internal("test task context must be u8".into()))?;
1306			let task = TestTask::new(num);
1307			Ok(task)
1308		}
1309
1310		fn serialize(&self) -> String {
1311			self.num.to_string()
1312		}
1313
1314		fn kind_of(&self) -> &'static str {
1315			"test"
1316		}
1317
1318		async fn run(&self, state: &State) -> ClResult<()> {
1319			info!("Running task {}", self.num);
1320			tokio::time::sleep(std::time::Duration::from_millis(200 * u64::from(self.num))).await;
1321			info!("Completed task {}", self.num);
1322			state.lock().unwrap().push(self.num);
1323			Ok(())
1324		}
1325	}
1326
1327	#[derive(Debug, Clone)]
1328	struct FailingTask {
1329		id: u8,
1330		fail_count: u8,
1331		attempt: Arc<Mutex<u8>>,
1332	}
1333
1334	impl FailingTask {
1335		pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
1336			Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
1337		}
1338	}
1339
1340	#[async_trait]
1341	impl Task<State> for FailingTask {
1342		fn kind() -> &'static str {
1343			"failing"
1344		}
1345
1346		fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1347			let parts: Vec<&str> = ctx.split(',').collect();
1348			if parts.len() != 2 {
1349				return Err(Error::Internal("failing task context must have 2 parts".into()));
1350			}
1351			let id: u8 = parts[0]
1352				.parse()
1353				.map_err(|_| Error::Internal("failing task id must be u8".into()))?;
1354			let fail_count: u8 = parts[1]
1355				.parse()
1356				.map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
1357			Ok(FailingTask::new(id, fail_count))
1358		}
1359
1360		fn serialize(&self) -> String {
1361			format!("{},{}", self.id, self.fail_count)
1362		}
1363
1364		fn kind_of(&self) -> &'static str {
1365			"failing"
1366		}
1367
1368		async fn run(&self, state: &State) -> ClResult<()> {
1369			let mut attempt = self.attempt.lock().unwrap();
1370			*attempt += 1;
1371			let current_attempt = *attempt;
1372
1373			info!("FailingTask {} - attempt {}/{}", self.id, current_attempt, self.fail_count + 1);
1374
1375			if current_attempt <= self.fail_count {
1376				error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
1377				return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
1378			}
1379
1380			info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
1381			state.lock().unwrap().push(self.id);
1382			Ok(())
1383		}
1384	}
1385
1386	#[tokio::test]
1387	pub async fn test_scheduler() {
1388		let _ = tracing_subscriber::fmt().try_init();
1389
1390		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1391		let state: State = Arc::new(Mutex::new(Vec::new()));
1392		let scheduler = Scheduler::new(task_store);
1393		scheduler.start(state.clone());
1394		scheduler.register::<TestTask>().unwrap();
1395
1396		let _task1 = TestTask::new(1);
1397		let task2 = TestTask::new(1);
1398		let task3 = TestTask::new(1);
1399
1400		let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
1401		let task3_id = scheduler.add(task3).await.unwrap();
1402		scheduler
1403			.task(TestTask::new(1))
1404			.depend_on(vec![task2_id, task3_id])
1405			.schedule()
1406			.await
1407			.unwrap();
1408
1409		tokio::time::sleep(std::time::Duration::from_secs(4)).await;
1410		let task4 = TestTask::new(1);
1411		let task5 = TestTask::new(1);
1412		scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
1413		scheduler.task(task5).schedule_after(1).schedule().await.unwrap();
1414
1415		tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1416
1417		let st = state.lock().unwrap();
1418		info!("res: {}", st.len());
1419		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1420		assert_eq!(str_vec.join(":"), "1:1:1:1:1");
1421	}
1422
1423	#[tokio::test]
1424	pub async fn test_retry_with_backoff() {
1425		let _ = tracing_subscriber::fmt().try_init();
1426
1427		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1428		let state: State = Arc::new(Mutex::new(Vec::new()));
1429		let scheduler = Scheduler::new(task_store);
1430		scheduler.start(state.clone());
1431		scheduler.register::<FailingTask>().unwrap();
1432
1433		// Create a task that fails twice, then succeeds
1434		// With retry policy: min=1s, max=3600s, max_attempts=3
1435		let failing_task = FailingTask::new(42, 2);
1436		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1437
1438		scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1439
1440		// Wait for retries: 1s (1st fail) + 1s (2nd fail) + time for success
1441		// First attempt: immediate fail
1442		// Wait 1s (min backoff)
1443		// Second attempt: fail
1444		// Wait 2s (min * 2)
1445		// Third attempt: success
1446		tokio::time::sleep(std::time::Duration::from_secs(6)).await;
1447
1448		let st = state.lock().unwrap();
1449		assert_eq!(st.len(), 1, "Task should have succeeded after retries");
1450		assert_eq!(st[0], 42);
1451	}
1452
1453	// ===== Builder Pattern Tests =====
1454
1455	#[tokio::test]
1456	pub async fn test_builder_simple_schedule() {
1457		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1458		let state: State = Arc::new(Mutex::new(Vec::new()));
1459		let scheduler = Scheduler::new(task_store);
1460		scheduler.start(state.clone());
1461		scheduler.register::<TestTask>().unwrap();
1462
1463		// Test basic builder usage: .now()
1464		let task = TestTask::new(1);
1465		let id = scheduler.task(task).now().await.unwrap();
1466
1467		assert!(id > 0, "Task ID should be positive");
1468
1469		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1470
1471		let st = state.lock().unwrap();
1472		assert_eq!(st.len(), 1, "Task should have executed");
1473		assert_eq!(st[0], 1);
1474	}
1475
1476	#[tokio::test]
1477	pub async fn test_builder_with_key() {
1478		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1479		let state: State = Arc::new(Mutex::new(Vec::new()));
1480		let scheduler = Scheduler::new(task_store);
1481		scheduler.start(state.clone());
1482		scheduler.register::<TestTask>().unwrap();
1483
1484		// Test builder with key
1485		let task = TestTask::new(1);
1486		let _id = scheduler.task(task).key("my-task-key").now().await.unwrap();
1487
1488		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1489
1490		let st = state.lock().unwrap();
1491		assert_eq!(st.len(), 1);
1492		assert_eq!(st[0], 1);
1493	}
1494
1495	#[tokio::test]
1496	pub async fn test_builder_with_delay() {
1497		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1498		let state: State = Arc::new(Mutex::new(Vec::new()));
1499		let scheduler = Scheduler::new(task_store);
1500		scheduler.start(state.clone());
1501		scheduler.register::<TestTask>().unwrap();
1502
1503		// Test builder with .after() convenience method
1504		let task = TestTask::new(1);
1505		let _id = scheduler
1506			.task(task)
1507			.after(1)  // 1 second delay
1508			.await
1509			.unwrap();
1510
1511		// Should not have executed yet
1512		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1513		{
1514			let st = state.lock().unwrap();
1515			assert_eq!(st.len(), 0, "Task should not execute yet");
1516		}
1517
1518		// Wait for execution (1 sec delay + 200ms task sleep + buffer)
1519		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1520
1521		{
1522			let st = state.lock().unwrap();
1523			assert_eq!(st.len(), 1, "Task should have executed");
1524			assert_eq!(st[0], 1);
1525		}
1526	}
1527
1528	#[tokio::test]
1529	pub async fn test_builder_with_dependencies() {
1530		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1531		let state: State = Arc::new(Mutex::new(Vec::new()));
1532		let scheduler = Scheduler::new(task_store);
1533		scheduler.start(state.clone());
1534		scheduler.register::<TestTask>().unwrap();
1535
1536		// Create first task (sleeps 200ms)
1537		let task1 = TestTask::new(1);
1538		let id1 = scheduler.task(task1).now().await.unwrap();
1539
1540		// Create second task (sleeps 400ms)
1541		let task2 = TestTask::new(1);
1542		let id2 = scheduler.task(task2).now().await.unwrap();
1543
1544		// Create third task that depends on first two (sleeps 600ms)
1545		let task3 = TestTask::new(1);
1546		let _id3 = scheduler.task(task3).depend_on(vec![id1, id2]).schedule().await.unwrap();
1547
1548		// Wait for all tasks: task1 200ms, task2 400ms, task3 600ms = ~1200ms
1549		tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
1550
1551		let st = state.lock().unwrap();
1552		// Should have all three tasks in execution order: 1 finishes first (200ms), then 2 (200ms), then 3 (200ms after both)
1553		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1554		assert_eq!(str_vec.join(":"), "1:1:1");
1555	}
1556
1557	#[tokio::test]
1558	pub async fn test_builder_with_retry() {
1559		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1560		let state: State = Arc::new(Mutex::new(Vec::new()));
1561		let scheduler = Scheduler::new(task_store);
1562		scheduler.start(state.clone());
1563		scheduler.register::<FailingTask>().unwrap();
1564
1565		// Create task using builder with retry policy
1566		let failing_task = FailingTask::new(55, 1); // Fails once, succeeds second time
1567		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1568
1569		let _id = scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1570
1571		// Wait for retry cycle: 1 fail + 1s wait + 1 success
1572		tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1573
1574		let st = state.lock().unwrap();
1575		assert_eq!(st.len(), 1);
1576		assert_eq!(st[0], 55);
1577	}
1578
1579	#[tokio::test]
1580	pub async fn test_builder_with_automatic_retry() {
1581		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1582		let state: State = Arc::new(Mutex::new(Vec::new()));
1583		let scheduler = Scheduler::new(task_store);
1584		scheduler.start(state.clone());
1585		scheduler.register::<FailingTask>().unwrap();
1586
1587		// Create task using builder with automatic retry (default policy)
1588		let failing_task = FailingTask::new(66, 1);
1589		let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();
1590
1591		// Wait for retry cycle with default policy (min=60s would be too long for test)
1592		// but we already tested retry logic thoroughly, just verify builder integration
1593		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1594
1595		// The important part is that this compiles and integrates correctly
1596		let st = state.lock().unwrap();
1597		// With default policy (min=60s), task shouldn't succeed in test timeframe
1598		// Just verify builder chaining works
1599		let _ = st.len(); // Verify state is accessible, but don't assert on timeout-dependent result
1600	}
1601
1602	#[tokio::test]
1603	pub async fn test_builder_fluent_chaining() {
1604		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1605		let state: State = Arc::new(Mutex::new(Vec::new()));
1606		let scheduler = Scheduler::new(task_store);
1607		scheduler.start(state.clone());
1608		scheduler.register::<TestTask>().unwrap();
1609
1610		// Create first dependencies
1611		let dep1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1612		let dep2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1613
1614		// Test fluent chaining with multiple methods
1615		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1616
1617		let task = TestTask::new(1);
1618		let _id = scheduler
1619			.task(task)
1620			.key("complex-task")
1621			.schedule_after(0)  // Schedule immediately
1622			.depend_on(vec![dep1, dep2])
1623			.with_retry(retry_policy)
1624			.schedule()
1625			.await
1626			.unwrap();
1627
1628		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1629
1630		let st = state.lock().unwrap();
1631		// Should have all tasks: 20:10 (immediate deps) then 30 (after deps)
1632		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1633		assert_eq!(str_vec.join(":"), "1:1:1");
1634	}
1635
1636	#[tokio::test]
1637	pub async fn test_builder_backward_compatibility() {
1638		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1639		let state: State = Arc::new(Mutex::new(Vec::new()));
1640		let scheduler = Scheduler::new(task_store);
1641		scheduler.start(state.clone());
1642		scheduler.register::<TestTask>().unwrap();
1643
1644		// Test that old API still works
1645		let _id1 = scheduler.add(TestTask::new(1)).await.unwrap();
1646
1647		// Test that new builder API works
1648		let _id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1649
1650		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1651
1652		let st = state.lock().unwrap();
1653		// Both old and new API should have executed
1654		assert_eq!(st.len(), 2);
1655		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1656		assert_eq!(str_vec.join(":"), "1:1");
1657	}
1658
1659	// ===== Phase 2: Integration Tests - Real-world scenarios =====
1660
1661	#[tokio::test]
1662	pub async fn test_builder_pipeline_scenario() {
1663		// Simulates: Task 1 -> Task 2 (depends on 1) -> Task 3 (depends on 2)
1664		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1665		let state: State = Arc::new(Mutex::new(Vec::new()));
1666		let scheduler = Scheduler::new(task_store);
1667		scheduler.start(state.clone());
1668		scheduler.register::<TestTask>().unwrap();
1669
1670		// Stage 1: Create initial task
1671		let id1 = scheduler.task(TestTask::new(1)).key("stage-1").now().await.unwrap();
1672
1673		// Stage 2: Create task that depends on stage 1
1674		let id2 = scheduler.task(TestTask::new(1)).key("stage-2").after_task(id1).await.unwrap();
1675
1676		// Stage 3: Create task that depends on stage 2
1677		let _id3 = scheduler.task(TestTask::new(1)).key("stage-3").after_task(id2).await.unwrap();
1678
1679		// Wait for pipeline: 1(200ms) + 2(200ms) + 3(200ms) = 600ms
1680		tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1681
1682		let st = state.lock().unwrap();
1683		// Should execute in order: 1, 2, 3
1684		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1685		assert_eq!(str_vec.join(":"), "1:1:1");
1686	}
1687
1688	#[tokio::test]
1689	pub async fn test_builder_multi_dependency_join() {
1690		// Simulates: Task 1 parallel with Task 2, then Task 3 waits for both
1691		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1692		let state: State = Arc::new(Mutex::new(Vec::new()));
1693		let scheduler = Scheduler::new(task_store);
1694		scheduler.start(state.clone());
1695		scheduler.register::<TestTask>().unwrap();
1696
1697		// Parallel tasks
1698		let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1699		let id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1700
1701		// Join task - waits for both
1702		let _id3 = scheduler
1703			.task(TestTask::new(1))
1704			.depend_on(vec![id1, id2])
1705			.schedule()
1706			.await
1707			.unwrap();
1708
1709		tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1710
1711		let st = state.lock().unwrap();
1712		// 1 and 2 execute in parallel, then 3 executes after both
1713		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1714		assert_eq!(str_vec.join(":"), "1:1:1");
1715	}
1716
1717	#[tokio::test]
1718	pub async fn test_builder_scheduled_task_with_dependencies() {
1719		// Simulates: Task depends on earlier task AND is scheduled for future time
1720		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1721		let state: State = Arc::new(Mutex::new(Vec::new()));
1722		let scheduler = Scheduler::new(task_store);
1723		scheduler.start(state.clone());
1724		scheduler.register::<TestTask>().unwrap();
1725
1726		// Immediate task
1727		let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();
1728
1729		// Task that waits for dependency AND scheduled delay
1730		let ts = Timestamp::from_now(1);
1731		let _task_id = scheduler
1732			.task(TestTask::new(1))
1733			.schedule_at(ts)
1734			.depend_on(vec![dep_id])
1735			.schedule()
1736			.await
1737			.unwrap();
1738
1739		// Wait for dependency to complete but before scheduled time
1740		tokio::time::sleep(std::time::Duration::from_millis(300)).await;
1741		{
1742			let st = state.lock().unwrap();
1743			assert_eq!(st.len(), 1); // Only dependency executed
1744		}
1745
1746		// Wait for scheduled time (1s total from initial schedule)
1747		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1748
1749		{
1750			let st = state.lock().unwrap();
1751			let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1752			assert_eq!(str_vec.join(":"), "1:1");
1753		}
1754	}
1755
1756	#[tokio::test]
1757	pub async fn test_builder_mixed_features() {
1758		// Simulates: Complex real-world scenario with key, scheduling, deps, and retry
1759		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1760		let state: State = Arc::new(Mutex::new(Vec::new()));
1761		let scheduler = Scheduler::new(task_store);
1762		scheduler.start(state.clone());
1763		scheduler.register::<TestTask>().unwrap();
1764		scheduler.register::<FailingTask>().unwrap();
1765
1766		// Create initial tasks
1767		let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1768
1769		// Create complex task: scheduled + depends on id1 + has key
1770		let _id2 = scheduler
1771			.task(TestTask::new(1))
1772			.key("critical-task")
1773			.schedule_after(0)
1774			.depend_on(vec![id1])
1775			.schedule()
1776			.await
1777			.unwrap();
1778
1779		// Create task with retry
1780		let _id3 = scheduler
1781			.task(FailingTask::new(1, 0))  // Fails 0 times, succeeds immediately
1782			.key("retryable-task")
1783			.with_retry(RetryPolicy {
1784				wait_min_max: (1, 3600),
1785				times: 3,
1786			})
1787			.schedule()
1788			.await
1789			.unwrap();
1790
1791		// Wait for tasks: id1 (200ms) + id2 (200ms after id1) + id3 (200ms) = ~600ms
1792		tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1793
1794		let st = state.lock().unwrap();
1795		// All three tasks should execute
1796		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1797		assert_eq!(str_vec.join(":"), "1:1:1");
1798	}
1799
1800	#[tokio::test]
1801	pub async fn test_builder_builder_reuse_not_possible() {
1802		// Verify that builder is consumed (moved) and can't be reused
1803		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1804		let _state: State = Arc::new(Mutex::new(Vec::new()));
1805		let scheduler = Scheduler::new(task_store);
1806
1807		let task = TestTask::new(1);
1808		let builder = scheduler.task(task);
1809
1810		// This would not compile if uncommented (builder is moved):
1811		// let _id1 = builder.now().await.unwrap();
1812		// let _id2 = builder.now().await.unwrap();  // Error: use of moved value
1813
1814		// Can only call terminal method once
1815		let _id = builder.now().await.unwrap();
1816		// builder is now consumed, can't use again
1817
1818		// Test passes if it compiles (verifying move semantics)
1819	}
1820
1821	#[tokio::test]
1822	pub async fn test_builder_different_task_types() {
1823		// Test builder works with different task implementations
1824		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1825		let state: State = Arc::new(Mutex::new(Vec::new()));
1826		let scheduler = Scheduler::new(task_store);
1827		scheduler.start(state.clone());
1828		scheduler.register::<TestTask>().unwrap();
1829		scheduler.register::<FailingTask>().unwrap();
1830
1831		// Mix of different task types
1832		let _id1 = scheduler.task(TestTask::new(1)).key("test-task").now().await.unwrap();
1833
1834		let _id2 = scheduler
1835			.task(FailingTask::new(1, 0))  // Won't fail
1836			.key("failing-task")
1837			.now()
1838			.await
1839			.unwrap();
1840
1841		let _id3 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1842
1843		tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1844
1845		let st = state.lock().unwrap();
1846		assert_eq!(st.len(), 3);
1847		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1848		// All three tasks should execute
1849		assert_eq!(str_vec.join(":"), "1:1:1");
1850	}
1851
1852	// ===== Phase 3: Cron Placeholder Tests =====
1853	// These tests verify that cron methods compile and integrate
1854	// Actual cron functionality will be implemented in Phase 3
1855
1856	#[tokio::test]
1857	pub async fn test_builder_cron_placeholder_syntax() {
1858		// Verify cron placeholder methods compile and chain properly
1859		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1860		let state: State = Arc::new(Mutex::new(Vec::new()));
1861		let scheduler = Scheduler::new(task_store);
1862		scheduler.start(state.clone());
1863		scheduler.register::<TestTask>().unwrap();
1864
1865		// Test that cron methods compile (they're no-ops in Phase 2)
1866		let task = TestTask::new(1);
1867		let _id = scheduler
1868			.task(task)
1869			.key("cron-task")
1870			.cron("0 9 * * *")  // 9 AM daily
1871			.schedule()
1872			.await
1873			.unwrap();
1874
1875		// Cron scheduling - task will execute at the next scheduled time
1876		// For cron "0 9 * * *", that's tomorrow at 9 AM, so task won't execute in this test
1877		// This test just verifies the methods compile and chain properly
1878		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1879
1880		let st = state.lock().unwrap();
1881		// Task is scheduled for future (9 AM), so it won't have executed yet
1882		// The important thing is that the cron methods compile and integrate
1883		assert_eq!(st.len(), 0); // Not executed yet since scheduled for future
1884	}
1885
1886	#[tokio::test]
1887	pub async fn test_builder_daily_at_placeholder() {
1888		// Verify daily_at placeholder compiles and integrates
1889		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1890		let state: State = Arc::new(Mutex::new(Vec::new()));
1891		let scheduler = Scheduler::new(task_store);
1892		scheduler.start(state.clone());
1893		scheduler.register::<TestTask>().unwrap();
1894
1895		// Test that daily_at placeholder compiles
1896		let task = TestTask::new(1);
1897		let _id = scheduler
1898			.task(task)
1899			.key("daily-task")
1900			.daily_at(14, 30)  // 2:30 PM daily
1901			.schedule()
1902			.await
1903			.unwrap();
1904
1905		// Daily_at scheduling - task will execute at the specified time (2:30 PM daily)
1906		// Task is scheduled for future, so it won't execute in this test
1907		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1908
1909		let st = state.lock().unwrap();
1910		// Task is scheduled for future (2:30 PM), not executed yet
1911		// The important thing is that daily_at compiles and integrates properly
1912		assert_eq!(st.len(), 0);
1913	}
1914
1915	#[tokio::test]
1916	pub async fn test_builder_weekly_at_placeholder() {
1917		// Verify weekly_at placeholder compiles and integrates
1918		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1919		let state: State = Arc::new(Mutex::new(Vec::new()));
1920		let scheduler = Scheduler::new(task_store);
1921		scheduler.start(state.clone());
1922		scheduler.register::<TestTask>().unwrap();
1923
1924		// Test that weekly_at placeholder compiles
1925		let task = TestTask::new(1);
1926		let _id = scheduler
1927			.task(task)
1928			.key("weekly-task")
1929			.weekly_at(1, 9, 0)  // Monday at 9 AM
1930			.schedule()
1931			.await
1932			.unwrap();
1933
1934		// Weekly_at scheduling - task will execute on Monday at 9 AM
1935		// Task is scheduled for future, so it won't execute in this test
1936		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1937
1938		let st = state.lock().unwrap();
1939		// Task is scheduled for future (Monday 9 AM), not executed yet
1940		// The important thing is that weekly_at compiles and integrates properly
1941		assert_eq!(st.len(), 0);
1942	}
1943
1944	#[tokio::test]
1945	pub async fn test_builder_cron_with_retry() {
1946		// Verify cron methods chain with retry (future combined usage)
1947		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1948		let state: State = Arc::new(Mutex::new(Vec::new()));
1949		let scheduler = Scheduler::new(task_store);
1950		scheduler.start(state.clone());
1951		scheduler.register::<TestTask>().unwrap();
1952
1953		// Test future usage pattern: cron + retry
1954		let task = TestTask::new(1);
1955		let _id = scheduler
1956			.task(task)
1957			.key("reliable-scheduled-task")
1958			.daily_at(2, 0)  // 2 AM daily
1959			.with_retry(RetryPolicy {
1960				wait_min_max: (60, 3600),
1961				times: 5,
1962			})
1963			.schedule()
1964			.await
1965			.unwrap();
1966
1967		// Verify cron+retry chain compiles properly
1968		// Task is scheduled for 2 AM, so won't execute in this test
1969		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1970
1971		let st = state.lock().unwrap();
1972		// Task scheduled for future (2 AM), not executed yet
1973		// The important thing is that chaining cron + retry works
1974		assert_eq!(st.len(), 0);
1975	}
1976
1977	// ===== Cron Schedule Tests =====
1978
1979	#[test]
1980	fn test_cron_to_string() {
1981		// Test that to_cron_string returns the original expression
1982		let cron = CronSchedule::parse("*/5 * * * *").unwrap();
1983		assert_eq!(cron.to_cron_string(), "*/5 * * * *");
1984	}
1985
1986	#[tokio::test]
1987	pub async fn test_running_task_not_double_scheduled() {
1988		let _ = tracing_subscriber::fmt().try_init();
1989
1990		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1991		let state: State = Arc::new(Mutex::new(Vec::new()));
1992		let scheduler = Scheduler::new(task_store);
1993		scheduler.start(state.clone());
1994		scheduler.register::<TestTask>().unwrap();
1995
1996		// Create a task
1997		let task = TestTask::new(5); // Takes 1 second (5 * 200ms)
1998		let task_id = scheduler.add(task.clone()).await.unwrap();
1999
2000		// Wait a bit for task to start running
2001		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2002
2003		// Verify task is in tasks_running
2004		{
2005			let running = scheduler.tasks_running.lock().unwrap();
2006			assert!(running.contains_key(&task_id), "Task should be in running queue");
2007		}
2008
2009		// Try to add the same task again via add_queue
2010		let task_meta = TaskMeta {
2011			task: task.clone(),
2012			next_at: Some(Timestamp::now()),
2013			deps: vec![],
2014			retry_count: 0,
2015			retry: None,
2016			cron: None,
2017		};
2018		let result = scheduler.add_queue(task_id, task_meta).await;
2019
2020		// Should succeed but not actually add to scheduled queue
2021		assert!(result.is_ok(), "add_queue should succeed");
2022
2023		// Verify task is NOT in tasks_scheduled (only in running)
2024		{
2025			let sched_queue = scheduler.tasks_scheduled.lock().unwrap();
2026			let in_scheduled = sched_queue.iter().any(|((_, id), _)| *id == task_id);
2027			assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
2028		}
2029
2030		// Wait for original task to complete
2031		tokio::time::sleep(std::time::Duration::from_secs(2)).await;
2032
2033		// Verify task completed
2034		let st = state.lock().unwrap();
2035		assert_eq!(st.len(), 1, "Only one task execution should have occurred");
2036		assert_eq!(st[0], 5);
2037	}
2038
2039	#[tokio::test]
2040	pub async fn test_running_task_metadata_updated() {
2041		let _ = tracing_subscriber::fmt().try_init();
2042
2043		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
2044		let state: State = Arc::new(Mutex::new(Vec::new()));
2045		let scheduler = Scheduler::new(task_store);
2046		scheduler.start(state.clone());
2047		scheduler.register::<TestTask>().unwrap();
2048
2049		// Create a task without cron
2050		let task = TestTask::new(5); // Takes 1 second (5 * 200ms)
2051		let task_id = scheduler.add(task.clone()).await.unwrap();
2052
2053		// Wait a bit for task to start running
2054		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2055
2056		// Verify task is running and has no cron
2057		{
2058			let running = scheduler.tasks_running.lock().unwrap();
2059			let meta = running.get(&task_id).expect("Task should be running");
2060			assert!(meta.cron.is_none(), "Task should have no cron initially");
2061		}
2062
2063		// Try to update the running task with a cron schedule
2064		let cron = CronSchedule::parse("*/5 * * * *").unwrap();
2065		let task_meta_with_cron = TaskMeta {
2066			task: task.clone(),
2067			next_at: Some(Timestamp::now()),
2068			deps: vec![],
2069			retry_count: 0,
2070			retry: None,
2071			cron: Some(cron.clone()),
2072		};
2073		let result = scheduler.add_queue(task_id, task_meta_with_cron).await;
2074
2075		// Should succeed
2076		assert!(result.is_ok(), "add_queue should succeed");
2077
2078		// Verify the running task now has the cron schedule
2079		{
2080			let running = scheduler.tasks_running.lock().unwrap();
2081			let meta = running.get(&task_id).expect("Task should still be running");
2082			assert!(meta.cron.is_some(), "Task should now have cron after update");
2083		}
2084
2085		// Wait for task to complete
2086		tokio::time::sleep(std::time::Duration::from_secs(2)).await;
2087	}
2088}
2089
2090// vim: ts=4