Skip to main content

cloudillo_core/
scheduler.rs

//! Scheduler subsystem. Handles async tasks, dependencies, fallbacks, repetitions, persistence.
2
3use async_trait::async_trait;
4use itertools::Itertools;
5use std::{
6	collections::{BTreeMap, HashMap},
7	fmt::Debug,
8	sync::{Arc, Mutex, RwLock},
9};
10
11use chrono::{DateTime, Utc};
12use croner::Cron;
13use std::str::FromStr;
14
15use crate::prelude::*;
16use cloudillo_types::{lock, meta_adapter};
17
18pub type TaskId = u64;
19
/// Distinguishes recurring tasks from one-shot tasks.
pub enum TaskType {
	/// Task repeats on a schedule.
	Periodic,
	/// Task runs a single time.
	Once,
}
24
/// Cron schedule wrapper using the croner crate
/// Stores the expression string for serialization
///
/// Note: equality is defined on the expression string only (see the
/// `PartialEq` impl below); the parsed `Cron` is derived from it.
#[derive(Debug, Clone)]
pub struct CronSchedule {
	/// The original cron expression string
	expr: Box<str>,
	/// Parsed cron object
	cron: Cron,
}
34
35impl CronSchedule {
36	/// Parse a cron expression (5 fields: minute hour day month weekday)
37	pub fn parse(expr: &str) -> ClResult<Self> {
38		let cron = Cron::from_str(expr)
39			.map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
40		Ok(Self { expr: expr.into(), cron })
41	}
42
43	/// Calculate the next execution time after the given timestamp
44	///
45	/// Returns an error if no next occurrence can be found (should be rare
46	/// for valid expressions within reasonable time bounds).
47	pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
48		let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
49
50		self.cron
51			.find_next_occurrence(&dt, false)
52			.map(|next| Timestamp(next.timestamp()))
53			.map_err(|e| {
54				tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
55				Error::ValidationError(format!("cron next_execution failed: {}", e))
56			})
57	}
58
59	/// Convert back to cron expression string
60	pub fn to_cron_string(&self) -> String {
61		self.expr.to_string()
62	}
63}
64
// Equality is based solely on the source expression string; the parsed
// `cron` field is derived from `expr` and is intentionally not compared.
impl PartialEq for CronSchedule {
	fn eq(&self, other: &Self) -> bool {
		self.expr == other.expr
	}
}

impl Eq for CronSchedule {}
72
/// A unit of work the scheduler can run, persist and later rebuild.
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
	/// Static identifier for this task type (used as the registry key).
	fn kind() -> &'static str
	where
		Self: Sized;
	/// Reconstruct a task instance from its id and persisted context string.
	fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
	where
		Self: Sized;
	/// Serialize the task's parameters for persistence (input to `build`).
	fn serialize(&self) -> String;
	/// Execute the task against the shared application state.
	async fn run(&self, state: &S) -> ClResult<()>;

	/// Object-safe accessor for this task's kind string.
	fn kind_of(&self) -> &'static str;
}
86
/// Persisted lifecycle state of a task.
#[derive(Debug)]
pub enum TaskStatus {
	/// Not yet run (or scheduled to run again); stored as 'P'.
	Pending,
	/// Finished successfully; stored as 'F'.
	Completed,
	/// Finished with an error; stored as 'E' (or any unknown status char).
	Failed,
}
93
/// Snapshot of a persisted task as loaded from a `TaskStore`.
pub struct TaskData {
	// Unique task id.
	id: TaskId,
	// Task type name, used to look up the registered builder.
	kind: Box<str>,
	// Current lifecycle status.
	status: TaskStatus,
	// Serialized task parameters (produced by `Task::serialize`).
	input: Box<str>,
	// Ids of tasks this task depends on.
	deps: Box<[TaskId]>,
	// Serialized retry state: "count,min,max,times" (see `RetryPolicy`).
	retry_data: Option<Box<str>>,
	// Serialized cron expression for recurring tasks.
	cron_data: Option<Box<str>>,
	// Next scheduled execution time, if any.
	next_at: Option<Timestamp>,
}
104
/// Persistence backend used by the scheduler.
#[async_trait]
pub trait TaskStore<S: Clone>: Send + Sync {
	/// Persist a new task (optionally keyed for deduplication); returns its id.
	async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;
	/// Mark a task as finished, storing its output.
	async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;
	/// Load all persisted tasks (used by the scheduler at startup).
	async fn load(&self) -> ClResult<Vec<TaskData>>;
	/// Record a task failure and, optionally, the next retry time.
	async fn update_task_error(
		&self,
		task_id: TaskId,
		output: &str,
		next_at: Option<Timestamp>,
	) -> ClResult<()>;
	/// Look up a task by its deduplication key.
	async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;
	/// Update a persisted task's parameters and scheduling state.
	async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;
}
119
120// InMemoryTaskStore
121//*******************
122pub struct InMemoryTaskStore {
123	last_id: Mutex<TaskId>,
124}
125
126impl InMemoryTaskStore {
127	pub fn new() -> Arc<Self> {
128		Arc::new(Self { last_id: Mutex::new(0) })
129	}
130}
131
132#[async_trait]
133impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
134	async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
135		let mut last_id = lock!(self.last_id)?;
136		*last_id += 1;
137		Ok(*last_id)
138	}
139
140	async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
141		Ok(())
142	}
143
144	async fn load(&self) -> ClResult<Vec<TaskData>> {
145		Ok(vec![])
146	}
147
148	async fn update_task_error(
149		&self,
150		_task_id: TaskId,
151		_output: &str,
152		_next_at: Option<Timestamp>,
153	) -> ClResult<()> {
154		Ok(())
155	}
156
157	async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
158		// In-memory store doesn't support persistence or keys
159		Ok(None)
160	}
161
162	async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
163		// In-memory store doesn't support persistence
164		Ok(())
165	}
166}
167
168// MetaAdapterTaskStore
169//**********************
170pub struct MetaAdapterTaskStore {
171	meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
172}
173
174impl MetaAdapterTaskStore {
175	pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
176		Arc::new(Self { meta_adapter })
177	}
178}
179
180#[async_trait]
181impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
182	async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
183		let id = self
184			.meta_adapter
185			.create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
186			.await?;
187
188		// Store cron schedule if present
189		if let Some(cron) = &task.cron {
190			self.meta_adapter
191				.update_task(
192					id,
193					&meta_adapter::TaskPatch {
194						cron: Patch::Value(cron.to_cron_string()),
195						..Default::default()
196					},
197				)
198				.await?;
199		}
200
201		Ok(id)
202	}
203
204	async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
205		self.meta_adapter.update_task_finished(id, output).await
206	}
207
208	async fn load(&self) -> ClResult<Vec<TaskData>> {
209		let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
210		let tasks = tasks
211			.into_iter()
212			.map(|t| TaskData {
213				id: t.task_id,
214				kind: t.kind,
215				status: match t.status {
216					'P' => TaskStatus::Pending,
217					'F' => TaskStatus::Completed,
218					// 'E' or unknown status = Failed
219					_ => TaskStatus::Failed,
220				},
221				input: t.input,
222				deps: t.deps,
223				retry_data: t.retry,
224				cron_data: t.cron,
225				next_at: t.next_at,
226			})
227			.collect();
228		Ok(tasks)
229	}
230
231	async fn update_task_error(
232		&self,
233		task_id: TaskId,
234		output: &str,
235		next_at: Option<Timestamp>,
236	) -> ClResult<()> {
237		self.meta_adapter.update_task_error(task_id, output, next_at).await
238	}
239
240	async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
241		let task_opt = self.meta_adapter.find_task_by_key(key).await?;
242
243		match task_opt {
244			Some(t) => Ok(Some((
245				t.task_id,
246				TaskData {
247					id: t.task_id,
248					kind: t.kind,
249					status: match t.status {
250						'P' => TaskStatus::Pending,
251						'F' => TaskStatus::Completed,
252						// 'E' or unknown status = Failed
253						_ => TaskStatus::Failed,
254					},
255					input: t.input,
256					deps: t.deps,
257					retry_data: t.retry,
258					cron_data: t.cron,
259					next_at: t.next_at,
260				},
261			))),
262			None => Ok(None),
263		}
264	}
265
266	async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
267		use cloudillo_types::types::Patch;
268
269		// Build TaskPatch from TaskMeta
270		let mut patch = meta_adapter::TaskPatch {
271			input: Patch::Value(task.task.serialize()),
272			next_at: match task.next_at {
273				Some(ts) => Patch::Value(ts),
274				None => Patch::Null,
275			},
276			..Default::default()
277		};
278
279		// Update deps
280		if !task.deps.is_empty() {
281			patch.deps = Patch::Value(task.deps.clone());
282		}
283
284		// Update retry policy
285		if let Some(ref retry) = task.retry {
286			let retry_str = format!(
287				"{},{},{},{}",
288				task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
289			);
290			patch.retry = Patch::Value(retry_str);
291		}
292
293		// Update cron schedule
294		if let Some(ref cron) = task.cron {
295			patch.cron = Patch::Value(cron.to_cron_string());
296		}
297
298		self.meta_adapter.update_task(id, &patch).await
299	}
300}
301
302// Task metadata
/// Factory that rebuilds a task instance from its id and persisted input string.
type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
304
/// Retry configuration: exponential backoff bounds and a retry limit.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
	// (minimum, maximum) backoff wait in seconds.
	wait_min_max: (u64, u64),
	// Maximum number of retry attempts.
	times: u16,
}

impl Default for RetryPolicy {
	/// Default policy: backoff between 60s and 1h, up to 10 attempts.
	fn default() -> Self {
		Self { wait_min_max: (60, 3600), times: 10 }
	}
}

impl RetryPolicy {
	/// Create a new RetryPolicy with custom min/max backoff and number of retries
	pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
		Self { wait_min_max, times }
	}

	/// Calculate exponential backoff in seconds: min * (2^attempt), capped at max
	///
	/// Uses checked/saturating arithmetic: `1u64 << n` panics in debug builds
	/// (and wraps in release) for n >= 64, and `min * factor` could overflow
	/// for large attempt counts. Both now saturate, so the result is simply
	/// capped at `max`.
	pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
		let (min, max) = self.wait_min_max;
		let factor = 1u64.checked_shl(u32::from(attempt_count)).unwrap_or(u64::MAX);
		min.saturating_mul(factor).min(max)
	}

	/// Check if we should continue retrying
	pub fn should_retry(&self, attempt_count: u16) -> bool {
		attempt_count < self.times
	}
}
335
336// TaskSchedulerBuilder - Fluent API for task scheduling
337//************************************************************
/// Fluent builder for scheduling a task; terminal methods (`schedule`, `now`,
/// `at`, …) hand the accumulated options to the scheduler.
pub struct TaskSchedulerBuilder<'a, S: Clone> {
	// Scheduler that will receive the task.
	scheduler: &'a Scheduler<S>,
	// The task to schedule.
	task: Arc<dyn Task<S>>,
	// Optional deduplication key.
	key: Option<String>,
	// Absolute time of the (first) execution, if any.
	next_at: Option<Timestamp>,
	// Ids of tasks that must complete first.
	deps: Vec<TaskId>,
	// Optional retry policy.
	retry: Option<RetryPolicy>,
	// Optional recurring schedule.
	cron: Option<CronSchedule>,
}
347
348impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
349	/// Create a new builder for scheduling a task
350	fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
351		Self {
352			scheduler,
353			task,
354			key: None,
355			next_at: None,
356			deps: Vec::new(),
357			retry: None,
358			cron: None,
359		}
360	}
361
362	/// Set a string key for task identification
363	pub fn key(mut self, key: impl Into<String>) -> Self {
364		self.key = Some(key.into());
365		self
366	}
367
368	/// Schedule for a specific absolute timestamp
369	pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
370		self.next_at = Some(timestamp);
371		self
372	}
373
374	/// Schedule after a relative delay (in seconds)
375	pub fn schedule_after(mut self, seconds: i64) -> Self {
376		self.next_at = Some(Timestamp::from_now(seconds));
377		self
378	}
379
380	/// Add task dependencies - task waits for all of these to complete
381	pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
382		self.deps = deps;
383		self
384	}
385
386	/// Add a single task dependency
387	pub fn depends_on(mut self, dep: TaskId) -> Self {
388		self.deps.push(dep);
389		self
390	}
391
392	/// Enable automatic retry with exponential backoff
393	pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
394		self.retry = Some(policy);
395		self
396	}
397
398	// ===== Cron Scheduling Methods =====
399
400	/// Schedule task with cron expression
401	/// Example: `.cron("0 9 * * *")` for 9 AM daily
402	pub fn cron(mut self, expr: impl Into<String>) -> Self {
403		if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
404			// Calculate initial next_at from cron schedule
405			// Use .ok() - cron was just parsed successfully, should never fail
406			self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
407			self.cron = Some(cron_schedule);
408		}
409		self
410	}
411
412	/// Schedule task daily at specified time
413	/// Example: `.daily_at(2, 30)` for 2:30 AM daily
414	pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
415		if hour <= 23 && minute <= 59 {
416			let expr = format!("{} {} * * *", minute, hour);
417			if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
418				// Calculate initial next_at from cron schedule
419				// Use .ok() - cron was just parsed successfully, should never fail
420				self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
421				self.cron = Some(cron_schedule);
422			}
423		}
424		self
425	}
426
427	/// Schedule task weekly at specified day and time
428	/// Example: `.weekly_at(1, 14, 30)` for Mondays at 2:30 PM
429	/// weekday: 0=Sunday, 1=Monday, ..., 6=Saturday
430	pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
431		if weekday <= 6 && hour <= 23 && minute <= 59 {
432			let expr = format!("{} {} * * {}", minute, hour, weekday);
433			if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
434				// Calculate initial next_at from cron schedule
435				// Use .ok() - cron was just parsed successfully, should never fail
436				self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
437				self.cron = Some(cron_schedule);
438			}
439		}
440		self
441	}
442
443	/// Execute the scheduled task immediately
444	pub async fn now(self) -> ClResult<TaskId> {
445		self.schedule().await
446	}
447
448	/// Execute the scheduled task at a specific timestamp
449	pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
450		self.next_at = Some(ts);
451		self.schedule().await
452	}
453
454	/// Execute the scheduled task after a delay (in seconds)
455	pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
456		self.next_at = Some(Timestamp::from_now(seconds));
457		self.schedule().await
458	}
459
460	/// Execute the scheduled task after another task completes
461	pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
462		self.deps.push(dep);
463		self.schedule().await
464	}
465
466	/// Execute the scheduled task with automatic retry using default policy
467	pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
468		self.retry = Some(RetryPolicy::default());
469		self.schedule().await
470	}
471
472	/// Execute the task with all configured options - main terminal method
473	pub async fn schedule(self) -> ClResult<TaskId> {
474		self.scheduler
475			.schedule_task_impl(
476				self.task,
477				self.key.as_deref(),
478				self.next_at,
479				if self.deps.is_empty() { None } else { Some(self.deps) },
480				self.retry,
481				self.cron,
482			)
483			.await
484	}
485}
486
/// In-memory metadata for a queued task: the task itself plus its
/// scheduling state (due time, dependencies, retry and cron config).
#[derive(Debug, Clone)]
pub struct TaskMeta<S: Clone> {
	/// The task implementation to execute.
	pub task: Arc<dyn Task<S>>,
	/// When the task should next run; `None` means it waits on dependencies.
	pub next_at: Option<Timestamp>,
	/// Ids of tasks that must complete before this one may run.
	pub deps: Vec<TaskId>,
	// Number of retry attempts made so far (persisted as part of the
	// "count,min,max,times" retry string).
	retry_count: u16,
	/// Optional retry policy with exponential backoff.
	pub retry: Option<RetryPolicy>,
	/// Optional cron schedule for recurring tasks.
	pub cron: Option<CronSchedule>,
}
496
/// Registry mapping task kind names to their rebuild factories.
type TaskBuilderRegistry<S> = HashMap<&'static str, Box<TaskBuilder<S>>>;
/// Time-ordered queue; keyed by (due timestamp, task id) so entries are unique.
type ScheduledTaskMap<S> = BTreeMap<(Timestamp, TaskId), TaskMeta<S>>;
499
500// Scheduler
/// Central task scheduler; cheap to clone since all state is shared via `Arc`.
#[derive(Clone)]
pub struct Scheduler<S: Clone> {
	// Registered factories for rebuilding persisted tasks by kind.
	task_builders: Arc<RwLock<TaskBuilderRegistry<S>>>,
	// Persistence backend.
	store: Arc<dyn TaskStore<S>>,
	// Tasks currently executing.
	tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
	// Tasks blocked on unfinished dependencies.
	tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
	// Reverse dependency index: task id -> ids of tasks waiting on it.
	task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
	// Time-ordered queue of tasks waiting for their due time.
	tasks_scheduled: Arc<Mutex<ScheduledTaskMap<S>>>,
	// Completion channel; presumably written by spawn_task (not in view) —
	// the receiver side is consumed by the handler spawned in `start`.
	tx_finish: flume::Sender<TaskId>,
	// Receiver consumed by the completion handler in `start`.
	rx_finish: flume::Receiver<TaskId>,
	// Wakes the timer loop when the scheduled queue changes.
	notify_schedule: Arc<tokio::sync::Notify>,
}
513
514impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
515	pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
516		let (tx_finish, rx_finish) = flume::unbounded();
517
518		let scheduler = Self {
519			task_builders: Arc::new(RwLock::new(HashMap::new())),
520			store,
521			tasks_running: Arc::new(Mutex::new(HashMap::new())),
522			tasks_waiting: Arc::new(Mutex::new(HashMap::new())),
523			task_dependents: Arc::new(Mutex::new(HashMap::new())),
524			tasks_scheduled: Arc::new(Mutex::new(BTreeMap::new())),
525			tx_finish,
526			rx_finish,
527			notify_schedule: Arc::new(tokio::sync::Notify::new()),
528		};
529
530		//scheduler.run(rx_finish)?;
531
532		Arc::new(scheduler)
533	}
534
	/// Start the scheduler's background workers.
	///
	/// Spawns three tokio tasks:
	/// 1. a completion handler consuming `rx_finish` that reschedules cron
	///    tasks, marks one-shot tasks finished and releases dependents,
	/// 2. a timer loop that spawns scheduled tasks when they become due,
	/// 3. a one-shot loader restoring persisted tasks from the store.
	///
	/// Poisoned mutexes are deliberately recovered (not propagated) so a
	/// panicking task cannot wedge the whole scheduler.
	pub fn start(&self, state: S) {
		// Handle finished tasks and dependencies
		let schedule = self.clone();
		let stat = state.clone();
		let rx_finish = self.rx_finish.clone();

		tokio::spawn(async move {
			while let Ok(id) = rx_finish.recv_async().await {
				debug!("Completed task {} (notified)", id);

				// Get task metadata WITHOUT removing - we only remove after successful transition
				let task_meta_opt = {
					let tasks_running = match schedule.tasks_running.lock() {
						Ok(guard) => guard,
						Err(poisoned) => {
							error!("Mutex poisoned: tasks_running (recovering)");
							poisoned.into_inner()
						}
					};
					tasks_running.get(&id).cloned()
				};

				if let Some(task_meta) = task_meta_opt {
					// Track if transition was successful
					let mut transition_ok = false;

					// Check if this is a recurring task with cron schedule
					if let Some(ref cron) = task_meta.cron {
						// Calculate next execution time
						let next_at = match cron.next_execution(Timestamp::now()) {
							Ok(ts) => ts,
							Err(e) => {
								error!(
									"Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
									id, e
								);
								// Mark as finished since we can't reschedule
								if let Err(e) = schedule.store.finished(id, "").await {
									error!("Failed to mark task {} as finished: {}", id, e);
								}
								continue;
							}
						};
						info!(
							"Recurring task {} completed, scheduling next execution at {}",
							id, next_at
						);

						// Update task with new next_at
						let mut updated_meta = task_meta.clone();
						updated_meta.next_at = Some(next_at);

						// Update database with new next_at (keep status as Pending)
						if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
							error!("Failed to update recurring task {} next_at: {}", id, e);
						}

						// Remove from running BEFORE add_queue (so add_queue doesn't see it as running)
						match schedule.tasks_running.lock() {
							Ok(mut tasks_running) => {
								tasks_running.remove(&id);
							}
							Err(poisoned) => {
								error!("Mutex poisoned: tasks_running (recovering)");
								poisoned.into_inner().remove(&id);
							}
						}

						// Re-add to scheduler with new execution time
						match schedule.add_queue(id, updated_meta).await {
							Ok(_) => transition_ok = true,
							Err(e) => {
								error!(
									"Failed to reschedule recurring task {}: {} - task lost!",
									id, e
								);
							}
						}
					} else {
						// One-time task - mark as finished
						match schedule.store.finished(id, "").await {
							Ok(()) => transition_ok = true,
							Err(e) => {
								error!(
									"Failed to mark task {} as finished: {} - task remains in running queue",
									id, e
								);
							}
						}
					}

					// Only remove from running queue after successful transition
					if transition_ok {
						match schedule.tasks_running.lock() {
							Ok(mut tasks_running) => {
								tasks_running.remove(&id);
							}
							Err(poisoned) => {
								error!("Mutex poisoned: tasks_running (recovering)");
								poisoned.into_inner().remove(&id);
							}
						}
					}

					// Handle dependencies of finished task using atomic release method
					match schedule.release_dependents(id) {
						Ok(ready_to_spawn) => {
							for (dep_id, dep_task_meta) in ready_to_spawn {
								// Add to running queue before spawning
								match schedule.tasks_running.lock() {
									Ok(mut tasks_running) => {
										tasks_running.insert(dep_id, dep_task_meta.clone());
									}
									Err(poisoned) => {
										error!("Mutex poisoned: tasks_running (recovering)");
										poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
									}
								}
								schedule.spawn_task(
									stat.clone(),
									dep_task_meta.task.clone(),
									dep_id,
									dep_task_meta,
								);
							}
						}
						Err(e) => {
							error!("Failed to release dependents of task {}: {}", id, e);
						}
					}
				} else {
					warn!("Completed task {} not found in running queue", id);
				}
			}
		});

		// Handle scheduled tasks: wait until the earliest entry is due, spawn
		// everything that is due, then sleep until the next due time or until
		// notify_schedule signals that the queue changed.
		let schedule = self.clone();
		tokio::spawn(async move {
			loop {
				let is_empty = match schedule.tasks_scheduled.lock() {
					Ok(guard) => guard.is_empty(),
					Err(poisoned) => {
						error!("Mutex poisoned: tasks_scheduled (recovering)");
						poisoned.into_inner().is_empty()
					}
				};
				if is_empty {
					schedule.notify_schedule.notified().await;
				}
				let time = Timestamp::now();
				// Inner loop drains all due tasks; it breaks with the next
				// not-yet-due (timestamp, id) pair, or None if the queue ran dry.
				if let Some((timestamp, _id)) = loop {
					let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
						Ok(guard) => guard,
						Err(poisoned) => {
							error!("Mutex poisoned: tasks_scheduled (recovering)");
							poisoned.into_inner()
						}
					};
					if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
						let (timestamp, id) = (timestamp, id);
						if timestamp <= Timestamp::now() {
							debug!("Spawning task id {} (from schedule)", id);
							if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
								let mut tasks_running = match schedule.tasks_running.lock() {
									Ok(guard) => guard,
									Err(poisoned) => {
										error!("Mutex poisoned: tasks_running (recovering)");
										poisoned.into_inner()
									}
								};
								tasks_running.insert(id, task.clone());
								schedule.spawn_task(state.clone(), task.task.clone(), id, task);
							} else {
								error!("Task disappeared while being removed from schedule");
								break None;
							}
						} else {
							break Some((timestamp, id));
						}
					} else {
						break None;
					}
				} {
					// Sleep until the next task is due, but wake early if the
					// queue changes (a negative diff clamps the wait to zero).
					let diff = timestamp.0 - time.0;
					let wait =
						tokio::time::Duration::from_secs(u64::try_from(diff).unwrap_or_default());
					tokio::select! {
						() = tokio::time::sleep(wait) => (), () = schedule.notify_schedule.notified() => ()
					};
				}
			}
		});

		// Restore persisted tasks in the background; errors are ignored here
		// (load logs its own failures).
		let schedule = self.clone();
		tokio::spawn(async move {
			let _ignore_err = schedule.load().await;
		});
	}
734
735	fn register_builder(
736		&self,
737		name: &'static str,
738		builder: &'static TaskBuilder<S>,
739	) -> ClResult<&Self> {
740		let mut task_builders = self
741			.task_builders
742			.write()
743			.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
744		task_builders.insert(name, Box::new(builder));
745		Ok(self)
746	}
747
748	pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
749		info!("Registering task type {}", T::kind());
750		self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
751		Ok(self)
752	}
753
	/// Create a builder for scheduling a task using the fluent API.
	///
	/// The returned builder borrows this scheduler; call one of its terminal
	/// methods (`schedule`, `now`, `at`, …) to actually enqueue the task.
	pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
		TaskSchedulerBuilder::new(self, task)
	}
758
759	/// Internal method to schedule a task with all options
760	/// This is the core implementation used by the builder pattern
761	async fn schedule_task_impl(
762		&self,
763		task: Arc<dyn Task<S>>,
764		key: Option<&str>,
765		next_at: Option<Timestamp>,
766		deps: Option<Vec<TaskId>>,
767		retry: Option<RetryPolicy>,
768		cron: Option<CronSchedule>,
769	) -> ClResult<TaskId> {
770		let task_meta = TaskMeta {
771			task: task.clone(),
772			next_at,
773			deps: deps.clone().unwrap_or_default(),
774			retry_count: 0,
775			retry,
776			cron,
777		};
778
779		// Check if a task with this key already exists (key-based deduplication)
780		if let Some(key) = key {
781			if let Some((existing_id, existing_data)) = self.store.find_by_key(key).await? {
782				let new_serialized = task.serialize();
783				let existing_serialized = existing_data.input.as_ref();
784
785				// Compare serialized parameters
786				if new_serialized == existing_serialized {
787					info!(
788						"Recurring task '{}' already exists with identical parameters (id={})",
789						key, existing_id
790					);
791					// Update DB with current cron/next_at (may differ from what's stored)
792					self.store.update_task(existing_id, &task_meta).await?;
793					// Ensure the existing task is queued (may be loaded from DB but not yet in queue)
794					self.add_queue(existing_id, task_meta).await?;
795					return Ok(existing_id);
796				}
797				info!(
798					"Updating recurring task '{}' (id={}) - parameters changed",
799					key, existing_id
800				);
801				info!("  Old params: {}", existing_serialized);
802				info!("  New params: {}", new_serialized);
803
804				// Remove from all queues (if present)
805				self.remove_from_queues(existing_id)?;
806
807				// Update the task in database with new parameters
808				self.store.update_task(existing_id, &task_meta).await?;
809
810				// Re-add to appropriate queue with updated parameters
811				self.add_queue(existing_id, task_meta).await?;
812
813				return Ok(existing_id);
814			}
815		}
816
817		// No existing task - create new one
818		let id = self.store.add(&task_meta, key).await?;
819		self.add_queue(id, task_meta).await
820	}
821
	/// Convenience wrapper: schedule `task` for immediate execution with no
	/// key, dependencies, retry policy or cron schedule.
	pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
		self.task(task).now().await
	}
825
	/// Place a task into the appropriate internal queue.
	///
	/// Routing rules:
	/// - already running: only its metadata is refreshed (the task re-queues
	///   itself on completion),
	/// - has unfinished dependencies: goes to `tasks_waiting` and is indexed
	///   in `task_dependents`,
	/// - otherwise: goes to `tasks_scheduled` keyed by due time; a task that
	///   is already due is keyed with `Timestamp(0)` so it sorts first.
	pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
		// If task is already running, update its metadata (especially for cron updates)
		// but don't add to scheduled queue (it will reschedule on completion)
		{
			let mut running = lock!(self.tasks_running, "tasks_running")?;
			if let Some(existing_meta) = running.get_mut(&id) {
				debug!(
					"Task {} is already running, updating metadata (will reschedule on completion)",
					id
				);
				// Update the running task's metadata so it has the latest cron schedule
				*existing_meta = task_meta;
				return Ok(id);
			}
		}

		// Remove from other queues if present (prevents duplicate entries with different timestamps)
		{
			let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
			if let Some(key) = scheduled
				.iter()
				.find(|((_, tid), _)| *tid == id)
				.map(|((ts, tid), _)| (*ts, *tid))
			{
				scheduled.remove(&key);
				debug!("Removed existing scheduled entry for task {} before re-queueing", id);
			}
		}
		{
			let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
			if waiting.remove(&id).is_some() {
				debug!("Removed existing waiting entry for task {} before re-queueing", id);
			}
		}

		let deps = task_meta.deps.clone();

		// VALIDATION: Tasks with dependencies should NEVER be in tasks_scheduled
		if !deps.is_empty() && task_meta.next_at.is_some() {
			warn!("Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue", id);
			// Force to tasks_waiting instead
			lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
			debug!("Task {} is waiting for {:?}", id, &deps);
			for dep in deps {
				lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
			}
			return Ok(id);
		}

		if deps.is_empty() && task_meta.next_at.unwrap_or(Timestamp(0)) < Timestamp::now() {
			debug!("Spawning task {}", id);
			// (Timestamp(0), id) sorts before any real due time, so the timer
			// loop picks this task up on its next pass.
			lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), task_meta);
			self.notify_schedule.notify_one();
		} else if let Some(next_at) = task_meta.next_at {
			debug!("Scheduling task {} for {}", id, next_at);
			lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((next_at, id), task_meta);
			self.notify_schedule.notify_one();
		} else {
			// No due time: park in the waiting queue and index each dependency
			// so release_dependents can later find this task.
			lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
			debug!("Task {} is waiting for {:?}", id, &deps);
			for dep in deps {
				lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
			}
		}
		Ok(id)
	}
892
893	/// Remove a task from all internal queues (waiting, scheduled, running)
894	/// Returns the removed TaskMeta if found
895	fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
896		// Try tasks_waiting
897		if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
898			debug!("Removed task {} from waiting queue for update", task_id);
899			return Ok(Some(task_meta));
900		}
901
902		// Try tasks_scheduled (need to find by task_id in BTreeMap)
903		{
904			let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
905			if let Some(key) = scheduled
906				.iter()
907				.find(|((_, id), _)| *id == task_id)
908				.map(|((ts, id), _)| (*ts, *id))
909			{
910				if let Some(task_meta) = scheduled.remove(&key) {
911					debug!("Removed task {} from scheduled queue for update", task_id);
912					return Ok(Some(task_meta));
913				}
914			}
915		}
916
917		// Try tasks_running (should rarely happen, but handle it)
918		if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
919			warn!("Removed task {} from running queue during update", task_id);
920			return Ok(Some(task_meta));
921		}
922
923		Ok(None)
924	}
925
	/// Release all dependent tasks of a completed task
	/// This method safely handles dependency cleanup and spawning
	///
	/// Returns the (id, metadata) pairs of tasks whose last dependency was
	/// just cleared; the caller is responsible for spawning them.
	fn release_dependents(
		&self,
		completed_task_id: TaskId,
	) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
		// Get list of dependents (atomic removal to prevent re-processing)
		let dependents = {
			let mut deps_map = lock!(self.task_dependents, "task_dependents")?;
			deps_map.remove(&completed_task_id).unwrap_or_default()
		};

		if dependents.is_empty() {
			return Ok(Vec::new()); // No dependents to release
		}

		debug!("Releasing {} dependents of completed task {}", dependents.len(), completed_task_id);

		let mut ready_to_spawn = Vec::new();

		// For each dependent, check and remove dependency
		for dependent_id in dependents {
			// Try tasks_waiting first (most common case for dependent tasks)
			{
				let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
				if let Some(task_meta) = waiting.get_mut(&dependent_id) {
					// Remove the completed task from dependencies
					task_meta.deps.retain(|x| *x != completed_task_id);

					// If all dependencies are cleared, remove and queue for spawning
					if task_meta.deps.is_empty() {
						if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
							debug!(
								"Dependent task {} ready to spawn (all dependencies cleared)",
								dependent_id
							);
							ready_to_spawn.push((dependent_id, task_to_spawn));
						}
					} else {
						debug!(
							"Task {} still has {} remaining dependencies",
							dependent_id,
							task_meta.deps.len()
						);
					}
					continue;
				}
			}

			// Try tasks_scheduled if not in waiting (shouldn't happen with validation, but be defensive)
			{
				let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
				if let Some(scheduled_key) = scheduled
					.iter()
					.find(|((_, id), _)| *id == dependent_id)
					.map(|((ts, id), _)| (*ts, *id))
				{
					// Only the dependency list is updated here; the task stays
					// in the scheduled queue and runs at its due time.
					if let Some(task_meta) = scheduled.get_mut(&scheduled_key) {
						task_meta.deps.retain(|x| *x != completed_task_id);
						let remaining = task_meta.deps.len();
						if remaining == 0 {
							debug!(
								"Task {} in scheduled queue has no remaining dependencies",
								dependent_id
							);
						} else {
							debug!(
								"Task {} in scheduled queue has {} remaining dependencies",
								dependent_id, remaining
							);
						}
					}
					continue;
				}
			}

			// Task not found in any queue
			warn!(
				"Dependent task {} of completed task {} not found in any queue",
				dependent_id, completed_task_id
			);
		}

		Ok(ready_to_spawn)
	}
1011
1012	async fn load(&self) -> ClResult<()> {
1013		let tasks = self.store.load().await?;
1014		debug!("Loaded {} tasks from store", tasks.len());
1015		for t in tasks {
1016			if let TaskStatus::Pending = t.status {
1017				debug!("Loading task {} {}", t.id, t.kind);
1018				let task = {
1019					let builder_map = self
1020						.task_builders
1021						.read()
1022						.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
1023					let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
1024						format!("task builder not registered: {}", t.kind),
1025					))?;
1026					builder(t.id, &t.input)?
1027				};
1028				let (retry_count, retry) = match t.retry_data {
1029					Some(retry_str) => {
1030						let (retry_count, retry_min, retry_max, retry_times) = retry_str
1031							.split(',')
1032							.collect_tuple()
1033							.ok_or(Error::Internal("invalid retry policy format".into()))?;
1034						let retry_count: u16 = retry_count
1035							.parse()
1036							.map_err(|_| Error::Internal("retry count must be u16".into()))?;
1037						let retry = RetryPolicy {
1038							wait_min_max: (
1039								retry_min
1040									.parse()
1041									.map_err(|_| Error::Internal("retry_min must be u64".into()))?,
1042								retry_max
1043									.parse()
1044									.map_err(|_| Error::Internal("retry_max must be u64".into()))?,
1045							),
1046							times: retry_times
1047								.parse()
1048								.map_err(|_| Error::Internal("retry times must be u64".into()))?,
1049						};
1050						debug!("Loaded retry policy: {:?}", retry);
1051						(retry_count, Some(retry))
1052					}
1053					_ => (0, None),
1054				};
1055				// Parse cron data if present
1056				let cron =
1057					t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
1058
1059				let task_meta = TaskMeta {
1060					task,
1061					next_at: t.next_at,
1062					deps: t.deps.into(),
1063					retry_count,
1064					retry,
1065					cron,
1066				};
1067				self.add_queue(t.id, task_meta).await?;
1068			}
1069		}
1070		Ok(())
1071	}
1072
1073	fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
1074		let tx_finish = self.tx_finish.clone();
1075		let store = self.store.clone();
1076		let scheduler = self.clone();
1077		//let state = self.state.clone();
1078		tokio::spawn(async move {
1079			match task.run(&state).await {
1080				Ok(()) => {
1081					debug!("Task {} completed successfully", id);
1082					tx_finish.send(id).unwrap_or(());
1083				}
1084				Err(e) => {
1085					if let Some(retry_policy) = &task_meta.retry {
1086						if retry_policy.should_retry(task_meta.retry_count) {
1087							let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
1088							let next_at = Timestamp::from_now(backoff.cast_signed());
1089
1090							info!(
1091								"Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
1092								id, task_meta.retry_count + 1, retry_policy.times, backoff, e
1093							);
1094
1095							// Update database with error and reschedule
1096							store
1097								.update_task_error(id, &e.to_string(), Some(next_at))
1098								.await
1099								.unwrap_or(());
1100
1101							// Remove from running tasks (we're not sending finish event)
1102							match scheduler.tasks_running.lock() {
1103								Ok(mut tasks_running) => {
1104									tasks_running.remove(&id);
1105								}
1106								Err(poisoned) => {
1107									error!("Mutex poisoned: tasks_running (recovering)");
1108									poisoned.into_inner().remove(&id);
1109								}
1110							}
1111
1112							// Re-queue task with incremented retry count
1113							let mut retry_meta = task_meta.clone();
1114							retry_meta.retry_count += 1;
1115							retry_meta.next_at = Some(next_at);
1116							scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
1117						} else {
1118							// Max retries exhausted
1119							error!(
1120								"Task {} failed after {} retries: {}",
1121								id, task_meta.retry_count, e
1122							);
1123							store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1124							tx_finish.send(id).unwrap_or(());
1125						}
1126					} else {
1127						// No retry policy - fail immediately
1128						error!("Task {} failed: {}", id, e);
1129						store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1130						tx_finish.send(id).unwrap_or(());
1131					}
1132				}
1133			}
1134		});
1135	}
1136
1137	/// Get health status of the scheduler
1138	/// Returns information about tasks in each queue and detects anomalies
1139	pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
1140		let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
1141		let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
1142		let running_count = lock!(self.tasks_running, "tasks_running")?.len();
1143		let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
1144
1145		// Check for anomalies
1146		let mut stuck_tasks = Vec::new();
1147		let mut tasks_with_missing_deps = Vec::new();
1148
1149		// Check tasks_waiting for tasks with no dependencies (stuck)
1150		{
1151			let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1152			let _deps_map = lock!(self.task_dependents, "task_dependents")?;
1153
1154			for (id, task_meta) in waiting.iter() {
1155				if task_meta.deps.is_empty() {
1156					stuck_tasks.push(*id);
1157					warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
1158				} else {
1159					// Check if all dependencies still exist
1160					for dep in &task_meta.deps {
1161						// Check if dependency is in any queue or dependents map
1162						let dep_exists =
1163							self.tasks_running.lock().ok().is_some_and(|r| r.contains_key(dep))
1164								|| self
1165									.tasks_waiting
1166									.lock()
1167									.ok()
1168									.is_some_and(|w| w.contains_key(dep))
1169								|| self.tasks_scheduled.lock().ok().is_some_and(|s| {
1170									s.iter().any(|((_, task_id), _)| task_id == dep)
1171								});
1172
1173						if !dep_exists {
1174							tasks_with_missing_deps.push((*id, *dep));
1175							warn!(
1176								"SCHEDULER HEALTH: Task {} depends on non-existent task {}",
1177								id, dep
1178							);
1179						}
1180					}
1181				}
1182			}
1183		}
1184
1185		Ok(SchedulerHealth {
1186			waiting: waiting_count,
1187			scheduled: scheduled_count,
1188			running: running_count,
1189			dependents: dependents_count,
1190			stuck_tasks,
1191			tasks_with_missing_deps,
1192		})
1193	}
1194}
1195
/// Health status snapshot of the scheduler.
///
/// Produced by `health_check`; counts are taken per queue (not atomically
/// across queues), so they may be slightly inconsistent under load.
#[derive(Debug, Clone)]
pub struct SchedulerHealth {
	/// Number of tasks waiting for dependencies
	pub waiting: usize,
	/// Number of tasks scheduled for future execution
	pub scheduled: usize,
	/// Number of tasks currently running
	pub running: usize,
	/// Number of task entries in dependents map
	pub dependents: usize,
	/// IDs of tasks with no dependencies but still in waiting queue
	pub stuck_tasks: Vec<TaskId>,
	/// Pairs of (task_id, missing_dependency_id) where dependency doesn't exist
	pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
}
1212
1213#[cfg(test)]
1214mod tests {
1215	use super::*;
1216	use serde::{Deserialize, Serialize};
1217
1218	type State = Arc<Mutex<Vec<u8>>>;
1219
	/// Minimal test task: sleeps 200ms × `num`, then pushes `num` onto the
	/// shared state vector (see its `run` impl below).
	#[derive(Debug, Serialize, Deserialize)]
	struct TestTask {
		// Sleep multiplier and the value recorded in the shared state
		num: u8,
	}
1224
1225	impl TestTask {
1226		pub fn new(num: u8) -> Arc<Self> {
1227			Arc::new(Self { num })
1228		}
1229	}
1230
1231	#[async_trait]
1232	impl Task<State> for TestTask {
1233		fn kind() -> &'static str {
1234			"test"
1235		}
1236
1237		fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1238			let num: u8 = ctx
1239				.parse()
1240				.map_err(|_| Error::Internal("test task context must be u8".into()))?;
1241			let task = TestTask::new(num);
1242			Ok(task)
1243		}
1244
1245		fn serialize(&self) -> String {
1246			self.num.to_string()
1247		}
1248
1249		fn kind_of(&self) -> &'static str {
1250			"test"
1251		}
1252
1253		async fn run(&self, state: &State) -> ClResult<()> {
1254			info!("Running task {}", self.num);
1255			tokio::time::sleep(std::time::Duration::from_millis(200 * u64::from(self.num))).await;
1256			info!("Completed task {}", self.num);
1257			state.lock().unwrap().push(self.num);
1258			Ok(())
1259		}
1260	}
1261
	/// Test task that fails deterministically on its first `fail_count`
	/// attempts, then succeeds and records `id` in the shared state.
	#[derive(Debug, Clone)]
	struct FailingTask {
		// Value pushed to the shared state on eventual success
		id: u8,
		// Number of leading attempts that return an error
		fail_count: u8,
		// Shared attempt counter, preserved across retries of the same instance
		attempt: Arc<Mutex<u8>>,
	}
1268
1269	impl FailingTask {
1270		pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
1271			Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
1272		}
1273	}
1274
1275	#[async_trait]
1276	impl Task<State> for FailingTask {
1277		fn kind() -> &'static str {
1278			"failing"
1279		}
1280
1281		fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1282			let parts: Vec<&str> = ctx.split(',').collect();
1283			if parts.len() != 2 {
1284				return Err(Error::Internal("failing task context must have 2 parts".into()));
1285			}
1286			let id: u8 = parts[0]
1287				.parse()
1288				.map_err(|_| Error::Internal("failing task id must be u8".into()))?;
1289			let fail_count: u8 = parts[1]
1290				.parse()
1291				.map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
1292			Ok(FailingTask::new(id, fail_count))
1293		}
1294
1295		fn serialize(&self) -> String {
1296			format!("{},{}", self.id, self.fail_count)
1297		}
1298
1299		fn kind_of(&self) -> &'static str {
1300			"failing"
1301		}
1302
1303		async fn run(&self, state: &State) -> ClResult<()> {
1304			let mut attempt = self.attempt.lock().unwrap();
1305			*attempt += 1;
1306			let current_attempt = *attempt;
1307
1308			info!("FailingTask {} - attempt {}/{}", self.id, current_attempt, self.fail_count + 1);
1309
1310			if current_attempt <= self.fail_count {
1311				error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
1312				return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
1313			}
1314
1315			info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
1316			state.lock().unwrap().push(self.id);
1317			Ok(())
1318		}
1319	}
1320
	#[tokio::test]
	pub async fn test_scheduler() {
		// End-to-end smoke test: immediate, delayed and dependent tasks all run.
		let _ = tracing_subscriber::fmt().try_init();

		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		let _task1 = TestTask::new(1);
		let task2 = TestTask::new(1);
		let task3 = TestTask::new(1);

		// task2 runs after a 2s delay, task3 immediately; a third task waits on both
		let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
		let task3_id = scheduler.add(task3).await.unwrap();
		scheduler
			.task(TestTask::new(1))
			.depend_on(vec![task2_id, task3_id])
			.schedule()
			.await
			.unwrap();

		tokio::time::sleep(std::time::Duration::from_secs(4)).await;
		// Two more delayed tasks after the first batch has completed
		let task4 = TestTask::new(1);
		let task5 = TestTask::new(1);
		scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
		scheduler.task(task5).schedule_after(1).schedule().await.unwrap();

		tokio::time::sleep(std::time::Duration::from_secs(3)).await;

		// All five tasks push "1", so the joined state is "1:1:1:1:1"
		let st = state.lock().unwrap();
		info!("res: {}", st.len());
		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1:1:1");
	}
1357
	#[tokio::test]
	pub async fn test_retry_with_backoff() {
		// Verifies that a failing task is retried with backoff and eventually
		// records its result once it succeeds.
		let _ = tracing_subscriber::fmt().try_init();

		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<FailingTask>().unwrap();

		// Create a task that fails twice, then succeeds
		// With retry policy: min=1s, max=3600s, max_attempts=3
		let failing_task = FailingTask::new(42, 2);
		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

		scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();

		// Wait for retries: 1s (1st fail) + 1s (2nd fail) + time for success
		// First attempt: immediate fail
		// Wait 1s (min backoff)
		// Second attempt: fail
		// Wait 2s (min * 2)
		// Third attempt: success
		tokio::time::sleep(std::time::Duration::from_secs(6)).await;

		let st = state.lock().unwrap();
		assert_eq!(st.len(), 1, "Task should have succeeded after retries");
		assert_eq!(st[0], 42);
	}
1387
1388	// ===== Builder Pattern Tests =====
1389
1390	#[tokio::test]
1391	pub async fn test_builder_simple_schedule() {
1392		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1393		let state: State = Arc::new(Mutex::new(Vec::new()));
1394		let scheduler = Scheduler::new(task_store);
1395		scheduler.start(state.clone());
1396		scheduler.register::<TestTask>().unwrap();
1397
1398		// Test basic builder usage: .now()
1399		let task = TestTask::new(1);
1400		let id = scheduler.task(task).now().await.unwrap();
1401
1402		assert!(id > 0, "Task ID should be positive");
1403
1404		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1405
1406		let st = state.lock().unwrap();
1407		assert_eq!(st.len(), 1, "Task should have executed");
1408		assert_eq!(st[0], 1);
1409	}
1410
1411	#[tokio::test]
1412	pub async fn test_builder_with_key() {
1413		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1414		let state: State = Arc::new(Mutex::new(Vec::new()));
1415		let scheduler = Scheduler::new(task_store);
1416		scheduler.start(state.clone());
1417		scheduler.register::<TestTask>().unwrap();
1418
1419		// Test builder with key
1420		let task = TestTask::new(1);
1421		let _id = scheduler.task(task).key("my-task-key").now().await.unwrap();
1422
1423		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1424
1425		let st = state.lock().unwrap();
1426		assert_eq!(st.len(), 1);
1427		assert_eq!(st[0], 1);
1428	}
1429
	#[tokio::test]
	pub async fn test_builder_with_delay() {
		// Verifies that `.after(secs)` delays execution: the task must not run
		// before the delay elapses, and must run after it.
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Test builder with .after() convenience method
		let task = TestTask::new(1);
		let _id = scheduler
			.task(task)
			.after(1)  // 1 second delay
			.await
			.unwrap();

		// Should not have executed yet
		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
		{
			let st = state.lock().unwrap();
			assert_eq!(st.len(), 0, "Task should not execute yet");
		}

		// Wait for execution (1 sec delay + 200ms task sleep + buffer)
		tokio::time::sleep(std::time::Duration::from_millis(800)).await;

		{
			let st = state.lock().unwrap();
			assert_eq!(st.len(), 1, "Task should have executed");
			assert_eq!(st[0], 1);
		}
	}
1462
	#[tokio::test]
	pub async fn test_builder_with_dependencies() {
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Create first task (num=1, sleeps 200ms)
		let task1 = TestTask::new(1);
		let id1 = scheduler.task(task1).now().await.unwrap();

		// Create second task (num=1, sleeps 200ms, runs in parallel with the first)
		let task2 = TestTask::new(1);
		let id2 = scheduler.task(task2).now().await.unwrap();

		// Create third task that depends on the first two (num=1, sleeps 200ms)
		let task3 = TestTask::new(1);
		let _id3 = scheduler.task(task3).depend_on(vec![id1, id2]).schedule().await.unwrap();

		// Wait for all tasks: 1 and 2 in parallel (~200ms), then 3 (~200ms)
		tokio::time::sleep(std::time::Duration::from_millis(1500)).await;

		let st = state.lock().unwrap();
		// All three tasks push "1"; the dependent runs only after both parents
		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1");
	}
1491
1492	#[tokio::test]
1493	pub async fn test_builder_with_retry() {
1494		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1495		let state: State = Arc::new(Mutex::new(Vec::new()));
1496		let scheduler = Scheduler::new(task_store);
1497		scheduler.start(state.clone());
1498		scheduler.register::<FailingTask>().unwrap();
1499
1500		// Create task using builder with retry policy
1501		let failing_task = FailingTask::new(55, 1); // Fails once, succeeds second time
1502		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1503
1504		let _id = scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1505
1506		// Wait for retry cycle: 1 fail + 1s wait + 1 success
1507		tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1508
1509		let st = state.lock().unwrap();
1510		assert_eq!(st.len(), 1);
1511		assert_eq!(st[0], 55);
1512	}
1513
	#[tokio::test]
	pub async fn test_builder_with_automatic_retry() {
		// Compile/integration check for `.with_automatic_retry()` — the default
		// policy's backoff is too long to observe success within the test.
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<FailingTask>().unwrap();

		// Create task using builder with automatic retry (default policy)
		let failing_task = FailingTask::new(66, 1);
		let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();

		// Wait for retry cycle with default policy (min=60s would be too long for test)
		// but we already tested retry logic thoroughly, just verify builder integration
		tokio::time::sleep(std::time::Duration::from_millis(500)).await;

		// The important part is that this compiles and integrates correctly
		let st = state.lock().unwrap();
		// With default policy (min=60s), task shouldn't succeed in test timeframe
		// Just verify builder chaining works
		let _ = st.len(); // Verify state is accessible, but don't assert on timeout-dependent result
	}
1536
	#[tokio::test]
	pub async fn test_builder_fluent_chaining() {
		// All builder methods must chain in a single fluent expression.
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Create first dependencies
		let dep1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
		let dep2 = scheduler.task(TestTask::new(1)).now().await.unwrap();

		// Test fluent chaining with multiple methods
		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

		let task = TestTask::new(1);
		let _id = scheduler
			.task(task)
			.key("complex-task")
			.schedule_after(0)  // Schedule immediately
			.depend_on(vec![dep1, dep2])
			.with_retry(retry_policy)
			.schedule()
			.await
			.unwrap();

		tokio::time::sleep(std::time::Duration::from_millis(800)).await;

		let st = state.lock().unwrap();
		// All three tasks push "1": the two deps run first, then the chained task
		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1");
	}
1570
1571	#[tokio::test]
1572	pub async fn test_builder_backward_compatibility() {
1573		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1574		let state: State = Arc::new(Mutex::new(Vec::new()));
1575		let scheduler = Scheduler::new(task_store);
1576		scheduler.start(state.clone());
1577		scheduler.register::<TestTask>().unwrap();
1578
1579		// Test that old API still works
1580		let _id1 = scheduler.add(TestTask::new(1)).await.unwrap();
1581
1582		// Test that new builder API works
1583		let _id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1584
1585		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1586
1587		let st = state.lock().unwrap();
1588		// Both old and new API should have executed
1589		assert_eq!(st.len(), 2);
1590		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1591		assert_eq!(str_vec.join(":"), "1:1");
1592	}
1593
1594	// ===== Phase 2: Integration Tests - Real-world scenarios =====
1595
1596	#[tokio::test]
1597	pub async fn test_builder_pipeline_scenario() {
1598		// Simulates: Task 1 -> Task 2 (depends on 1) -> Task 3 (depends on 2)
1599		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1600		let state: State = Arc::new(Mutex::new(Vec::new()));
1601		let scheduler = Scheduler::new(task_store);
1602		scheduler.start(state.clone());
1603		scheduler.register::<TestTask>().unwrap();
1604
1605		// Stage 1: Create initial task
1606		let id1 = scheduler.task(TestTask::new(1)).key("stage-1").now().await.unwrap();
1607
1608		// Stage 2: Create task that depends on stage 1
1609		let id2 = scheduler.task(TestTask::new(1)).key("stage-2").after_task(id1).await.unwrap();
1610
1611		// Stage 3: Create task that depends on stage 2
1612		let _id3 = scheduler.task(TestTask::new(1)).key("stage-3").after_task(id2).await.unwrap();
1613
1614		// Wait for pipeline: 1(200ms) + 2(200ms) + 3(200ms) = 600ms
1615		tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1616
1617		let st = state.lock().unwrap();
1618		// Should execute in order: 1, 2, 3
1619		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1620		assert_eq!(str_vec.join(":"), "1:1:1");
1621	}
1622
1623	#[tokio::test]
1624	pub async fn test_builder_multi_dependency_join() {
1625		// Simulates: Task 1 parallel with Task 2, then Task 3 waits for both
1626		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1627		let state: State = Arc::new(Mutex::new(Vec::new()));
1628		let scheduler = Scheduler::new(task_store);
1629		scheduler.start(state.clone());
1630		scheduler.register::<TestTask>().unwrap();
1631
1632		// Parallel tasks
1633		let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1634		let id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1635
1636		// Join task - waits for both
1637		let _id3 = scheduler
1638			.task(TestTask::new(1))
1639			.depend_on(vec![id1, id2])
1640			.schedule()
1641			.await
1642			.unwrap();
1643
1644		tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1645
1646		let st = state.lock().unwrap();
1647		// 1 and 2 execute in parallel, then 3 executes after both
1648		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1649		assert_eq!(str_vec.join(":"), "1:1:1");
1650	}
1651
	#[tokio::test]
	pub async fn test_builder_scheduled_task_with_dependencies() {
		// Simulates: Task depends on earlier task AND is scheduled for future time.
		// Both conditions must hold before the task runs: the dependency must
		// complete and the scheduled timestamp must be reached.
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Immediate task
		let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();

		// Task that waits for dependency AND scheduled delay
		let ts = Timestamp::from_now(1);
		let _task_id = scheduler
			.task(TestTask::new(1))
			.schedule_at(ts)
			.depend_on(vec![dep_id])
			.schedule()
			.await
			.unwrap();

		// Wait for dependency to complete but before scheduled time
		tokio::time::sleep(std::time::Duration::from_millis(300)).await;
		{
			let st = state.lock().unwrap();
			assert_eq!(st.len(), 1); // Only dependency executed
		}

		// Wait for scheduled time (1s total from initial schedule)
		tokio::time::sleep(std::time::Duration::from_millis(800)).await;

		{
			let st = state.lock().unwrap();
			let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
			assert_eq!(str_vec.join(":"), "1:1");
		}
	}
1690
	#[tokio::test]
	pub async fn test_builder_mixed_features() {
		// Simulates: Complex real-world scenario with key, scheduling, deps, and retry.
		// Exercises key + schedule_after + depend_on + with_retry on one scheduler.
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();
		scheduler.register::<FailingTask>().unwrap();

		// Create initial tasks
		let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();

		// Create complex task: scheduled + depends on id1 + has key
		let _id2 = scheduler
			.task(TestTask::new(1))
			.key("critical-task")
			.schedule_after(0)
			.depend_on(vec![id1])
			.schedule()
			.await
			.unwrap();

		// Create task with retry
		let _id3 = scheduler
			.task(FailingTask::new(1, 0))  // Fails 0 times, succeeds immediately
			.key("retryable-task")
			.with_retry(RetryPolicy {
				wait_min_max: (1, 3600),
				times: 3,
			})
			.schedule()
			.await
			.unwrap();

		// Wait for tasks: id1 (200ms) + id2 (200ms after id1) + id3 (200ms) = ~600ms
		tokio::time::sleep(std::time::Duration::from_millis(1200)).await;

		let st = state.lock().unwrap();
		// All three tasks should execute
		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1");
	}
1734
1735	#[tokio::test]
1736	pub async fn test_builder_builder_reuse_not_possible() {
1737		// Verify that builder is consumed (moved) and can't be reused
1738		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1739		let _state: State = Arc::new(Mutex::new(Vec::new()));
1740		let scheduler = Scheduler::new(task_store);
1741
1742		let task = TestTask::new(1);
1743		let builder = scheduler.task(task);
1744
1745		// This would not compile if uncommented (builder is moved):
1746		// let _id1 = builder.now().await.unwrap();
1747		// let _id2 = builder.now().await.unwrap();  // Error: use of moved value
1748
1749		// Can only call terminal method once
1750		let _id = builder.now().await.unwrap();
1751		// builder is now consumed, can't use again
1752
1753		// Test passes if it compiles (verifying move semantics)
1754	}
1755
1756	#[tokio::test]
1757	pub async fn test_builder_different_task_types() {
1758		// Test builder works with different task implementations
1759		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1760		let state: State = Arc::new(Mutex::new(Vec::new()));
1761		let scheduler = Scheduler::new(task_store);
1762		scheduler.start(state.clone());
1763		scheduler.register::<TestTask>().unwrap();
1764		scheduler.register::<FailingTask>().unwrap();
1765
1766		// Mix of different task types
1767		let _id1 = scheduler.task(TestTask::new(1)).key("test-task").now().await.unwrap();
1768
1769		let _id2 = scheduler
1770			.task(FailingTask::new(1, 0))  // Won't fail
1771			.key("failing-task")
1772			.now()
1773			.await
1774			.unwrap();
1775
1776		let _id3 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1777
1778		tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1779
1780		let st = state.lock().unwrap();
1781		assert_eq!(st.len(), 3);
1782		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1783		// All three tasks should execute
1784		assert_eq!(str_vec.join(":"), "1:1:1");
1785	}
1786
1787	// ===== Phase 3: Cron Placeholder Tests =====
1788	// These tests verify that cron methods compile and integrate
1789	// Actual cron functionality will be implemented in Phase 3
1790
	#[tokio::test]
	pub async fn test_builder_cron_placeholder_syntax() {
		// Verify the cron builder method compiles and chains properly.
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Schedule via a cron expression; the next occurrence (9 AM) is in
		// the future for virtually every test run
		let task = TestTask::new(1);
		let _id = scheduler
			.task(task)
			.key("cron-task")
			.cron("0 9 * * *")  // 9 AM daily
			.schedule()
			.await
			.unwrap();

		// Cron scheduling - task will execute at the next scheduled time
		// For cron "0 9 * * *", that's the next 9 AM, so the task won't execute here
		// This test just verifies the methods compile and chain properly
		tokio::time::sleep(std::time::Duration::from_millis(500)).await;

		let st = state.lock().unwrap();
		// Task is scheduled for future (9 AM), so it won't have executed yet
		// The important thing is that the cron methods compile and integrate
		assert_eq!(st.len(), 0); // Not executed yet since scheduled for future
	}
1820
1821	#[tokio::test]
1822	pub async fn test_builder_daily_at_placeholder() {
1823		// Verify daily_at placeholder compiles and integrates
1824		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1825		let state: State = Arc::new(Mutex::new(Vec::new()));
1826		let scheduler = Scheduler::new(task_store);
1827		scheduler.start(state.clone());
1828		scheduler.register::<TestTask>().unwrap();
1829
1830		// Test that daily_at placeholder compiles
1831		let task = TestTask::new(1);
1832		let _id = scheduler
1833			.task(task)
1834			.key("daily-task")
1835			.daily_at(14, 30)  // 2:30 PM daily
1836			.schedule()
1837			.await
1838			.unwrap();
1839
1840		// Daily_at scheduling - task will execute at the specified time (2:30 PM daily)
1841		// Task is scheduled for future, so it won't execute in this test
1842		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1843
1844		let st = state.lock().unwrap();
1845		// Task is scheduled for future (2:30 PM), not executed yet
1846		// The important thing is that daily_at compiles and integrates properly
1847		assert_eq!(st.len(), 0);
1848	}
1849
1850	#[tokio::test]
1851	pub async fn test_builder_weekly_at_placeholder() {
1852		// Verify weekly_at placeholder compiles and integrates
1853		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1854		let state: State = Arc::new(Mutex::new(Vec::new()));
1855		let scheduler = Scheduler::new(task_store);
1856		scheduler.start(state.clone());
1857		scheduler.register::<TestTask>().unwrap();
1858
1859		// Test that weekly_at placeholder compiles
1860		let task = TestTask::new(1);
1861		let _id = scheduler
1862			.task(task)
1863			.key("weekly-task")
1864			.weekly_at(1, 9, 0)  // Monday at 9 AM
1865			.schedule()
1866			.await
1867			.unwrap();
1868
1869		// Weekly_at scheduling - task will execute on Monday at 9 AM
1870		// Task is scheduled for future, so it won't execute in this test
1871		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1872
1873		let st = state.lock().unwrap();
1874		// Task is scheduled for future (Monday 9 AM), not executed yet
1875		// The important thing is that weekly_at compiles and integrates properly
1876		assert_eq!(st.len(), 0);
1877	}
1878
1879	#[tokio::test]
1880	pub async fn test_builder_cron_with_retry() {
1881		// Verify cron methods chain with retry (future combined usage)
1882		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1883		let state: State = Arc::new(Mutex::new(Vec::new()));
1884		let scheduler = Scheduler::new(task_store);
1885		scheduler.start(state.clone());
1886		scheduler.register::<TestTask>().unwrap();
1887
1888		// Test future usage pattern: cron + retry
1889		let task = TestTask::new(1);
1890		let _id = scheduler
1891			.task(task)
1892			.key("reliable-scheduled-task")
1893			.daily_at(2, 0)  // 2 AM daily
1894			.with_retry(RetryPolicy {
1895				wait_min_max: (60, 3600),
1896				times: 5,
1897			})
1898			.schedule()
1899			.await
1900			.unwrap();
1901
1902		// Verify cron+retry chain compiles properly
1903		// Task is scheduled for 2 AM, so won't execute in this test
1904		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1905
1906		let st = state.lock().unwrap();
1907		// Task scheduled for future (2 AM), not executed yet
1908		// The important thing is that chaining cron + retry works
1909		assert_eq!(st.len(), 0);
1910	}
1911
1912	// ===== Cron Schedule Tests =====
1913
1914	#[test]
1915	fn test_cron_to_string() {
1916		// Test that to_cron_string returns the original expression
1917		let cron = CronSchedule::parse("*/5 * * * *").unwrap();
1918		assert_eq!(cron.to_cron_string(), "*/5 * * * *");
1919	}
1920
1921	#[tokio::test]
1922	pub async fn test_running_task_not_double_scheduled() {
1923		let _ = tracing_subscriber::fmt().try_init();
1924
1925		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1926		let state: State = Arc::new(Mutex::new(Vec::new()));
1927		let scheduler = Scheduler::new(task_store);
1928		scheduler.start(state.clone());
1929		scheduler.register::<TestTask>().unwrap();
1930
1931		// Create a task
1932		let task = TestTask::new(5); // Takes 1 second (5 * 200ms)
1933		let task_id = scheduler.add(task.clone()).await.unwrap();
1934
1935		// Wait a bit for task to start running
1936		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1937
1938		// Verify task is in tasks_running
1939		{
1940			let running = scheduler.tasks_running.lock().unwrap();
1941			assert!(running.contains_key(&task_id), "Task should be in running queue");
1942		}
1943
1944		// Try to add the same task again via add_queue
1945		let task_meta = TaskMeta {
1946			task: task.clone(),
1947			next_at: Some(Timestamp::now()),
1948			deps: vec![],
1949			retry_count: 0,
1950			retry: None,
1951			cron: None,
1952		};
1953		let result = scheduler.add_queue(task_id, task_meta).await;
1954
1955		// Should succeed but not actually add to scheduled queue
1956		assert!(result.is_ok(), "add_queue should succeed");
1957
1958		// Verify task is NOT in tasks_scheduled (only in running)
1959		{
1960			let sched_queue = scheduler.tasks_scheduled.lock().unwrap();
1961			let in_scheduled = sched_queue.iter().any(|((_, id), _)| *id == task_id);
1962			assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
1963		}
1964
1965		// Wait for original task to complete
1966		tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1967
1968		// Verify task completed
1969		let st = state.lock().unwrap();
1970		assert_eq!(st.len(), 1, "Only one task execution should have occurred");
1971		assert_eq!(st[0], 5);
1972	}
1973
1974	#[tokio::test]
1975	pub async fn test_running_task_metadata_updated() {
1976		let _ = tracing_subscriber::fmt().try_init();
1977
1978		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1979		let state: State = Arc::new(Mutex::new(Vec::new()));
1980		let scheduler = Scheduler::new(task_store);
1981		scheduler.start(state.clone());
1982		scheduler.register::<TestTask>().unwrap();
1983
1984		// Create a task without cron
1985		let task = TestTask::new(5); // Takes 1 second (5 * 200ms)
1986		let task_id = scheduler.add(task.clone()).await.unwrap();
1987
1988		// Wait a bit for task to start running
1989		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1990
1991		// Verify task is running and has no cron
1992		{
1993			let running = scheduler.tasks_running.lock().unwrap();
1994			let meta = running.get(&task_id).expect("Task should be running");
1995			assert!(meta.cron.is_none(), "Task should have no cron initially");
1996		}
1997
1998		// Try to update the running task with a cron schedule
1999		let cron = CronSchedule::parse("*/5 * * * *").unwrap();
2000		let task_meta_with_cron = TaskMeta {
2001			task: task.clone(),
2002			next_at: Some(Timestamp::now()),
2003			deps: vec![],
2004			retry_count: 0,
2005			retry: None,
2006			cron: Some(cron.clone()),
2007		};
2008		let result = scheduler.add_queue(task_id, task_meta_with_cron).await;
2009
2010		// Should succeed
2011		assert!(result.is_ok(), "add_queue should succeed");
2012
2013		// Verify the running task now has the cron schedule
2014		{
2015			let running = scheduler.tasks_running.lock().unwrap();
2016			let meta = running.get(&task_id).expect("Task should still be running");
2017			assert!(meta.cron.is_some(), "Task should now have cron after update");
2018		}
2019
2020		// Wait for task to complete
2021		tokio::time::sleep(std::time::Duration::from_secs(2)).await;
2022	}
2023}
2024
2025// vim: ts=4