Skip to main content

cloudillo_core/
scheduler.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! Scheduler subsystem. Handles async tasks, dependencies, fallbacks, repetitions, persistence..
5
6use async_trait::async_trait;
7use itertools::Itertools;
8use std::{
9	collections::{BTreeMap, HashMap},
10	fmt::Debug,
11	sync::{Arc, Mutex, RwLock},
12};
13
14use chrono::{DateTime, Utc};
15use croner::Cron;
16use std::str::FromStr;
17
18use crate::prelude::*;
19use cloudillo_types::{lock, meta_adapter};
20
21pub type TaskId = u64;
22
23pub enum TaskType {
24	Periodic,
25	Once,
26}
27
28/// Cron schedule wrapper using the croner crate
29/// Stores the expression string for serialization
30#[derive(Debug, Clone)]
31pub struct CronSchedule {
32	/// The original cron expression string
33	expr: Box<str>,
34	/// Parsed cron object
35	cron: Cron,
36}
37
38impl CronSchedule {
39	/// Parse a cron expression (5 fields: minute hour day month weekday)
40	pub fn parse(expr: &str) -> ClResult<Self> {
41		let cron = Cron::from_str(expr)
42			.map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
43		Ok(Self { expr: expr.into(), cron })
44	}
45
46	/// Calculate the next execution time after the given timestamp
47	///
48	/// Returns an error if no next occurrence can be found (should be rare
49	/// for valid expressions within reasonable time bounds).
50	pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
51		let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
52
53		self.cron
54			.find_next_occurrence(&dt, false)
55			.map(|next| Timestamp(next.timestamp()))
56			.map_err(|e| {
57				tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
58				Error::ValidationError(format!("cron next_execution failed: {}", e))
59			})
60	}
61
62	/// Convert back to cron expression string
63	pub fn to_cron_string(&self) -> String {
64		self.expr.to_string()
65	}
66}
67
68impl PartialEq for CronSchedule {
69	fn eq(&self, other: &Self) -> bool {
70		self.expr == other.expr
71	}
72}
73
74impl Eq for CronSchedule {}
75
76#[async_trait]
77pub trait Task<S: Clone>: Send + Sync + Debug {
78	fn kind() -> &'static str
79	where
80		Self: Sized;
81	fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
82	where
83		Self: Sized;
84	fn serialize(&self) -> String;
85	async fn run(&self, state: &S) -> ClResult<()>;
86
87	fn kind_of(&self) -> &'static str;
88
89	/// Called when the task transitions to `Failed` after exhausting retries
90	/// (or on the very first failure when no retry policy is set). Lets the
91	/// task perform irreversible cleanup — e.g. mark a related domain row as
92	/// permanently failed — that should not happen on retryable failures.
93	/// Default: no-op.
94	async fn on_failed(&self, _state: &S, _attempts: u16, _last_error: &str) {}
95}
96
97#[derive(Debug)]
98pub enum TaskStatus {
99	Pending,
100	Completed,
101	Failed,
102}
103
104pub struct TaskData {
105	id: TaskId,
106	kind: Box<str>,
107	status: TaskStatus,
108	input: Box<str>,
109	deps: Box<[TaskId]>,
110	retry_data: Option<Box<str>>,
111	cron_data: Option<Box<str>>,
112	next_at: Option<Timestamp>,
113}
114
115#[async_trait]
116pub trait TaskStore<S: Clone>: Send + Sync {
117	async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;
118	async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;
119	async fn load(&self) -> ClResult<Vec<TaskData>>;
120	async fn update_task_error(
121		&self,
122		task_id: TaskId,
123		output: &str,
124		next_at: Option<Timestamp>,
125	) -> ClResult<()>;
126	async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;
127	async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;
128	async fn find_completed_deps(&self, deps: &[TaskId]) -> ClResult<Vec<TaskId>>;
129}
130
131// InMemoryTaskStore
132//*******************
133pub struct InMemoryTaskStore {
134	last_id: Mutex<TaskId>,
135}
136
137impl InMemoryTaskStore {
138	pub fn new() -> Arc<Self> {
139		Arc::new(Self { last_id: Mutex::new(0) })
140	}
141}
142
143#[async_trait]
144impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
145	async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
146		let mut last_id = lock!(self.last_id)?;
147		*last_id += 1;
148		Ok(*last_id)
149	}
150
151	async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
152		Ok(())
153	}
154
155	async fn load(&self) -> ClResult<Vec<TaskData>> {
156		Ok(vec![])
157	}
158
159	async fn update_task_error(
160		&self,
161		_task_id: TaskId,
162		_output: &str,
163		_next_at: Option<Timestamp>,
164	) -> ClResult<()> {
165		Ok(())
166	}
167
168	async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
169		// In-memory store doesn't support persistence or keys
170		Ok(None)
171	}
172
173	async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
174		// In-memory store doesn't support persistence
175		Ok(())
176	}
177
178	async fn find_completed_deps(&self, _deps: &[TaskId]) -> ClResult<Vec<TaskId>> {
179		Ok(vec![])
180	}
181}
182
183// MetaAdapterTaskStore
184//**********************
185pub struct MetaAdapterTaskStore {
186	meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
187}
188
189impl MetaAdapterTaskStore {
190	pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
191		Arc::new(Self { meta_adapter })
192	}
193}
194
195#[async_trait]
196impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
197	async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
198		let id = self
199			.meta_adapter
200			.create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
201			.await?;
202
203		// Store cron schedule if present
204		if let Some(cron) = &task.cron {
205			self.meta_adapter
206				.update_task(
207					id,
208					&meta_adapter::TaskPatch {
209						cron: Patch::Value(cron.to_cron_string()),
210						..Default::default()
211					},
212				)
213				.await?;
214		}
215
216		Ok(id)
217	}
218
219	async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
220		self.meta_adapter.update_task_finished(id, output).await
221	}
222
223	async fn load(&self) -> ClResult<Vec<TaskData>> {
224		let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
225		let tasks = tasks
226			.into_iter()
227			.map(|t| TaskData {
228				id: t.task_id,
229				kind: t.kind,
230				status: match t.status {
231					'P' => TaskStatus::Pending,
232					'F' => TaskStatus::Completed,
233					// 'E' or unknown status = Failed
234					_ => TaskStatus::Failed,
235				},
236				input: t.input,
237				deps: t.deps,
238				retry_data: t.retry,
239				cron_data: t.cron,
240				next_at: t.next_at,
241			})
242			.collect();
243		Ok(tasks)
244	}
245
246	async fn update_task_error(
247		&self,
248		task_id: TaskId,
249		output: &str,
250		next_at: Option<Timestamp>,
251	) -> ClResult<()> {
252		self.meta_adapter.update_task_error(task_id, output, next_at).await
253	}
254
255	async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
256		let task_opt = self.meta_adapter.find_task_by_key(key).await?;
257
258		match task_opt {
259			Some(t) => Ok(Some((
260				t.task_id,
261				TaskData {
262					id: t.task_id,
263					kind: t.kind,
264					status: match t.status {
265						'P' => TaskStatus::Pending,
266						'F' => TaskStatus::Completed,
267						// 'E' or unknown status = Failed
268						_ => TaskStatus::Failed,
269					},
270					input: t.input,
271					deps: t.deps,
272					retry_data: t.retry,
273					cron_data: t.cron,
274					next_at: t.next_at,
275				},
276			))),
277			None => Ok(None),
278		}
279	}
280
281	async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
282		use cloudillo_types::types::Patch;
283
284		// Build TaskPatch from TaskMeta
285		let mut patch = meta_adapter::TaskPatch {
286			input: Patch::Value(task.task.serialize()),
287			next_at: match task.next_at {
288				Some(ts) => Patch::Value(ts),
289				None => Patch::Null,
290			},
291			..Default::default()
292		};
293
294		// Update deps
295		if !task.deps.is_empty() {
296			patch.deps = Patch::Value(task.deps.clone());
297		}
298
299		// Update retry policy
300		if let Some(ref retry) = task.retry {
301			let retry_str = format!(
302				"{},{},{},{}",
303				task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
304			);
305			patch.retry = Patch::Value(retry_str);
306		}
307
308		// Update cron schedule
309		if let Some(ref cron) = task.cron {
310			patch.cron = Patch::Value(cron.to_cron_string());
311		}
312
313		self.meta_adapter.update_task(id, &patch).await
314	}
315
316	async fn find_completed_deps(&self, deps: &[TaskId]) -> ClResult<Vec<TaskId>> {
317		self.meta_adapter.find_completed_deps(deps).await
318	}
319}
320
321// Task metadata
322type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
323
324#[derive(Debug, Clone)]
325pub struct RetryPolicy {
326	wait_min_max: (u64, u64),
327	times: u16,
328}
329
330impl Default for RetryPolicy {
331	fn default() -> Self {
332		Self { wait_min_max: (60, 3600), times: 10 }
333	}
334}
335
336impl RetryPolicy {
337	/// Create a new RetryPolicy with custom min/max backoff and number of retries
338	pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
339		Self { wait_min_max, times }
340	}
341
342	/// Calculate exponential backoff in seconds: min * (2^attempt), capped at max
343	pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
344		let (min, max) = self.wait_min_max;
345		let backoff = min * (1u64 << u64::from(attempt_count));
346		backoff.min(max)
347	}
348
349	/// Check if we should continue retrying
350	pub fn should_retry(&self, attempt_count: u16) -> bool {
351		attempt_count < self.times
352	}
353}
354
355// TaskSchedulerBuilder - Fluent API for task scheduling
356//************************************************************
357pub struct TaskSchedulerBuilder<'a, S: Clone> {
358	scheduler: &'a Scheduler<S>,
359	task: Arc<dyn Task<S>>,
360	key: Option<String>,
361	next_at: Option<Timestamp>,
362	deps: Vec<TaskId>,
363	retry: Option<RetryPolicy>,
364	cron: Option<CronSchedule>,
365	run_on_startup: bool,
366}
367
368impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
369	/// Create a new builder for scheduling a task
370	fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
371		Self {
372			scheduler,
373			task,
374			key: None,
375			next_at: None,
376			deps: Vec::new(),
377			retry: None,
378			cron: None,
379			run_on_startup: false,
380		}
381	}
382
383	/// Set a string key for task identification
384	pub fn key(mut self, key: impl Into<String>) -> Self {
385		self.key = Some(key.into());
386		self
387	}
388
389	/// Schedule for a specific absolute timestamp
390	pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
391		self.next_at = Some(timestamp);
392		self
393	}
394
395	/// Schedule after a relative delay (in seconds)
396	pub fn schedule_after(mut self, seconds: i64) -> Self {
397		self.next_at = Some(Timestamp::from_now(seconds));
398		self
399	}
400
401	/// Add task dependencies - task waits for all of these to complete
402	pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
403		self.deps = deps;
404		self
405	}
406
407	/// Add a single task dependency
408	pub fn depends_on(mut self, dep: TaskId) -> Self {
409		self.deps.push(dep);
410		self
411	}
412
413	/// Enable automatic retry with exponential backoff
414	pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
415		self.retry = Some(policy);
416		self
417	}
418
419	// ===== Cron Scheduling Methods =====
420
421	/// Schedule task with cron expression
422	/// Example: `.cron("0 9 * * *")` for 9 AM daily
423	pub fn cron(mut self, expr: impl Into<String>) -> Self {
424		if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
425			// Calculate initial next_at from cron schedule
426			// Use .ok() - cron was just parsed successfully, should never fail
427			self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
428			self.cron = Some(cron_schedule);
429		}
430		self
431	}
432
433	/// Schedule task daily at specified time
434	/// Example: `.daily_at(2, 30)` for 2:30 AM daily
435	pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
436		if hour <= 23 && minute <= 59 {
437			let expr = format!("{} {} * * *", minute, hour);
438			if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
439				// Calculate initial next_at from cron schedule
440				// Use .ok() - cron was just parsed successfully, should never fail
441				self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
442				self.cron = Some(cron_schedule);
443			}
444		}
445		self
446	}
447
448	/// Schedule task weekly at specified day and time
449	/// Example: `.weekly_at(1, 14, 30)` for Mondays at 2:30 PM
450	/// weekday: 0=Sunday, 1=Monday, ..., 6=Saturday
451	pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
452		if weekday <= 6 && hour <= 23 && minute <= 59 {
453			let expr = format!("{} {} * * {}", minute, hour, weekday);
454			if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
455				// Calculate initial next_at from cron schedule
456				// Use .ok() - cron was just parsed successfully, should never fail
457				self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
458				self.cron = Some(cron_schedule);
459			}
460		}
461		self
462	}
463
464	/// Opt-in: if this is a cron task and a scheduled run was missed
465	/// while the server was down (or this is the first time the task
466	/// is being registered), run it once immediately on startup before
467	/// resuming the normal cron schedule.
468	pub fn run_on_startup(mut self) -> Self {
469		self.run_on_startup = true;
470		self
471	}
472
473	/// Execute the scheduled task immediately
474	pub async fn now(self) -> ClResult<TaskId> {
475		self.schedule().await
476	}
477
478	/// Execute the scheduled task at a specific timestamp
479	pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
480		self.next_at = Some(ts);
481		self.schedule().await
482	}
483
484	/// Execute the scheduled task after a delay (in seconds)
485	pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
486		self.next_at = Some(Timestamp::from_now(seconds));
487		self.schedule().await
488	}
489
490	/// Execute the scheduled task after another task completes
491	pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
492		self.deps.push(dep);
493		self.schedule().await
494	}
495
496	/// Execute the scheduled task with automatic retry using default policy
497	pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
498		self.retry = Some(RetryPolicy::default());
499		self.schedule().await
500	}
501
502	/// Execute the task with all configured options - main terminal method
503	pub async fn schedule(self) -> ClResult<TaskId> {
504		self.scheduler
505			.schedule_task_impl(
506				self.task,
507				self.key.as_deref(),
508				self.next_at,
509				if self.deps.is_empty() { None } else { Some(self.deps) },
510				self.retry,
511				self.cron,
512				self.run_on_startup,
513			)
514			.await
515	}
516}
517
518#[derive(Debug, Clone)]
519pub struct TaskMeta<S: Clone> {
520	pub task: Arc<dyn Task<S>>,
521	pub next_at: Option<Timestamp>,
522	pub deps: Vec<TaskId>,
523	retry_count: u16,
524	pub retry: Option<RetryPolicy>,
525	pub cron: Option<CronSchedule>,
526}
527
528type TaskBuilderRegistry<S> = HashMap<&'static str, Box<TaskBuilder<S>>>;
529type ScheduledTaskMap<S> = BTreeMap<(Timestamp, TaskId), TaskMeta<S>>;
530
531// Scheduler
532#[derive(Clone)]
533pub struct Scheduler<S: Clone> {
534	task_builders: Arc<RwLock<TaskBuilderRegistry<S>>>,
535	store: Arc<dyn TaskStore<S>>,
536	tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
537	tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
538	task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
539	tasks_scheduled: Arc<Mutex<ScheduledTaskMap<S>>>,
540	tx_finish: flume::Sender<TaskId>,
541	rx_finish: flume::Receiver<TaskId>,
542	notify_schedule: Arc<tokio::sync::Notify>,
543}
544
545impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
546	pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
547		let (tx_finish, rx_finish) = flume::unbounded();
548
549		let scheduler = Self {
550			task_builders: Arc::new(RwLock::new(HashMap::new())),
551			store,
552			tasks_running: Arc::new(Mutex::new(HashMap::new())),
553			tasks_waiting: Arc::new(Mutex::new(HashMap::new())),
554			task_dependents: Arc::new(Mutex::new(HashMap::new())),
555			tasks_scheduled: Arc::new(Mutex::new(BTreeMap::new())),
556			tx_finish,
557			rx_finish,
558			notify_schedule: Arc::new(tokio::sync::Notify::new()),
559		};
560
561		//scheduler.run(rx_finish)?;
562
563		Arc::new(scheduler)
564	}
565
566	pub fn start(&self, state: S) {
567		// Handle finished tasks and dependencies
568		let schedule = self.clone();
569		let stat = state.clone();
570		let rx_finish = self.rx_finish.clone();
571
572		tokio::spawn(async move {
573			while let Ok(id) = rx_finish.recv_async().await {
574				debug!("Completed task {} (notified)", id);
575
576				// Get task metadata WITHOUT removing - we only remove after successful transition
577				let task_meta_opt = {
578					let tasks_running = match schedule.tasks_running.lock() {
579						Ok(guard) => guard,
580						Err(poisoned) => {
581							error!("Mutex poisoned: tasks_running (recovering)");
582							poisoned.into_inner()
583						}
584					};
585					tasks_running.get(&id).cloned()
586				};
587
588				if let Some(task_meta) = task_meta_opt {
589					// Track if transition was successful
590					let mut transition_ok = false;
591
592					// Check if this is a recurring task with cron schedule
593					if let Some(ref cron) = task_meta.cron {
594						// Calculate next execution time
595						let next_at = match cron.next_execution(Timestamp::now()) {
596							Ok(ts) => ts,
597							Err(e) => {
598								error!(
599									"Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
600									id, e
601								);
602								// Mark as finished since we can't reschedule
603								if let Err(e) = schedule.store.finished(id, "").await {
604									error!("Failed to mark task {} as finished: {}", id, e);
605								}
606								continue;
607							}
608						};
609						info!(
610							"Recurring task {} completed, scheduling next execution at {}",
611							id, next_at
612						);
613
614						// Update task with new next_at
615						let mut updated_meta = task_meta.clone();
616						updated_meta.next_at = Some(next_at);
617
618						// Update database with new next_at (keep status as Pending)
619						if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
620							error!("Failed to update recurring task {} next_at: {}", id, e);
621						}
622
623						// Remove from running BEFORE add_queue (so add_queue doesn't see it as running)
624						match schedule.tasks_running.lock() {
625							Ok(mut tasks_running) => {
626								tasks_running.remove(&id);
627							}
628							Err(poisoned) => {
629								error!("Mutex poisoned: tasks_running (recovering)");
630								poisoned.into_inner().remove(&id);
631							}
632						}
633
634						// Re-add to scheduler with new execution time
635						match schedule.add_queue(id, updated_meta).await {
636							Ok(_) => transition_ok = true,
637							Err(e) => {
638								error!(
639									"Failed to reschedule recurring task {}: {} - task lost!",
640									id, e
641								);
642							}
643						}
644					} else {
645						// One-time task - mark as finished
646						match schedule.store.finished(id, "").await {
647							Ok(()) => transition_ok = true,
648							Err(e) => {
649								error!(
650									"Failed to mark task {} as finished: {} - task remains in running queue",
651									id, e
652								);
653							}
654						}
655					}
656
657					// Only remove from running queue after successful transition
658					if transition_ok {
659						match schedule.tasks_running.lock() {
660							Ok(mut tasks_running) => {
661								tasks_running.remove(&id);
662							}
663							Err(poisoned) => {
664								error!("Mutex poisoned: tasks_running (recovering)");
665								poisoned.into_inner().remove(&id);
666							}
667						}
668					}
669
670					// Handle dependencies of finished task using atomic release method
671					match schedule.release_dependents(id) {
672						Ok(ready_to_spawn) => {
673							for (dep_id, dep_task_meta) in ready_to_spawn {
674								// Add to running queue before spawning
675								match schedule.tasks_running.lock() {
676									Ok(mut tasks_running) => {
677										tasks_running.insert(dep_id, dep_task_meta.clone());
678									}
679									Err(poisoned) => {
680										error!("Mutex poisoned: tasks_running (recovering)");
681										poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
682									}
683								}
684								schedule.spawn_task(
685									stat.clone(),
686									dep_task_meta.task.clone(),
687									dep_id,
688									dep_task_meta,
689								);
690							}
691						}
692						Err(e) => {
693							error!("Failed to release dependents of task {}: {}", id, e);
694						}
695					}
696				} else {
697					warn!("Completed task {} not found in running queue", id);
698				}
699			}
700		});
701
702		// Handle scheduled tasks
703		let schedule = self.clone();
704		tokio::spawn(async move {
705			loop {
706				let is_empty = match schedule.tasks_scheduled.lock() {
707					Ok(guard) => guard.is_empty(),
708					Err(poisoned) => {
709						error!("Mutex poisoned: tasks_scheduled (recovering)");
710						poisoned.into_inner().is_empty()
711					}
712				};
713				if is_empty {
714					schedule.notify_schedule.notified().await;
715				}
716				let time = Timestamp::now();
717				if let Some((timestamp, _id)) = loop {
718					let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
719						Ok(guard) => guard,
720						Err(poisoned) => {
721							error!("Mutex poisoned: tasks_scheduled (recovering)");
722							poisoned.into_inner()
723						}
724					};
725					if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
726						let (timestamp, id) = (timestamp, id);
727						if timestamp <= Timestamp::now() {
728							debug!("Spawning task id {} (from schedule)", id);
729							if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
730								let mut tasks_running = match schedule.tasks_running.lock() {
731									Ok(guard) => guard,
732									Err(poisoned) => {
733										error!("Mutex poisoned: tasks_running (recovering)");
734										poisoned.into_inner()
735									}
736								};
737								tasks_running.insert(id, task.clone());
738								schedule.spawn_task(state.clone(), task.task.clone(), id, task);
739							} else {
740								error!("Task disappeared while being removed from schedule");
741								break None;
742							}
743						} else {
744							break Some((timestamp, id));
745						}
746					} else {
747						break None;
748					}
749				} {
750					let diff = timestamp.0 - time.0;
751					let wait =
752						tokio::time::Duration::from_secs(u64::try_from(diff).unwrap_or_default());
753					tokio::select! {
754						() = tokio::time::sleep(wait) => (), () = schedule.notify_schedule.notified() => ()
755					};
756				}
757			}
758		});
759
760		let schedule = self.clone();
761		tokio::spawn(async move {
762			let _ignore_err = schedule.load().await;
763		});
764	}
765
766	fn register_builder(
767		&self,
768		name: &'static str,
769		builder: &'static TaskBuilder<S>,
770	) -> ClResult<&Self> {
771		let mut task_builders = self
772			.task_builders
773			.write()
774			.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
775		task_builders.insert(name, Box::new(builder));
776		Ok(self)
777	}
778
779	pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
780		info!("Registering task type {}", T::kind());
781		self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
782		Ok(self)
783	}
784
785	/// Create a builder for scheduling a task using the fluent API
786	pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
787		TaskSchedulerBuilder::new(self, task)
788	}
789
790	/// Internal method to schedule a task with all options
791	/// This is the core implementation used by the builder pattern
792	#[allow(clippy::too_many_arguments)]
793	async fn schedule_task_impl(
794		&self,
795		task: Arc<dyn Task<S>>,
796		key: Option<&str>,
797		next_at: Option<Timestamp>,
798		deps: Option<Vec<TaskId>>,
799		retry: Option<RetryPolicy>,
800		cron: Option<CronSchedule>,
801		run_on_startup: bool,
802	) -> ClResult<TaskId> {
803		// Look up any existing task by key once; reuse for both the
804		// run_on_startup decision and the dedup branch below.
805		let existing = if let Some(k) = key { self.store.find_by_key(k).await? } else { None };
806
807		// Resolve effective next_at, factoring in run_on_startup for cron tasks.
808		let effective_next_at = if run_on_startup && cron.is_some() {
809			match &existing {
810				Some((_existing_id, existing_data)) => {
811					// Task exists from a previous run. If its persisted
812					// next_at has already passed (or is missing), we missed
813					// a run while down — fire now. Otherwise honor the
814					// persisted future schedule.
815					match existing_data.next_at {
816						Some(persisted) if persisted > Timestamp::now() => next_at,
817						_ => Some(Timestamp::now()),
818					}
819				}
820				None => Some(Timestamp::now()), // fresh registration → run now
821			}
822		} else {
823			next_at
824		};
825
826		let task_meta = TaskMeta {
827			task: task.clone(),
828			next_at: effective_next_at,
829			deps: deps.clone().unwrap_or_default(),
830			retry_count: 0,
831			retry,
832			cron,
833		};
834
835		// Check if a task with this key already exists (key-based deduplication)
836		if let Some(key) = key
837			&& let Some((existing_id, existing_data)) = existing
838		{
839			let new_serialized = task.serialize();
840			let existing_serialized = existing_data.input.as_ref();
841
842			// Compare serialized parameters
843			if new_serialized == existing_serialized {
844				info!(
845					"Recurring task '{}' already exists with identical parameters (id={})",
846					key, existing_id
847				);
848				// Update DB with current cron/next_at (may differ from what's stored)
849				self.store.update_task(existing_id, &task_meta).await?;
850				// Ensure the existing task is queued (may be loaded from DB but not yet in queue)
851				self.add_queue(existing_id, task_meta).await?;
852				return Ok(existing_id);
853			}
854			info!("Updating recurring task '{}' (id={}) - parameters changed", key, existing_id);
855			debug!("  Old params: {}", existing_serialized);
856			debug!("  New params: {}", new_serialized);
857
858			// Remove from all queues (if present)
859			self.remove_from_queues(existing_id)?;
860
861			// Update the task in database with new parameters
862			self.store.update_task(existing_id, &task_meta).await?;
863
864			// Re-add to appropriate queue with updated parameters
865			self.add_queue(existing_id, task_meta).await?;
866
867			return Ok(existing_id);
868		}
869
870		// No existing task - create new one
871		let id = self.store.add(&task_meta, key).await?;
872		self.add_queue(id, task_meta).await
873	}
874
875	pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
876		self.task(task).now().await
877	}
878
879	pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
880		// If task is already running, update its metadata (especially for cron updates)
881		// but don't add to scheduled queue (it will reschedule on completion)
882		{
883			let mut running = lock!(self.tasks_running, "tasks_running")?;
884			if let Some(existing_meta) = running.get_mut(&id) {
885				debug!(
886					"Task {} is already running, updating metadata (will reschedule on completion)",
887					id
888				);
889				// Update the running task's metadata so it has the latest cron schedule
890				*existing_meta = task_meta;
891				return Ok(id);
892			}
893		}
894
895		// Remove from other queues if present (prevents duplicate entries with different timestamps)
896		{
897			let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
898			if let Some(key) = scheduled
899				.iter()
900				.find(|((_, tid), _)| *tid == id)
901				.map(|((ts, tid), _)| (*ts, *tid))
902			{
903				scheduled.remove(&key);
904				debug!("Removed existing scheduled entry for task {} before re-queueing", id);
905			}
906		}
907		{
908			let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
909			if waiting.remove(&id).is_some() {
910				debug!("Removed existing waiting entry for task {} before re-queueing", id);
911			}
912		}
913
914		let deps = task_meta.deps.clone();
915
916		// VALIDATION: Tasks with dependencies should NEVER be in tasks_scheduled
917		if !deps.is_empty() && task_meta.next_at.is_some() {
918			warn!(
919				"Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue",
920				id
921			);
922			// Force to tasks_waiting instead
923			lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
924			debug!("Task {} is waiting for {:?}", id, &deps);
925			for dep in &deps {
926				lock!(self.task_dependents, "task_dependents")?
927					.entry(*dep)
928					.or_default()
929					.push(id);
930			}
931
932			self.check_and_resolve_completed_deps(id, &deps).await?;
933			return Ok(id);
934		}
935
936		if deps.is_empty() && task_meta.next_at.unwrap_or(Timestamp(0)) < Timestamp::now() {
937			debug!("Spawning task {}", id);
938			lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), task_meta);
939			self.notify_schedule.notify_one();
940		} else if let Some(next_at) = task_meta.next_at {
941			debug!("Scheduling task {} for {}", id, next_at);
942			lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((next_at, id), task_meta);
943			self.notify_schedule.notify_one();
944		} else {
945			lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
946			debug!("Task {} is waiting for {:?}", id, &deps);
947			for dep in &deps {
948				lock!(self.task_dependents, "task_dependents")?
949					.entry(*dep)
950					.or_default()
951					.push(id);
952			}
953
954			self.check_and_resolve_completed_deps(id, &deps).await?;
955		}
956		Ok(id)
957	}
958
959	/// After registering deps, check if any completed in the meantime.
960	/// If all deps are satisfied, move the task from waiting → scheduled.
961	async fn check_and_resolve_completed_deps(&self, id: TaskId, deps: &[TaskId]) -> ClResult<()> {
962		let completed_deps = self.store.find_completed_deps(deps).await?;
963		if completed_deps.is_empty() {
964			return Ok(());
965		}
966		let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
967		if let Some(task_meta) = waiting.get_mut(&id) {
968			for dep in &completed_deps {
969				task_meta.deps.retain(|d| *d != *dep);
970			}
971			if task_meta.deps.is_empty()
972				&& let Some(ready_task) = waiting.remove(&id)
973			{
974				drop(waiting);
975				let mut dependents = lock!(self.task_dependents, "task_dependents")?;
976				for dep in deps {
977					if let Some(dep_list) = dependents.get_mut(dep) {
978						dep_list.retain(|d| *d != id);
979						if dep_list.is_empty() {
980							dependents.remove(dep);
981						}
982					}
983				}
984				drop(dependents);
985				debug!("Task {} deps already completed, scheduling immediately", id);
986				lock!(self.tasks_scheduled, "tasks_scheduled")?
987					.insert((Timestamp(0), id), ready_task);
988				self.notify_schedule.notify_one();
989			}
990		}
991		Ok(())
992	}
993
994	/// Remove a task from all internal queues (waiting, scheduled, running)
995	/// Returns the removed TaskMeta if found
996	fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
997		// Try tasks_waiting
998		if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
999			debug!("Removed task {} from waiting queue for update", task_id);
1000			return Ok(Some(task_meta));
1001		}
1002
1003		// Try tasks_scheduled (need to find by task_id in BTreeMap)
1004		{
1005			let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
1006			if let Some(key) = scheduled
1007				.iter()
1008				.find(|((_, id), _)| *id == task_id)
1009				.map(|((ts, id), _)| (*ts, *id))
1010				&& let Some(task_meta) = scheduled.remove(&key)
1011			{
1012				debug!("Removed task {} from scheduled queue for update", task_id);
1013				return Ok(Some(task_meta));
1014			}
1015		}
1016
1017		// Try tasks_running (should rarely happen, but handle it)
1018		if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
1019			warn!("Removed task {} from running queue during update", task_id);
1020			return Ok(Some(task_meta));
1021		}
1022
1023		Ok(None)
1024	}
1025
1026	/// Release all dependent tasks of a completed task
1027	/// This method safely handles dependency cleanup and spawning
1028	fn release_dependents(
1029		&self,
1030		completed_task_id: TaskId,
1031	) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
1032		// Get list of dependents (atomic removal to prevent re-processing)
1033		let dependents = {
1034			let mut deps_map = lock!(self.task_dependents, "task_dependents")?;
1035			deps_map.remove(&completed_task_id).unwrap_or_default()
1036		};
1037
1038		if dependents.is_empty() {
1039			return Ok(Vec::new()); // No dependents to release
1040		}
1041
1042		debug!("Releasing {} dependents of completed task {}", dependents.len(), completed_task_id);
1043
1044		let mut ready_to_spawn = Vec::new();
1045
1046		// For each dependent, check and remove dependency
1047		for dependent_id in dependents {
1048			// Try tasks_waiting first (most common case for dependent tasks)
1049			{
1050				let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1051				if let Some(task_meta) = waiting.get_mut(&dependent_id) {
1052					// Remove the completed task from dependencies
1053					task_meta.deps.retain(|x| *x != completed_task_id);
1054
1055					// If all dependencies are cleared, remove and queue for spawning
1056					if task_meta.deps.is_empty() {
1057						if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
1058							debug!(
1059								"Dependent task {} ready to spawn (all dependencies cleared)",
1060								dependent_id
1061							);
1062							ready_to_spawn.push((dependent_id, task_to_spawn));
1063						}
1064					} else {
1065						debug!(
1066							"Task {} still has {} remaining dependencies",
1067							dependent_id,
1068							task_meta.deps.len()
1069						);
1070					}
1071					continue;
1072				}
1073			}
1074
1075			// Try tasks_scheduled if not in waiting (shouldn't happen with validation, but be defensive)
1076			{
1077				let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
1078				if let Some(scheduled_key) = scheduled
1079					.iter()
1080					.find(|((_, id), _)| *id == dependent_id)
1081					.map(|((ts, id), _)| (*ts, *id))
1082				{
1083					if let Some(task_meta) = scheduled.get_mut(&scheduled_key) {
1084						task_meta.deps.retain(|x| *x != completed_task_id);
1085						let remaining = task_meta.deps.len();
1086						if remaining == 0 {
1087							debug!(
1088								"Task {} in scheduled queue has no remaining dependencies",
1089								dependent_id
1090							);
1091						} else {
1092							debug!(
1093								"Task {} in scheduled queue has {} remaining dependencies",
1094								dependent_id, remaining
1095							);
1096						}
1097					}
1098					continue;
1099				}
1100			}
1101
1102			// Task not found in any queue
1103			warn!(
1104				"Dependent task {} of completed task {} not found in any queue",
1105				dependent_id, completed_task_id
1106			);
1107		}
1108
1109		Ok(ready_to_spawn)
1110	}
1111
1112	async fn load(&self) -> ClResult<()> {
1113		let tasks = self.store.load().await?;
1114		debug!("Loaded {} tasks from store", tasks.len());
1115		for t in tasks {
1116			if let TaskStatus::Pending = t.status {
1117				debug!("Loading task {} {}", t.id, t.kind);
1118				let task = {
1119					let builder_map = self
1120						.task_builders
1121						.read()
1122						.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
1123					let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
1124						format!("task builder not registered: {}", t.kind),
1125					))?;
1126					builder(t.id, &t.input)?
1127				};
1128				let (retry_count, retry) = match t.retry_data {
1129					Some(retry_str) => {
1130						let (retry_count, retry_min, retry_max, retry_times) = retry_str
1131							.split(',')
1132							.collect_tuple()
1133							.ok_or(Error::Internal("invalid retry policy format".into()))?;
1134						let retry_count: u16 = retry_count
1135							.parse()
1136							.map_err(|_| Error::Internal("retry count must be u16".into()))?;
1137						let retry = RetryPolicy {
1138							wait_min_max: (
1139								retry_min
1140									.parse()
1141									.map_err(|_| Error::Internal("retry_min must be u64".into()))?,
1142								retry_max
1143									.parse()
1144									.map_err(|_| Error::Internal("retry_max must be u64".into()))?,
1145							),
1146							times: retry_times
1147								.parse()
1148								.map_err(|_| Error::Internal("retry times must be u64".into()))?,
1149						};
1150						debug!("Loaded retry policy: {:?}", retry);
1151						(retry_count, Some(retry))
1152					}
1153					_ => (0, None),
1154				};
1155				// Parse cron data if present
1156				let cron =
1157					t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
1158
1159				let task_meta = TaskMeta {
1160					task,
1161					next_at: t.next_at,
1162					deps: t.deps.into(),
1163					retry_count,
1164					retry,
1165					cron,
1166				};
1167				self.add_queue(t.id, task_meta).await?;
1168			}
1169		}
1170		Ok(())
1171	}
1172
1173	fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
1174		let tx_finish = self.tx_finish.clone();
1175		let store = self.store.clone();
1176		let scheduler = self.clone();
1177		//let state = self.state.clone();
1178		tokio::spawn(async move {
1179			match task.run(&state).await {
1180				Ok(()) => {
1181					debug!("Task {} completed successfully", id);
1182					tx_finish.send(id).unwrap_or(());
1183				}
1184				Err(e) => {
1185					let is_retryable = e.is_retryable();
1186					if let Some(retry_policy) = &task_meta.retry {
1187						if is_retryable && retry_policy.should_retry(task_meta.retry_count) {
1188							let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
1189							let next_at = Timestamp::from_now(backoff.cast_signed());
1190
1191							info!(
1192								"Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
1193								id,
1194								task_meta.retry_count + 1,
1195								retry_policy.times,
1196								backoff,
1197								e
1198							);
1199
1200							// Update database with error and reschedule
1201							store
1202								.update_task_error(id, &e.to_string(), Some(next_at))
1203								.await
1204								.unwrap_or(());
1205
1206							// Remove from running tasks (we're not sending finish event)
1207							match scheduler.tasks_running.lock() {
1208								Ok(mut tasks_running) => {
1209									tasks_running.remove(&id);
1210								}
1211								Err(poisoned) => {
1212									error!("Mutex poisoned: tasks_running (recovering)");
1213									poisoned.into_inner().remove(&id);
1214								}
1215							}
1216
1217							// Re-queue task with incremented retry count
1218							let mut retry_meta = task_meta.clone();
1219							retry_meta.retry_count += 1;
1220							retry_meta.next_at = Some(next_at);
1221							scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
1222						} else {
1223							// Max retries exhausted OR error is permanent
1224							if is_retryable {
1225								error!(
1226									"Task {} failed after {} retries: {}",
1227									id, task_meta.retry_count, e
1228								);
1229							} else {
1230								error!("Task {} failed permanently (non-retryable): {}", id, e);
1231							}
1232							store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1233							task.on_failed(&state, task_meta.retry_count, &e.to_string()).await;
1234							tx_finish.send(id).unwrap_or(());
1235						}
1236					} else {
1237						// No retry policy - fail immediately
1238						error!("Task {} failed: {}", id, e);
1239						store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1240						task.on_failed(&state, 0, &e.to_string()).await;
1241						tx_finish.send(id).unwrap_or(());
1242					}
1243				}
1244			}
1245		});
1246	}
1247
1248	/// Get health status of the scheduler
1249	/// Returns information about tasks in each queue and detects anomalies
1250	pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
1251		let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
1252		let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
1253		let running_count = lock!(self.tasks_running, "tasks_running")?.len();
1254		let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
1255
1256		// Check for anomalies
1257		let mut stuck_tasks = Vec::new();
1258		let mut tasks_with_missing_deps = Vec::new();
1259
1260		// Check tasks_waiting for tasks with no dependencies (stuck)
1261		{
1262			let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1263			let _deps_map = lock!(self.task_dependents, "task_dependents")?;
1264
1265			for (id, task_meta) in waiting.iter() {
1266				if task_meta.deps.is_empty() {
1267					stuck_tasks.push(*id);
1268					warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
1269				} else {
1270					// Check if all dependencies still exist
1271					for dep in &task_meta.deps {
1272						let dep_exists = waiting.contains_key(dep)
1273							|| self.tasks_running.lock().ok().is_some_and(|r| r.contains_key(dep))
1274							|| self
1275								.tasks_scheduled
1276								.lock()
1277								.ok()
1278								.is_some_and(|s| s.iter().any(|((_, task_id), _)| task_id == dep));
1279
1280						if !dep_exists {
1281							tasks_with_missing_deps.push((*id, *dep));
1282							warn!(
1283								"SCHEDULER HEALTH: Task {} depends on non-existent task {}",
1284								id, dep
1285							);
1286						}
1287					}
1288				}
1289			}
1290		}
1291
1292		Ok(SchedulerHealth {
1293			waiting: waiting_count,
1294			scheduled: scheduled_count,
1295			running: running_count,
1296			dependents: dependents_count,
1297			stuck_tasks,
1298			tasks_with_missing_deps,
1299		})
1300	}
1301}
1302
1303/// Health status of the scheduler
1304#[derive(Debug, Clone)]
1305pub struct SchedulerHealth {
1306	/// Number of tasks waiting for dependencies
1307	pub waiting: usize,
1308	/// Number of tasks scheduled for future execution
1309	pub scheduled: usize,
1310	/// Number of tasks currently running
1311	pub running: usize,
1312	/// Number of task entries in dependents map
1313	pub dependents: usize,
1314	/// IDs of tasks with no dependencies but still in waiting queue
1315	pub stuck_tasks: Vec<TaskId>,
1316	/// Pairs of (task_id, missing_dependency_id) where dependency doesn't exist
1317	pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
1318}
1319
1320#[cfg(test)]
1321mod tests {
1322	use super::*;
1323	use serde::{Deserialize, Serialize};
1324
1325	type State = Arc<Mutex<Vec<u8>>>;
1326
1327	#[derive(Debug, Serialize, Deserialize)]
1328	struct TestTask {
1329		num: u8,
1330	}
1331
1332	impl TestTask {
1333		pub fn new(num: u8) -> Arc<Self> {
1334			Arc::new(Self { num })
1335		}
1336	}
1337
1338	#[async_trait]
1339	impl Task<State> for TestTask {
1340		fn kind() -> &'static str {
1341			"test"
1342		}
1343
1344		fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1345			let num: u8 = ctx
1346				.parse()
1347				.map_err(|_| Error::Internal("test task context must be u8".into()))?;
1348			let task = TestTask::new(num);
1349			Ok(task)
1350		}
1351
1352		fn serialize(&self) -> String {
1353			self.num.to_string()
1354		}
1355
1356		fn kind_of(&self) -> &'static str {
1357			"test"
1358		}
1359
1360		async fn run(&self, state: &State) -> ClResult<()> {
1361			info!("Running task {}", self.num);
1362			tokio::time::sleep(std::time::Duration::from_millis(200 * u64::from(self.num))).await;
1363			info!("Completed task {}", self.num);
1364			state.lock().unwrap().push(self.num);
1365			Ok(())
1366		}
1367	}
1368
1369	#[derive(Debug, Clone)]
1370	struct FailingTask {
1371		id: u8,
1372		fail_count: u8,
1373		attempt: Arc<Mutex<u8>>,
1374	}
1375
1376	impl FailingTask {
1377		pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
1378			Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
1379		}
1380	}
1381
1382	#[async_trait]
1383	impl Task<State> for FailingTask {
1384		fn kind() -> &'static str {
1385			"failing"
1386		}
1387
1388		fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1389			let parts: Vec<&str> = ctx.split(',').collect();
1390			if parts.len() != 2 {
1391				return Err(Error::Internal("failing task context must have 2 parts".into()));
1392			}
1393			let id: u8 = parts[0]
1394				.parse()
1395				.map_err(|_| Error::Internal("failing task id must be u8".into()))?;
1396			let fail_count: u8 = parts[1]
1397				.parse()
1398				.map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
1399			Ok(FailingTask::new(id, fail_count))
1400		}
1401
1402		fn serialize(&self) -> String {
1403			format!("{},{}", self.id, self.fail_count)
1404		}
1405
1406		fn kind_of(&self) -> &'static str {
1407			"failing"
1408		}
1409
1410		async fn run(&self, state: &State) -> ClResult<()> {
1411			let mut attempt = self.attempt.lock().unwrap();
1412			*attempt += 1;
1413			let current_attempt = *attempt;
1414
1415			info!("FailingTask {} - attempt {}/{}", self.id, current_attempt, self.fail_count + 1);
1416
1417			if current_attempt <= self.fail_count {
1418				error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
1419				return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
1420			}
1421
1422			info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
1423			state.lock().unwrap().push(self.id);
1424			Ok(())
1425		}
1426	}
1427
1428	#[tokio::test]
1429	pub async fn test_scheduler() {
1430		let _ = tracing_subscriber::fmt().try_init();
1431
1432		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1433		let state: State = Arc::new(Mutex::new(Vec::new()));
1434		let scheduler = Scheduler::new(task_store);
1435		scheduler.start(state.clone());
1436		scheduler.register::<TestTask>().unwrap();
1437
1438		let _task1 = TestTask::new(1);
1439		let task2 = TestTask::new(1);
1440		let task3 = TestTask::new(1);
1441
1442		let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
1443		let task3_id = scheduler.add(task3).await.unwrap();
1444		scheduler
1445			.task(TestTask::new(1))
1446			.depend_on(vec![task2_id, task3_id])
1447			.schedule()
1448			.await
1449			.unwrap();
1450
1451		tokio::time::sleep(std::time::Duration::from_secs(4)).await;
1452		let task4 = TestTask::new(1);
1453		let task5 = TestTask::new(1);
1454		scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
1455		scheduler.task(task5).schedule_after(1).schedule().await.unwrap();
1456
1457		tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1458
1459		let st = state.lock().unwrap();
1460		info!("res: {}", st.len());
1461		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1462		assert_eq!(str_vec.join(":"), "1:1:1:1:1");
1463	}
1464
1465	#[tokio::test]
1466	pub async fn test_retry_with_backoff() {
1467		let _ = tracing_subscriber::fmt().try_init();
1468
1469		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1470		let state: State = Arc::new(Mutex::new(Vec::new()));
1471		let scheduler = Scheduler::new(task_store);
1472		scheduler.start(state.clone());
1473		scheduler.register::<FailingTask>().unwrap();
1474
1475		// Create a task that fails twice, then succeeds
1476		// With retry policy: min=1s, max=3600s, max_attempts=3
1477		let failing_task = FailingTask::new(42, 2);
1478		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1479
1480		scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1481
1482		// Wait for retries: 1s (1st fail) + 1s (2nd fail) + time for success
1483		// First attempt: immediate fail
1484		// Wait 1s (min backoff)
1485		// Second attempt: fail
1486		// Wait 2s (min * 2)
1487		// Third attempt: success
1488		tokio::time::sleep(std::time::Duration::from_secs(6)).await;
1489
1490		let st = state.lock().unwrap();
1491		assert_eq!(st.len(), 1, "Task should have succeeded after retries");
1492		assert_eq!(st[0], 42);
1493	}
1494
1495	// ===== Builder Pattern Tests =====
1496
1497	#[tokio::test]
1498	pub async fn test_builder_simple_schedule() {
1499		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1500		let state: State = Arc::new(Mutex::new(Vec::new()));
1501		let scheduler = Scheduler::new(task_store);
1502		scheduler.start(state.clone());
1503		scheduler.register::<TestTask>().unwrap();
1504
1505		// Test basic builder usage: .now()
1506		let task = TestTask::new(1);
1507		let id = scheduler.task(task).now().await.unwrap();
1508
1509		assert!(id > 0, "Task ID should be positive");
1510
1511		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1512
1513		let st = state.lock().unwrap();
1514		assert_eq!(st.len(), 1, "Task should have executed");
1515		assert_eq!(st[0], 1);
1516	}
1517
1518	#[tokio::test]
1519	pub async fn test_builder_with_key() {
1520		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1521		let state: State = Arc::new(Mutex::new(Vec::new()));
1522		let scheduler = Scheduler::new(task_store);
1523		scheduler.start(state.clone());
1524		scheduler.register::<TestTask>().unwrap();
1525
1526		// Test builder with key
1527		let task = TestTask::new(1);
1528		let _id = scheduler.task(task).key("my-task-key").now().await.unwrap();
1529
1530		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1531
1532		let st = state.lock().unwrap();
1533		assert_eq!(st.len(), 1);
1534		assert_eq!(st[0], 1);
1535	}
1536
1537	#[tokio::test]
1538	pub async fn test_builder_with_delay() {
1539		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1540		let state: State = Arc::new(Mutex::new(Vec::new()));
1541		let scheduler = Scheduler::new(task_store);
1542		scheduler.start(state.clone());
1543		scheduler.register::<TestTask>().unwrap();
1544
1545		// Test builder with .after() convenience method
1546		let task = TestTask::new(1);
1547		let _id = scheduler
1548			.task(task)
1549			.after(1)  // 1 second delay
1550			.await
1551			.unwrap();
1552
1553		// Should not have executed yet
1554		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1555		{
1556			let st = state.lock().unwrap();
1557			assert_eq!(st.len(), 0, "Task should not execute yet");
1558		}
1559
1560		// Wait for execution (1 sec delay + 200ms task sleep + buffer)
1561		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1562
1563		{
1564			let st = state.lock().unwrap();
1565			assert_eq!(st.len(), 1, "Task should have executed");
1566			assert_eq!(st[0], 1);
1567		}
1568	}
1569
1570	#[tokio::test]
1571	pub async fn test_builder_with_dependencies() {
1572		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1573		let state: State = Arc::new(Mutex::new(Vec::new()));
1574		let scheduler = Scheduler::new(task_store);
1575		scheduler.start(state.clone());
1576		scheduler.register::<TestTask>().unwrap();
1577
1578		// Create first task (sleeps 200ms)
1579		let task1 = TestTask::new(1);
1580		let id1 = scheduler.task(task1).now().await.unwrap();
1581
1582		// Create second task (sleeps 400ms)
1583		let task2 = TestTask::new(1);
1584		let id2 = scheduler.task(task2).now().await.unwrap();
1585
1586		// Create third task that depends on first two (sleeps 600ms)
1587		let task3 = TestTask::new(1);
1588		let _id3 = scheduler.task(task3).depend_on(vec![id1, id2]).schedule().await.unwrap();
1589
1590		// Wait for all tasks: task1 200ms, task2 400ms, task3 600ms = ~1200ms
1591		tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
1592
1593		let st = state.lock().unwrap();
1594		// Should have all three tasks in execution order: 1 finishes first (200ms), then 2 (200ms), then 3 (200ms after both)
1595		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1596		assert_eq!(str_vec.join(":"), "1:1:1");
1597	}
1598
1599	#[tokio::test]
1600	pub async fn test_builder_with_retry() {
1601		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1602		let state: State = Arc::new(Mutex::new(Vec::new()));
1603		let scheduler = Scheduler::new(task_store);
1604		scheduler.start(state.clone());
1605		scheduler.register::<FailingTask>().unwrap();
1606
1607		// Create task using builder with retry policy
1608		let failing_task = FailingTask::new(55, 1); // Fails once, succeeds second time
1609		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1610
1611		let _id = scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1612
1613		// Wait for retry cycle: 1 fail + 1s wait + 1 success
1614		tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1615
1616		let st = state.lock().unwrap();
1617		assert_eq!(st.len(), 1);
1618		assert_eq!(st[0], 55);
1619	}
1620
1621	#[tokio::test]
1622	pub async fn test_builder_with_automatic_retry() {
1623		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1624		let state: State = Arc::new(Mutex::new(Vec::new()));
1625		let scheduler = Scheduler::new(task_store);
1626		scheduler.start(state.clone());
1627		scheduler.register::<FailingTask>().unwrap();
1628
1629		// Create task using builder with automatic retry (default policy)
1630		let failing_task = FailingTask::new(66, 1);
1631		let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();
1632
1633		// Wait for retry cycle with default policy (min=60s would be too long for test)
1634		// but we already tested retry logic thoroughly, just verify builder integration
1635		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1636
1637		// The important part is that this compiles and integrates correctly
1638		let st = state.lock().unwrap();
1639		// With default policy (min=60s), task shouldn't succeed in test timeframe
1640		// Just verify builder chaining works
1641		let _ = st.len(); // Verify state is accessible, but don't assert on timeout-dependent result
1642	}
1643
1644	#[tokio::test]
1645	pub async fn test_builder_fluent_chaining() {
1646		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1647		let state: State = Arc::new(Mutex::new(Vec::new()));
1648		let scheduler = Scheduler::new(task_store);
1649		scheduler.start(state.clone());
1650		scheduler.register::<TestTask>().unwrap();
1651
1652		// Create first dependencies
1653		let dep1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1654		let dep2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1655
1656		// Test fluent chaining with multiple methods
1657		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1658
1659		let task = TestTask::new(1);
1660		let _id = scheduler
1661			.task(task)
1662			.key("complex-task")
1663			.schedule_after(0)  // Schedule immediately
1664			.depend_on(vec![dep1, dep2])
1665			.with_retry(retry_policy)
1666			.schedule()
1667			.await
1668			.unwrap();
1669
1670		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1671
1672		let st = state.lock().unwrap();
1673		// Should have all tasks: 20:10 (immediate deps) then 30 (after deps)
1674		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1675		assert_eq!(str_vec.join(":"), "1:1:1");
1676	}
1677
1678	#[tokio::test]
1679	pub async fn test_builder_backward_compatibility() {
1680		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1681		let state: State = Arc::new(Mutex::new(Vec::new()));
1682		let scheduler = Scheduler::new(task_store);
1683		scheduler.start(state.clone());
1684		scheduler.register::<TestTask>().unwrap();
1685
1686		// Test that old API still works
1687		let _id1 = scheduler.add(TestTask::new(1)).await.unwrap();
1688
1689		// Test that new builder API works
1690		let _id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1691
1692		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1693
1694		let st = state.lock().unwrap();
1695		// Both old and new API should have executed
1696		assert_eq!(st.len(), 2);
1697		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1698		assert_eq!(str_vec.join(":"), "1:1");
1699	}
1700
1701	// ===== Phase 2: Integration Tests - Real-world scenarios =====
1702
1703	#[tokio::test]
1704	pub async fn test_builder_pipeline_scenario() {
1705		// Simulates: Task 1 -> Task 2 (depends on 1) -> Task 3 (depends on 2)
1706		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1707		let state: State = Arc::new(Mutex::new(Vec::new()));
1708		let scheduler = Scheduler::new(task_store);
1709		scheduler.start(state.clone());
1710		scheduler.register::<TestTask>().unwrap();
1711
1712		// Stage 1: Create initial task
1713		let id1 = scheduler.task(TestTask::new(1)).key("stage-1").now().await.unwrap();
1714
1715		// Stage 2: Create task that depends on stage 1
1716		let id2 = scheduler.task(TestTask::new(1)).key("stage-2").after_task(id1).await.unwrap();
1717
1718		// Stage 3: Create task that depends on stage 2
1719		let _id3 = scheduler.task(TestTask::new(1)).key("stage-3").after_task(id2).await.unwrap();
1720
1721		// Wait for pipeline: 1(200ms) + 2(200ms) + 3(200ms) = 600ms
1722		tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1723
1724		let st = state.lock().unwrap();
1725		// Should execute in order: 1, 2, 3
1726		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1727		assert_eq!(str_vec.join(":"), "1:1:1");
1728	}
1729
1730	#[tokio::test]
1731	pub async fn test_builder_multi_dependency_join() {
1732		// Simulates: Task 1 parallel with Task 2, then Task 3 waits for both
1733		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1734		let state: State = Arc::new(Mutex::new(Vec::new()));
1735		let scheduler = Scheduler::new(task_store);
1736		scheduler.start(state.clone());
1737		scheduler.register::<TestTask>().unwrap();
1738
1739		// Parallel tasks
1740		let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1741		let id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1742
1743		// Join task - waits for both
1744		let _id3 = scheduler
1745			.task(TestTask::new(1))
1746			.depend_on(vec![id1, id2])
1747			.schedule()
1748			.await
1749			.unwrap();
1750
1751		tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1752
1753		let st = state.lock().unwrap();
1754		// 1 and 2 execute in parallel, then 3 executes after both
1755		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1756		assert_eq!(str_vec.join(":"), "1:1:1");
1757	}
1758
1759	#[tokio::test]
1760	pub async fn test_builder_scheduled_task_with_dependencies() {
1761		// Simulates: Task depends on earlier task AND is scheduled for future time
1762		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1763		let state: State = Arc::new(Mutex::new(Vec::new()));
1764		let scheduler = Scheduler::new(task_store);
1765		scheduler.start(state.clone());
1766		scheduler.register::<TestTask>().unwrap();
1767
1768		// Immediate task
1769		let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();
1770
1771		// Task that waits for dependency AND scheduled delay
1772		let ts = Timestamp::from_now(1);
1773		let _task_id = scheduler
1774			.task(TestTask::new(1))
1775			.schedule_at(ts)
1776			.depend_on(vec![dep_id])
1777			.schedule()
1778			.await
1779			.unwrap();
1780
1781		// Wait for dependency to complete but before scheduled time
1782		tokio::time::sleep(std::time::Duration::from_millis(300)).await;
1783		{
1784			let st = state.lock().unwrap();
1785			assert_eq!(st.len(), 1); // Only dependency executed
1786		}
1787
1788		// Wait for scheduled time (1s total from initial schedule)
1789		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1790
1791		{
1792			let st = state.lock().unwrap();
1793			let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1794			assert_eq!(str_vec.join(":"), "1:1");
1795		}
1796	}
1797
1798	#[tokio::test]
1799	pub async fn test_builder_mixed_features() {
1800		// Simulates: Complex real-world scenario with key, scheduling, deps, and retry
1801		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1802		let state: State = Arc::new(Mutex::new(Vec::new()));
1803		let scheduler = Scheduler::new(task_store);
1804		scheduler.start(state.clone());
1805		scheduler.register::<TestTask>().unwrap();
1806		scheduler.register::<FailingTask>().unwrap();
1807
1808		// Create initial tasks
1809		let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1810
1811		// Create complex task: scheduled + depends on id1 + has key
1812		let _id2 = scheduler
1813			.task(TestTask::new(1))
1814			.key("critical-task")
1815			.schedule_after(0)
1816			.depend_on(vec![id1])
1817			.schedule()
1818			.await
1819			.unwrap();
1820
1821		// Create task with retry
1822		let _id3 = scheduler
1823			.task(FailingTask::new(1, 0))  // Fails 0 times, succeeds immediately
1824			.key("retryable-task")
1825			.with_retry(RetryPolicy {
1826				wait_min_max: (1, 3600),
1827				times: 3,
1828			})
1829			.schedule()
1830			.await
1831			.unwrap();
1832
1833		// Wait for tasks: id1 (200ms) + id2 (200ms after id1) + id3 (200ms) = ~600ms
1834		tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1835
1836		let st = state.lock().unwrap();
1837		// All three tasks should execute
1838		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1839		assert_eq!(str_vec.join(":"), "1:1:1");
1840	}
1841
1842	#[tokio::test]
1843	pub async fn test_builder_builder_reuse_not_possible() {
1844		// Verify that builder is consumed (moved) and can't be reused
1845		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1846		let _state: State = Arc::new(Mutex::new(Vec::new()));
1847		let scheduler = Scheduler::new(task_store);
1848
1849		let task = TestTask::new(1);
1850		let builder = scheduler.task(task);
1851
1852		// This would not compile if uncommented (builder is moved):
1853		// let _id1 = builder.now().await.unwrap();
1854		// let _id2 = builder.now().await.unwrap();  // Error: use of moved value
1855
1856		// Can only call terminal method once
1857		let _id = builder.now().await.unwrap();
1858		// builder is now consumed, can't use again
1859
1860		// Test passes if it compiles (verifying move semantics)
1861	}
1862
1863	#[tokio::test]
1864	pub async fn test_builder_different_task_types() {
1865		// Test builder works with different task implementations
1866		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1867		let state: State = Arc::new(Mutex::new(Vec::new()));
1868		let scheduler = Scheduler::new(task_store);
1869		scheduler.start(state.clone());
1870		scheduler.register::<TestTask>().unwrap();
1871		scheduler.register::<FailingTask>().unwrap();
1872
1873		// Mix of different task types
1874		let _id1 = scheduler.task(TestTask::new(1)).key("test-task").now().await.unwrap();
1875
1876		let _id2 = scheduler
1877			.task(FailingTask::new(1, 0))  // Won't fail
1878			.key("failing-task")
1879			.now()
1880			.await
1881			.unwrap();
1882
1883		let _id3 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1884
1885		tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1886
1887		let st = state.lock().unwrap();
1888		assert_eq!(st.len(), 3);
1889		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1890		// All three tasks should execute
1891		assert_eq!(str_vec.join(":"), "1:1:1");
1892	}
1893
1894	// ===== Phase 3: Cron Placeholder Tests =====
1895	// These tests verify that cron methods compile and integrate
1896	// Actual cron functionality will be implemented in Phase 3
1897
1898	#[tokio::test]
1899	pub async fn test_builder_cron_placeholder_syntax() {
1900		// Verify cron placeholder methods compile and chain properly
1901		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1902		let state: State = Arc::new(Mutex::new(Vec::new()));
1903		let scheduler = Scheduler::new(task_store);
1904		scheduler.start(state.clone());
1905		scheduler.register::<TestTask>().unwrap();
1906
1907		// Test that cron methods compile (they're no-ops in Phase 2)
1908		let task = TestTask::new(1);
1909		let _id = scheduler
1910			.task(task)
1911			.key("cron-task")
1912			.cron("0 9 * * *")  // 9 AM daily
1913			.schedule()
1914			.await
1915			.unwrap();
1916
1917		// Cron scheduling - task will execute at the next scheduled time
1918		// For cron "0 9 * * *", that's tomorrow at 9 AM, so task won't execute in this test
1919		// This test just verifies the methods compile and chain properly
1920		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1921
1922		let st = state.lock().unwrap();
1923		// Task is scheduled for future (9 AM), so it won't have executed yet
1924		// The important thing is that the cron methods compile and integrate
1925		assert_eq!(st.len(), 0); // Not executed yet since scheduled for future
1926	}
1927
1928	#[tokio::test]
1929	pub async fn test_builder_daily_at_placeholder() {
1930		// Verify daily_at placeholder compiles and integrates
1931		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1932		let state: State = Arc::new(Mutex::new(Vec::new()));
1933		let scheduler = Scheduler::new(task_store);
1934		scheduler.start(state.clone());
1935		scheduler.register::<TestTask>().unwrap();
1936
1937		// Test that daily_at placeholder compiles
1938		let task = TestTask::new(1);
1939		let _id = scheduler
1940			.task(task)
1941			.key("daily-task")
1942			.daily_at(14, 30)  // 2:30 PM daily
1943			.schedule()
1944			.await
1945			.unwrap();
1946
1947		// Daily_at scheduling - task will execute at the specified time (2:30 PM daily)
1948		// Task is scheduled for future, so it won't execute in this test
1949		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1950
1951		let st = state.lock().unwrap();
1952		// Task is scheduled for future (2:30 PM), not executed yet
1953		// The important thing is that daily_at compiles and integrates properly
1954		assert_eq!(st.len(), 0);
1955	}
1956
1957	#[tokio::test]
1958	pub async fn test_builder_weekly_at_placeholder() {
1959		// Verify weekly_at placeholder compiles and integrates
1960		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1961		let state: State = Arc::new(Mutex::new(Vec::new()));
1962		let scheduler = Scheduler::new(task_store);
1963		scheduler.start(state.clone());
1964		scheduler.register::<TestTask>().unwrap();
1965
1966		// Test that weekly_at placeholder compiles
1967		let task = TestTask::new(1);
1968		let _id = scheduler
1969			.task(task)
1970			.key("weekly-task")
1971			.weekly_at(1, 9, 0)  // Monday at 9 AM
1972			.schedule()
1973			.await
1974			.unwrap();
1975
1976		// Weekly_at scheduling - task will execute on Monday at 9 AM
1977		// Task is scheduled for future, so it won't execute in this test
1978		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1979
1980		let st = state.lock().unwrap();
1981		// Task is scheduled for future (Monday 9 AM), not executed yet
1982		// The important thing is that weekly_at compiles and integrates properly
1983		assert_eq!(st.len(), 0);
1984	}
1985
1986	#[tokio::test]
1987	pub async fn test_builder_cron_with_retry() {
1988		// Verify cron methods chain with retry (future combined usage)
1989		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1990		let state: State = Arc::new(Mutex::new(Vec::new()));
1991		let scheduler = Scheduler::new(task_store);
1992		scheduler.start(state.clone());
1993		scheduler.register::<TestTask>().unwrap();
1994
1995		// Test future usage pattern: cron + retry
1996		let task = TestTask::new(1);
1997		let _id = scheduler
1998			.task(task)
1999			.key("reliable-scheduled-task")
2000			.daily_at(2, 0)  // 2 AM daily
2001			.with_retry(RetryPolicy {
2002				wait_min_max: (60, 3600),
2003				times: 5,
2004			})
2005			.schedule()
2006			.await
2007			.unwrap();
2008
2009		// Verify cron+retry chain compiles properly
2010		// Task is scheduled for 2 AM, so won't execute in this test
2011		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2012
2013		let st = state.lock().unwrap();
2014		// Task scheduled for future (2 AM), not executed yet
2015		// The important thing is that chaining cron + retry works
2016		assert_eq!(st.len(), 0);
2017	}
2018
2019	// ===== Cron Schedule Tests =====
2020
2021	#[test]
2022	fn test_cron_to_string() {
2023		// Test that to_cron_string returns the original expression
2024		let cron = CronSchedule::parse("*/5 * * * *").unwrap();
2025		assert_eq!(cron.to_cron_string(), "*/5 * * * *");
2026	}
2027
2028	#[tokio::test]
2029	pub async fn test_running_task_not_double_scheduled() {
2030		let _ = tracing_subscriber::fmt().try_init();
2031
2032		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
2033		let state: State = Arc::new(Mutex::new(Vec::new()));
2034		let scheduler = Scheduler::new(task_store);
2035		scheduler.start(state.clone());
2036		scheduler.register::<TestTask>().unwrap();
2037
2038		// Create a task
2039		let task = TestTask::new(5); // Takes 1 second (5 * 200ms)
2040		let task_id = scheduler.add(task.clone()).await.unwrap();
2041
2042		// Wait a bit for task to start running
2043		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2044
2045		// Verify task is in tasks_running
2046		{
2047			let running = scheduler.tasks_running.lock().unwrap();
2048			assert!(running.contains_key(&task_id), "Task should be in running queue");
2049		}
2050
2051		// Try to add the same task again via add_queue
2052		let task_meta = TaskMeta {
2053			task: task.clone(),
2054			next_at: Some(Timestamp::now()),
2055			deps: vec![],
2056			retry_count: 0,
2057			retry: None,
2058			cron: None,
2059		};
2060		let result = scheduler.add_queue(task_id, task_meta).await;
2061
2062		// Should succeed but not actually add to scheduled queue
2063		assert!(result.is_ok(), "add_queue should succeed");
2064
2065		// Verify task is NOT in tasks_scheduled (only in running)
2066		{
2067			let sched_queue = scheduler.tasks_scheduled.lock().unwrap();
2068			let in_scheduled = sched_queue.iter().any(|((_, id), _)| *id == task_id);
2069			assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
2070		}
2071
2072		// Wait for original task to complete
2073		tokio::time::sleep(std::time::Duration::from_secs(2)).await;
2074
2075		// Verify task completed
2076		let st = state.lock().unwrap();
2077		assert_eq!(st.len(), 1, "Only one task execution should have occurred");
2078		assert_eq!(st[0], 5);
2079	}
2080
2081	#[tokio::test]
2082	pub async fn test_running_task_metadata_updated() {
2083		let _ = tracing_subscriber::fmt().try_init();
2084
2085		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
2086		let state: State = Arc::new(Mutex::new(Vec::new()));
2087		let scheduler = Scheduler::new(task_store);
2088		scheduler.start(state.clone());
2089		scheduler.register::<TestTask>().unwrap();
2090
2091		// Create a task without cron
2092		let task = TestTask::new(5); // Takes 1 second (5 * 200ms)
2093		let task_id = scheduler.add(task.clone()).await.unwrap();
2094
2095		// Wait a bit for task to start running
2096		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2097
2098		// Verify task is running and has no cron
2099		{
2100			let running = scheduler.tasks_running.lock().unwrap();
2101			let meta = running.get(&task_id).expect("Task should be running");
2102			assert!(meta.cron.is_none(), "Task should have no cron initially");
2103		}
2104
2105		// Try to update the running task with a cron schedule
2106		let cron = CronSchedule::parse("*/5 * * * *").unwrap();
2107		let task_meta_with_cron = TaskMeta {
2108			task: task.clone(),
2109			next_at: Some(Timestamp::now()),
2110			deps: vec![],
2111			retry_count: 0,
2112			retry: None,
2113			cron: Some(cron.clone()),
2114		};
2115		let result = scheduler.add_queue(task_id, task_meta_with_cron).await;
2116
2117		// Should succeed
2118		assert!(result.is_ok(), "add_queue should succeed");
2119
2120		// Verify the running task now has the cron schedule
2121		{
2122			let running = scheduler.tasks_running.lock().unwrap();
2123			let meta = running.get(&task_id).expect("Task should still be running");
2124			assert!(meta.cron.is_some(), "Task should now have cron after update");
2125		}
2126
2127		// Wait for task to complete
2128		tokio::time::sleep(std::time::Duration::from_secs(2)).await;
2129	}
2130}
2131
2132// vim: ts=4