// cloudillo_core/scheduler.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
//! Scheduler subsystem. Handles async tasks, dependencies, fallbacks, repetitions, persistence.
5
6use async_trait::async_trait;
7use itertools::Itertools;
8use std::{
9	collections::{BTreeMap, HashMap},
10	fmt::Debug,
11	sync::{Arc, Mutex, RwLock},
12};
13
14use chrono::{DateTime, Utc};
15use croner::Cron;
16use std::str::FromStr;
17
18use crate::prelude::*;
19use cloudillo_types::{lock, meta_adapter};
20
/// Identifier assigned to every scheduled task by its [`TaskStore`].
pub type TaskId = u64;

/// Whether a task repeats (cron-driven) or runs a single time.
pub enum TaskType {
	/// Recurring task, rescheduled after each completion.
	Periodic,
	/// One-shot task, marked finished after it runs.
	Once,
}
27
/// Cron schedule wrapper using the croner crate
/// Stores the expression string for serialization
#[derive(Debug, Clone)]
pub struct CronSchedule {
	/// The original cron expression string (kept so the schedule can be
	/// round-tripped to/from storage)
	expr: Box<str>,
	/// Parsed cron object (derived from `expr`, never stored separately)
	cron: Cron,
}
37
38impl CronSchedule {
39	/// Parse a cron expression (5 fields: minute hour day month weekday)
40	pub fn parse(expr: &str) -> ClResult<Self> {
41		let cron = Cron::from_str(expr)
42			.map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
43		Ok(Self { expr: expr.into(), cron })
44	}
45
46	/// Calculate the next execution time after the given timestamp
47	///
48	/// Returns an error if no next occurrence can be found (should be rare
49	/// for valid expressions within reasonable time bounds).
50	pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
51		let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
52
53		self.cron
54			.find_next_occurrence(&dt, false)
55			.map(|next| Timestamp(next.timestamp()))
56			.map_err(|e| {
57				tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
58				Error::ValidationError(format!("cron next_execution failed: {}", e))
59			})
60	}
61
62	/// Convert back to cron expression string
63	pub fn to_cron_string(&self) -> String {
64		self.expr.to_string()
65	}
66}
67
// Two schedules are equal iff their expression strings are equal; the parsed
// `Cron` object is derived from `expr`, so comparing it would be redundant.
impl PartialEq for CronSchedule {
	fn eq(&self, other: &Self) -> bool {
		self.expr == other.expr
	}
}

impl Eq for CronSchedule {}
75
/// A schedulable unit of work. Implementations must be rebuildable from
/// their serialized form so they survive restarts via a [`TaskStore`].
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
	/// Static task-kind identifier used to register and look up builders.
	fn kind() -> &'static str
	where
		Self: Sized;
	/// Reconstruct a task instance from its persisted serialized `context`.
	fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
	where
		Self: Sized;
	/// Serialize the task's parameters for persistence; must round-trip
	/// through [`Task::build`].
	fn serialize(&self) -> String;
	/// Execute the task against the shared application state.
	async fn run(&self, state: &S) -> ClResult<()>;

	/// Instance-level counterpart of [`Task::kind`], callable on trait objects.
	fn kind_of(&self) -> &'static str;
}
89
/// Lifecycle state of a persisted task.
#[derive(Debug)]
pub enum TaskStatus {
	/// Not yet (successfully) executed; re-queued on scheduler load.
	Pending,
	/// Finished successfully.
	Completed,
	/// Terminated with an error.
	Failed,
}
96
/// Snapshot of a persisted task as loaded from a [`TaskStore`].
pub struct TaskData {
	// Unique task id assigned by the store
	id: TaskId,
	// Registered task kind (used to look up the builder on load)
	kind: Box<str>,
	// Last known lifecycle state
	status: TaskStatus,
	// Serialized task parameters (fed back to `Task::build`)
	input: Box<str>,
	// Ids of tasks this task waits for
	deps: Box<[TaskId]>,
	// Serialized retry state ("count,min,max,times" — parsed in `Scheduler::load`)
	retry_data: Option<Box<str>>,
	// Original cron expression, if the task is recurring
	cron_data: Option<Box<str>>,
	// Next scheduled execution time, if any
	next_at: Option<Timestamp>,
}
107
/// Persistence backend for the scheduler.
#[async_trait]
pub trait TaskStore<S: Clone>: Send + Sync {
	/// Persist a new task (optionally under a deduplication `key`) and
	/// return its assigned id.
	async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;
	/// Mark a task as finished, recording its output.
	async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;
	/// Load all persisted tasks (used on scheduler startup).
	async fn load(&self) -> ClResult<Vec<TaskData>>;
	/// Record a task failure and optionally the next retry time.
	async fn update_task_error(
		&self,
		task_id: TaskId,
		output: &str,
		next_at: Option<Timestamp>,
	) -> ClResult<()>;
	/// Look up a task by its deduplication key.
	async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;
	/// Persist updated metadata for an existing task.
	async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;
}
122
123// InMemoryTaskStore
124//*******************
/// Volatile task store: only hands out ids; nothing survives a restart.
pub struct InMemoryTaskStore {
	// Last id handed out; incremented on every `add`
	last_id: Mutex<TaskId>,
}

impl InMemoryTaskStore {
	/// Create a new store with the id counter starting at zero.
	pub fn new() -> Arc<Self> {
		Arc::new(Self { last_id: Mutex::new(0) })
	}
}
134
135#[async_trait]
136impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
137	async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
138		let mut last_id = lock!(self.last_id)?;
139		*last_id += 1;
140		Ok(*last_id)
141	}
142
143	async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
144		Ok(())
145	}
146
147	async fn load(&self) -> ClResult<Vec<TaskData>> {
148		Ok(vec![])
149	}
150
151	async fn update_task_error(
152		&self,
153		_task_id: TaskId,
154		_output: &str,
155		_next_at: Option<Timestamp>,
156	) -> ClResult<()> {
157		Ok(())
158	}
159
160	async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
161		// In-memory store doesn't support persistence or keys
162		Ok(None)
163	}
164
165	async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
166		// In-memory store doesn't support persistence
167		Ok(())
168	}
169}
170
171// MetaAdapterTaskStore
172//**********************
/// Durable task store backed by a [`meta_adapter::MetaAdapter`].
pub struct MetaAdapterTaskStore {
	// Adapter providing the actual persistence (database) operations
	meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
}

impl MetaAdapterTaskStore {
	/// Wrap an existing meta adapter in a task store.
	pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
		Arc::new(Self { meta_adapter })
	}
}
182
183#[async_trait]
184impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
185	async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
186		let id = self
187			.meta_adapter
188			.create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
189			.await?;
190
191		// Store cron schedule if present
192		if let Some(cron) = &task.cron {
193			self.meta_adapter
194				.update_task(
195					id,
196					&meta_adapter::TaskPatch {
197						cron: Patch::Value(cron.to_cron_string()),
198						..Default::default()
199					},
200				)
201				.await?;
202		}
203
204		Ok(id)
205	}
206
207	async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
208		self.meta_adapter.update_task_finished(id, output).await
209	}
210
211	async fn load(&self) -> ClResult<Vec<TaskData>> {
212		let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
213		let tasks = tasks
214			.into_iter()
215			.map(|t| TaskData {
216				id: t.task_id,
217				kind: t.kind,
218				status: match t.status {
219					'P' => TaskStatus::Pending,
220					'F' => TaskStatus::Completed,
221					// 'E' or unknown status = Failed
222					_ => TaskStatus::Failed,
223				},
224				input: t.input,
225				deps: t.deps,
226				retry_data: t.retry,
227				cron_data: t.cron,
228				next_at: t.next_at,
229			})
230			.collect();
231		Ok(tasks)
232	}
233
234	async fn update_task_error(
235		&self,
236		task_id: TaskId,
237		output: &str,
238		next_at: Option<Timestamp>,
239	) -> ClResult<()> {
240		self.meta_adapter.update_task_error(task_id, output, next_at).await
241	}
242
243	async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
244		let task_opt = self.meta_adapter.find_task_by_key(key).await?;
245
246		match task_opt {
247			Some(t) => Ok(Some((
248				t.task_id,
249				TaskData {
250					id: t.task_id,
251					kind: t.kind,
252					status: match t.status {
253						'P' => TaskStatus::Pending,
254						'F' => TaskStatus::Completed,
255						// 'E' or unknown status = Failed
256						_ => TaskStatus::Failed,
257					},
258					input: t.input,
259					deps: t.deps,
260					retry_data: t.retry,
261					cron_data: t.cron,
262					next_at: t.next_at,
263				},
264			))),
265			None => Ok(None),
266		}
267	}
268
269	async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
270		use cloudillo_types::types::Patch;
271
272		// Build TaskPatch from TaskMeta
273		let mut patch = meta_adapter::TaskPatch {
274			input: Patch::Value(task.task.serialize()),
275			next_at: match task.next_at {
276				Some(ts) => Patch::Value(ts),
277				None => Patch::Null,
278			},
279			..Default::default()
280		};
281
282		// Update deps
283		if !task.deps.is_empty() {
284			patch.deps = Patch::Value(task.deps.clone());
285		}
286
287		// Update retry policy
288		if let Some(ref retry) = task.retry {
289			let retry_str = format!(
290				"{},{},{},{}",
291				task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
292			);
293			patch.retry = Patch::Value(retry_str);
294		}
295
296		// Update cron schedule
297		if let Some(ref cron) = task.cron {
298			patch.cron = Patch::Value(cron.to_cron_string());
299		}
300
301		self.meta_adapter.update_task(id, &patch).await
302	}
303}
304
// Task metadata
/// Factory closure that rebuilds a task of a given kind from its persisted
/// serialized input (see [`Task::build`]).
type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
307
/// Retry configuration: exponential backoff bounded by a min/max wait and a
/// maximum number of attempts.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
	// (minimum, maximum) backoff in seconds
	wait_min_max: (u64, u64),
	// Maximum number of retry attempts
	times: u16,
}

impl Default for RetryPolicy {
	/// 60 s initial backoff, capped at 1 hour, up to 10 attempts.
	fn default() -> Self {
		Self { wait_min_max: (60, 3600), times: 10 }
	}
}

impl RetryPolicy {
	/// Create a new RetryPolicy with custom min/max backoff and number of retries
	pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
		Self { wait_min_max, times }
	}

	/// Calculate exponential backoff in seconds: min * (2^attempt), capped at max.
	///
	/// Uses checked/saturating arithmetic so large `attempt_count` values
	/// (>= 64, where `1u64 << attempt` would panic in debug builds and wrap
	/// in release builds) simply saturate to the configured maximum.
	pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
		let (min, max) = self.wait_min_max;
		let factor = 1u64.checked_shl(u32::from(attempt_count)).unwrap_or(u64::MAX);
		min.saturating_mul(factor).min(max)
	}

	/// Check if we should continue retrying
	pub fn should_retry(&self, attempt_count: u16) -> bool {
		attempt_count < self.times
	}
}
338
339// TaskSchedulerBuilder - Fluent API for task scheduling
340//************************************************************
/// Fluent builder for scheduling a task; created by [`Scheduler::task`] and
/// consumed by one of its terminal methods (`schedule`, `now`, `at`, ...).
pub struct TaskSchedulerBuilder<'a, S: Clone> {
	// Scheduler the task will be submitted to
	scheduler: &'a Scheduler<S>,
	// The task being scheduled
	task: Arc<dyn Task<S>>,
	// Optional deduplication key
	key: Option<String>,
	// Absolute time of the (first) execution, if any
	next_at: Option<Timestamp>,
	// Tasks that must complete before this one runs
	deps: Vec<TaskId>,
	// Optional retry policy
	retry: Option<RetryPolicy>,
	// Optional recurring schedule
	cron: Option<CronSchedule>,
}
350
351impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
352	/// Create a new builder for scheduling a task
353	fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
354		Self {
355			scheduler,
356			task,
357			key: None,
358			next_at: None,
359			deps: Vec::new(),
360			retry: None,
361			cron: None,
362		}
363	}
364
365	/// Set a string key for task identification
366	pub fn key(mut self, key: impl Into<String>) -> Self {
367		self.key = Some(key.into());
368		self
369	}
370
371	/// Schedule for a specific absolute timestamp
372	pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
373		self.next_at = Some(timestamp);
374		self
375	}
376
377	/// Schedule after a relative delay (in seconds)
378	pub fn schedule_after(mut self, seconds: i64) -> Self {
379		self.next_at = Some(Timestamp::from_now(seconds));
380		self
381	}
382
383	/// Add task dependencies - task waits for all of these to complete
384	pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
385		self.deps = deps;
386		self
387	}
388
389	/// Add a single task dependency
390	pub fn depends_on(mut self, dep: TaskId) -> Self {
391		self.deps.push(dep);
392		self
393	}
394
395	/// Enable automatic retry with exponential backoff
396	pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
397		self.retry = Some(policy);
398		self
399	}
400
401	// ===== Cron Scheduling Methods =====
402
403	/// Schedule task with cron expression
404	/// Example: `.cron("0 9 * * *")` for 9 AM daily
405	pub fn cron(mut self, expr: impl Into<String>) -> Self {
406		if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
407			// Calculate initial next_at from cron schedule
408			// Use .ok() - cron was just parsed successfully, should never fail
409			self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
410			self.cron = Some(cron_schedule);
411		}
412		self
413	}
414
415	/// Schedule task daily at specified time
416	/// Example: `.daily_at(2, 30)` for 2:30 AM daily
417	pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
418		if hour <= 23 && minute <= 59 {
419			let expr = format!("{} {} * * *", minute, hour);
420			if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
421				// Calculate initial next_at from cron schedule
422				// Use .ok() - cron was just parsed successfully, should never fail
423				self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
424				self.cron = Some(cron_schedule);
425			}
426		}
427		self
428	}
429
430	/// Schedule task weekly at specified day and time
431	/// Example: `.weekly_at(1, 14, 30)` for Mondays at 2:30 PM
432	/// weekday: 0=Sunday, 1=Monday, ..., 6=Saturday
433	pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
434		if weekday <= 6 && hour <= 23 && minute <= 59 {
435			let expr = format!("{} {} * * {}", minute, hour, weekday);
436			if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
437				// Calculate initial next_at from cron schedule
438				// Use .ok() - cron was just parsed successfully, should never fail
439				self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
440				self.cron = Some(cron_schedule);
441			}
442		}
443		self
444	}
445
446	/// Execute the scheduled task immediately
447	pub async fn now(self) -> ClResult<TaskId> {
448		self.schedule().await
449	}
450
451	/// Execute the scheduled task at a specific timestamp
452	pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
453		self.next_at = Some(ts);
454		self.schedule().await
455	}
456
457	/// Execute the scheduled task after a delay (in seconds)
458	pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
459		self.next_at = Some(Timestamp::from_now(seconds));
460		self.schedule().await
461	}
462
463	/// Execute the scheduled task after another task completes
464	pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
465		self.deps.push(dep);
466		self.schedule().await
467	}
468
469	/// Execute the scheduled task with automatic retry using default policy
470	pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
471		self.retry = Some(RetryPolicy::default());
472		self.schedule().await
473	}
474
475	/// Execute the task with all configured options - main terminal method
476	pub async fn schedule(self) -> ClResult<TaskId> {
477		self.scheduler
478			.schedule_task_impl(
479				self.task,
480				self.key.as_deref(),
481				self.next_at,
482				if self.deps.is_empty() { None } else { Some(self.deps) },
483				self.retry,
484				self.cron,
485			)
486			.await
487	}
488}
489
/// In-memory metadata for a queued task.
#[derive(Debug, Clone)]
pub struct TaskMeta<S: Clone> {
	/// The task itself (shared, cheaply clonable).
	pub task: Arc<dyn Task<S>>,
	/// Next execution time; `None` for purely dependency-gated tasks.
	pub next_at: Option<Timestamp>,
	/// Ids of tasks that must complete before this one runs.
	pub deps: Vec<TaskId>,
	// Number of retry attempts made so far (persisted in the retry string)
	retry_count: u16,
	/// Retry policy, if automatic retries are enabled.
	pub retry: Option<RetryPolicy>,
	/// Recurring schedule, if the task repeats.
	pub cron: Option<CronSchedule>,
}
499
// Builder registry keyed by task kind; used to rebuild persisted tasks on load.
type TaskBuilderRegistry<S> = HashMap<&'static str, Box<TaskBuilder<S>>>;
// Scheduled tasks ordered by (execution time, id), so the earliest is first.
type ScheduledTaskMap<S> = BTreeMap<(Timestamp, TaskId), TaskMeta<S>>;
502
// Scheduler
/// Central task scheduler. Cheap to clone: all state lives behind `Arc`s,
/// so clones share the same queues and channels.
#[derive(Clone)]
pub struct Scheduler<S: Clone> {
	// Registered builders for reconstructing persisted tasks by kind
	task_builders: Arc<RwLock<TaskBuilderRegistry<S>>>,
	// Persistence backend
	store: Arc<dyn TaskStore<S>>,
	// Tasks currently executing
	tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
	// Tasks blocked on unfinished dependencies
	tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
	// Reverse dependency index: completed task id -> ids of tasks waiting on it
	task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
	// Time-ordered queue of tasks waiting for their execution time
	tasks_scheduled: Arc<Mutex<ScheduledTaskMap<S>>>,
	// Completion channel: running tasks report their id here when done
	tx_finish: flume::Sender<TaskId>,
	rx_finish: flume::Receiver<TaskId>,
	// Wakes the timer loop when the scheduled queue changes
	notify_schedule: Arc<tokio::sync::Notify>,
}
516
517impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
518	pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
519		let (tx_finish, rx_finish) = flume::unbounded();
520
521		let scheduler = Self {
522			task_builders: Arc::new(RwLock::new(HashMap::new())),
523			store,
524			tasks_running: Arc::new(Mutex::new(HashMap::new())),
525			tasks_waiting: Arc::new(Mutex::new(HashMap::new())),
526			task_dependents: Arc::new(Mutex::new(HashMap::new())),
527			tasks_scheduled: Arc::new(Mutex::new(BTreeMap::new())),
528			tx_finish,
529			rx_finish,
530			notify_schedule: Arc::new(tokio::sync::Notify::new()),
531		};
532
533		//scheduler.run(rx_finish)?;
534
535		Arc::new(scheduler)
536	}
537
	/// Spawn the scheduler's background workers on the tokio runtime:
	/// 1. a completion handler that reschedules cron tasks, persists results
	///    and releases dependent tasks,
	/// 2. a timer loop that spawns tasks whose scheduled time has arrived,
	/// 3. a one-shot loader that restores persisted tasks from the store.
	pub fn start(&self, state: S) {
		// Handle finished tasks and dependencies
		let schedule = self.clone();
		let stat = state.clone();
		let rx_finish = self.rx_finish.clone();

		tokio::spawn(async move {
			while let Ok(id) = rx_finish.recv_async().await {
				debug!("Completed task {} (notified)", id);

				// Get task metadata WITHOUT removing - we only remove after successful transition
				let task_meta_opt = {
					let tasks_running = match schedule.tasks_running.lock() {
						Ok(guard) => guard,
						Err(poisoned) => {
							error!("Mutex poisoned: tasks_running (recovering)");
							poisoned.into_inner()
						}
					};
					tasks_running.get(&id).cloned()
				};

				if let Some(task_meta) = task_meta_opt {
					// Track if transition was successful
					let mut transition_ok = false;

					// Check if this is a recurring task with cron schedule
					if let Some(ref cron) = task_meta.cron {
						// Calculate next execution time
						let next_at = match cron.next_execution(Timestamp::now()) {
							Ok(ts) => ts,
							Err(e) => {
								error!(
									"Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
									id, e
								);
								// Mark as finished since we can't reschedule
								if let Err(e) = schedule.store.finished(id, "").await {
									error!("Failed to mark task {} as finished: {}", id, e);
								}
								continue;
							}
						};
						info!(
							"Recurring task {} completed, scheduling next execution at {}",
							id, next_at
						);

						// Update task with new next_at
						let mut updated_meta = task_meta.clone();
						updated_meta.next_at = Some(next_at);

						// Update database with new next_at (keep status as Pending)
						if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
							error!("Failed to update recurring task {} next_at: {}", id, e);
						}

						// Remove from running BEFORE add_queue (so add_queue doesn't see it as running)
						match schedule.tasks_running.lock() {
							Ok(mut tasks_running) => {
								tasks_running.remove(&id);
							}
							Err(poisoned) => {
								error!("Mutex poisoned: tasks_running (recovering)");
								poisoned.into_inner().remove(&id);
							}
						}

						// Re-add to scheduler with new execution time
						match schedule.add_queue(id, updated_meta).await {
							Ok(_) => transition_ok = true,
							Err(e) => {
								error!(
									"Failed to reschedule recurring task {}: {} - task lost!",
									id, e
								);
							}
						}
					} else {
						// One-time task - mark as finished
						match schedule.store.finished(id, "").await {
							Ok(()) => transition_ok = true,
							Err(e) => {
								error!(
									"Failed to mark task {} as finished: {} - task remains in running queue",
									id, e
								);
							}
						}
					}

					// Only remove from running queue after successful transition
					// (for cron tasks the entry was already removed above, so
					// this second remove is a harmless no-op in that path)
					if transition_ok {
						match schedule.tasks_running.lock() {
							Ok(mut tasks_running) => {
								tasks_running.remove(&id);
							}
							Err(poisoned) => {
								error!("Mutex poisoned: tasks_running (recovering)");
								poisoned.into_inner().remove(&id);
							}
						}
					}

					// Handle dependencies of finished task using atomic release method
					match schedule.release_dependents(id) {
						Ok(ready_to_spawn) => {
							for (dep_id, dep_task_meta) in ready_to_spawn {
								// Add to running queue before spawning
								match schedule.tasks_running.lock() {
									Ok(mut tasks_running) => {
										tasks_running.insert(dep_id, dep_task_meta.clone());
									}
									Err(poisoned) => {
										error!("Mutex poisoned: tasks_running (recovering)");
										poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
									}
								}
								schedule.spawn_task(
									stat.clone(),
									dep_task_meta.task.clone(),
									dep_id,
									dep_task_meta,
								);
							}
						}
						Err(e) => {
							error!("Failed to release dependents of task {}: {}", id, e);
						}
					}
				} else {
					warn!("Completed task {} not found in running queue", id);
				}
			}
		});

		// Handle scheduled tasks: sleep until the earliest entry is due (or a
		// new entry arrives), then spawn everything whose time has come.
		let schedule = self.clone();
		tokio::spawn(async move {
			loop {
				let is_empty = match schedule.tasks_scheduled.lock() {
					Ok(guard) => guard.is_empty(),
					Err(poisoned) => {
						error!("Mutex poisoned: tasks_scheduled (recovering)");
						poisoned.into_inner().is_empty()
					}
				};
				if is_empty {
					// Nothing queued: park until add_queue notifies us.
					schedule.notify_schedule.notified().await;
				}
				let time = Timestamp::now();
				// Inner loop drains all due tasks; it breaks with the key of
				// the next future task (Some) or None when the queue is empty.
				if let Some((timestamp, _id)) = loop {
					let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
						Ok(guard) => guard,
						Err(poisoned) => {
							error!("Mutex poisoned: tasks_scheduled (recovering)");
							poisoned.into_inner()
						}
					};
					if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
						let (timestamp, id) = (timestamp, id);
						if timestamp <= Timestamp::now() {
							debug!("Spawning task id {} (from schedule)", id);
							if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
								let mut tasks_running = match schedule.tasks_running.lock() {
									Ok(guard) => guard,
									Err(poisoned) => {
										error!("Mutex poisoned: tasks_running (recovering)");
										poisoned.into_inner()
									}
								};
								tasks_running.insert(id, task.clone());
								schedule.spawn_task(state.clone(), task.task.clone(), id, task);
							} else {
								error!("Task disappeared while being removed from schedule");
								break None;
							}
						} else {
							break Some((timestamp, id));
						}
					} else {
						break None;
					}
				} {
					// Sleep until the next task is due, but wake early if the
					// queue changes (notify_schedule) so new earlier tasks are
					// not missed.
					let diff = timestamp.0 - time.0;
					let wait =
						tokio::time::Duration::from_secs(u64::try_from(diff).unwrap_or_default());
					tokio::select! {
						() = tokio::time::sleep(wait) => (), () = schedule.notify_schedule.notified() => ()
					};
				}
			}
		});

		// Restore persisted tasks in the background; errors are intentionally
		// ignored here (load logs its own failures).
		let schedule = self.clone();
		tokio::spawn(async move {
			let _ignore_err = schedule.load().await;
		});
	}
737
738	fn register_builder(
739		&self,
740		name: &'static str,
741		builder: &'static TaskBuilder<S>,
742	) -> ClResult<&Self> {
743		let mut task_builders = self
744			.task_builders
745			.write()
746			.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
747		task_builders.insert(name, Box::new(builder));
748		Ok(self)
749	}
750
751	pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
752		info!("Registering task type {}", T::kind());
753		self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
754		Ok(self)
755	}
756
	/// Create a builder for scheduling a task using the fluent API.
	/// The builder borrows the scheduler and submits on a terminal method
	/// (`schedule`, `now`, `at`, ...).
	pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
		TaskSchedulerBuilder::new(self, task)
	}
761
762	/// Internal method to schedule a task with all options
763	/// This is the core implementation used by the builder pattern
764	async fn schedule_task_impl(
765		&self,
766		task: Arc<dyn Task<S>>,
767		key: Option<&str>,
768		next_at: Option<Timestamp>,
769		deps: Option<Vec<TaskId>>,
770		retry: Option<RetryPolicy>,
771		cron: Option<CronSchedule>,
772	) -> ClResult<TaskId> {
773		let task_meta = TaskMeta {
774			task: task.clone(),
775			next_at,
776			deps: deps.clone().unwrap_or_default(),
777			retry_count: 0,
778			retry,
779			cron,
780		};
781
782		// Check if a task with this key already exists (key-based deduplication)
783		if let Some(key) = key {
784			if let Some((existing_id, existing_data)) = self.store.find_by_key(key).await? {
785				let new_serialized = task.serialize();
786				let existing_serialized = existing_data.input.as_ref();
787
788				// Compare serialized parameters
789				if new_serialized == existing_serialized {
790					info!(
791						"Recurring task '{}' already exists with identical parameters (id={})",
792						key, existing_id
793					);
794					// Update DB with current cron/next_at (may differ from what's stored)
795					self.store.update_task(existing_id, &task_meta).await?;
796					// Ensure the existing task is queued (may be loaded from DB but not yet in queue)
797					self.add_queue(existing_id, task_meta).await?;
798					return Ok(existing_id);
799				}
800				info!(
801					"Updating recurring task '{}' (id={}) - parameters changed",
802					key, existing_id
803				);
804				info!("  Old params: {}", existing_serialized);
805				info!("  New params: {}", new_serialized);
806
807				// Remove from all queues (if present)
808				self.remove_from_queues(existing_id)?;
809
810				// Update the task in database with new parameters
811				self.store.update_task(existing_id, &task_meta).await?;
812
813				// Re-add to appropriate queue with updated parameters
814				self.add_queue(existing_id, task_meta).await?;
815
816				return Ok(existing_id);
817			}
818		}
819
820		// No existing task - create new one
821		let id = self.store.add(&task_meta, key).await?;
822		self.add_queue(id, task_meta).await
823	}
824
	/// Convenience wrapper: schedule `task` for immediate execution with no
	/// key, dependencies, retry policy or cron schedule.
	pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
		self.task(task).now().await
	}
828
	/// Place a task into the appropriate in-memory queue:
	/// running (metadata refresh only), scheduled (timed), or waiting
	/// (dependency-gated). Returns the task id unchanged.
	pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
		// If task is already running, update its metadata (especially for cron updates)
		// but don't add to scheduled queue (it will reschedule on completion)
		{
			let mut running = lock!(self.tasks_running, "tasks_running")?;
			if let Some(existing_meta) = running.get_mut(&id) {
				debug!(
					"Task {} is already running, updating metadata (will reschedule on completion)",
					id
				);
				// Update the running task's metadata so it has the latest cron schedule
				*existing_meta = task_meta;
				return Ok(id);
			}
		}

		// Remove from other queues if present (prevents duplicate entries with different timestamps)
		{
			let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
			// The scheduled map is keyed by (timestamp, id), so scan for the id.
			if let Some(key) = scheduled
				.iter()
				.find(|((_, tid), _)| *tid == id)
				.map(|((ts, tid), _)| (*ts, *tid))
			{
				scheduled.remove(&key);
				debug!("Removed existing scheduled entry for task {} before re-queueing", id);
			}
		}
		{
			let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
			if waiting.remove(&id).is_some() {
				debug!("Removed existing waiting entry for task {} before re-queueing", id);
			}
		}

		let deps = task_meta.deps.clone();

		// VALIDATION: Tasks with dependencies should NEVER be in tasks_scheduled
		if !deps.is_empty() && task_meta.next_at.is_some() {
			warn!("Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue", id);
			// Force to tasks_waiting instead
			lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
			debug!("Task {} is waiting for {:?}", id, &deps);
			// Register this task in the reverse dependency index for each dep
			for dep in deps {
				lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
			}
			return Ok(id);
		}

		// No deps and due already (next_at missing counts as due): queue at
		// Timestamp(0) so the timer loop picks it up immediately.
		if deps.is_empty() && task_meta.next_at.unwrap_or(Timestamp(0)) < Timestamp::now() {
			debug!("Spawning task {}", id);
			lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), task_meta);
			self.notify_schedule.notify_one();
		} else if let Some(next_at) = task_meta.next_at {
			debug!("Scheduling task {} for {}", id, next_at);
			lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((next_at, id), task_meta);
			self.notify_schedule.notify_one();
		} else {
			lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
			debug!("Task {} is waiting for {:?}", id, &deps);
			for dep in deps {
				lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
			}
		}
		Ok(id)
	}
895
896	/// Remove a task from all internal queues (waiting, scheduled, running)
897	/// Returns the removed TaskMeta if found
898	fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
899		// Try tasks_waiting
900		if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
901			debug!("Removed task {} from waiting queue for update", task_id);
902			return Ok(Some(task_meta));
903		}
904
905		// Try tasks_scheduled (need to find by task_id in BTreeMap)
906		{
907			let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
908			if let Some(key) = scheduled
909				.iter()
910				.find(|((_, id), _)| *id == task_id)
911				.map(|((ts, id), _)| (*ts, *id))
912			{
913				if let Some(task_meta) = scheduled.remove(&key) {
914					debug!("Removed task {} from scheduled queue for update", task_id);
915					return Ok(Some(task_meta));
916				}
917			}
918		}
919
920		// Try tasks_running (should rarely happen, but handle it)
921		if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
922			warn!("Removed task {} from running queue during update", task_id);
923			return Ok(Some(task_meta));
924		}
925
926		Ok(None)
927	}
928
	/// Release all dependent tasks of a completed task
	/// This method safely handles dependency cleanup and spawning
	///
	/// Returns the (id, metadata) pairs of tasks whose last dependency was
	/// just cleared; the caller is responsible for moving them into the
	/// running queue and spawning them.
	fn release_dependents(
		&self,
		completed_task_id: TaskId,
	) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
		// Get list of dependents (atomic removal to prevent re-processing)
		let dependents = {
			let mut deps_map = lock!(self.task_dependents, "task_dependents")?;
			deps_map.remove(&completed_task_id).unwrap_or_default()
		};

		if dependents.is_empty() {
			return Ok(Vec::new()); // No dependents to release
		}

		debug!("Releasing {} dependents of completed task {}", dependents.len(), completed_task_id);

		let mut ready_to_spawn = Vec::new();

		// For each dependent, check and remove dependency
		for dependent_id in dependents {
			// Try tasks_waiting first (most common case for dependent tasks)
			{
				let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
				if let Some(task_meta) = waiting.get_mut(&dependent_id) {
					// Remove the completed task from dependencies
					task_meta.deps.retain(|x| *x != completed_task_id);

					// If all dependencies are cleared, remove and queue for spawning
					if task_meta.deps.is_empty() {
						if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
							debug!(
								"Dependent task {} ready to spawn (all dependencies cleared)",
								dependent_id
							);
							ready_to_spawn.push((dependent_id, task_to_spawn));
						}
					} else {
						debug!(
							"Task {} still has {} remaining dependencies",
							dependent_id,
							task_meta.deps.len()
						);
					}
					continue;
				}
			}

			// Try tasks_scheduled if not in waiting (shouldn't happen with validation, but be defensive)
			{
				let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
				if let Some(scheduled_key) = scheduled
					.iter()
					.find(|((_, id), _)| *id == dependent_id)
					.map(|((ts, id), _)| (*ts, *id))
				{
					// Only prune the dependency here; the timer loop will
					// spawn the task when its scheduled time arrives.
					if let Some(task_meta) = scheduled.get_mut(&scheduled_key) {
						task_meta.deps.retain(|x| *x != completed_task_id);
						let remaining = task_meta.deps.len();
						if remaining == 0 {
							debug!(
								"Task {} in scheduled queue has no remaining dependencies",
								dependent_id
							);
						} else {
							debug!(
								"Task {} in scheduled queue has {} remaining dependencies",
								dependent_id, remaining
							);
						}
					}
					continue;
				}
			}

			// Task not found in any queue
			warn!(
				"Dependent task {} of completed task {} not found in any queue",
				dependent_id, completed_task_id
			);
		}

		Ok(ready_to_spawn)
	}
1014
1015	async fn load(&self) -> ClResult<()> {
1016		let tasks = self.store.load().await?;
1017		debug!("Loaded {} tasks from store", tasks.len());
1018		for t in tasks {
1019			if let TaskStatus::Pending = t.status {
1020				debug!("Loading task {} {}", t.id, t.kind);
1021				let task = {
1022					let builder_map = self
1023						.task_builders
1024						.read()
1025						.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
1026					let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
1027						format!("task builder not registered: {}", t.kind),
1028					))?;
1029					builder(t.id, &t.input)?
1030				};
1031				let (retry_count, retry) = match t.retry_data {
1032					Some(retry_str) => {
1033						let (retry_count, retry_min, retry_max, retry_times) = retry_str
1034							.split(',')
1035							.collect_tuple()
1036							.ok_or(Error::Internal("invalid retry policy format".into()))?;
1037						let retry_count: u16 = retry_count
1038							.parse()
1039							.map_err(|_| Error::Internal("retry count must be u16".into()))?;
1040						let retry = RetryPolicy {
1041							wait_min_max: (
1042								retry_min
1043									.parse()
1044									.map_err(|_| Error::Internal("retry_min must be u64".into()))?,
1045								retry_max
1046									.parse()
1047									.map_err(|_| Error::Internal("retry_max must be u64".into()))?,
1048							),
1049							times: retry_times
1050								.parse()
1051								.map_err(|_| Error::Internal("retry times must be u64".into()))?,
1052						};
1053						debug!("Loaded retry policy: {:?}", retry);
1054						(retry_count, Some(retry))
1055					}
1056					_ => (0, None),
1057				};
1058				// Parse cron data if present
1059				let cron =
1060					t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
1061
1062				let task_meta = TaskMeta {
1063					task,
1064					next_at: t.next_at,
1065					deps: t.deps.into(),
1066					retry_count,
1067					retry,
1068					cron,
1069				};
1070				self.add_queue(t.id, task_meta).await?;
1071			}
1072		}
1073		Ok(())
1074	}
1075
	/// Spawn `task` onto the tokio runtime and handle its outcome.
	///
	/// On success the task id is sent on `tx_finish` so the scheduler loop can
	/// release dependents. On failure the behavior depends on the retry policy
	/// carried in `task_meta`:
	/// - retries remaining: record the error, remove the task from
	///   `tasks_running` and re-queue it with a backoff delay (no finish event
	///   is sent, so dependents stay blocked);
	/// - retries exhausted / no policy: record the error and send the finish
	///   event so dependents are released.
	///
	/// Errors from the store and from the finish channel are deliberately
	/// ignored (`unwrap_or(())`) — this is best-effort cleanup inside a
	/// detached task.
	fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
		let tx_finish = self.tx_finish.clone();
		let store = self.store.clone();
		let scheduler = self.clone();
		//let state = self.state.clone();
		tokio::spawn(async move {
			match task.run(&state).await {
				Ok(()) => {
					debug!("Task {} completed successfully", id);
					tx_finish.send(id).unwrap_or(());
				}
				Err(e) => {
					if let Some(retry_policy) = &task_meta.retry {
						if retry_policy.should_retry(task_meta.retry_count) {
							let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
							let next_at = Timestamp::from_now(backoff.cast_signed());

							info!(
								"Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
								id, task_meta.retry_count + 1, retry_policy.times, backoff, e
							);

							// Update database with error and reschedule
							store
								.update_task_error(id, &e.to_string(), Some(next_at))
								.await
								.unwrap_or(());

							// Remove from running tasks (we're not sending finish event)
							// A poisoned mutex is recovered rather than propagated:
							// this runs in a detached task with no caller to notify.
							match scheduler.tasks_running.lock() {
								Ok(mut tasks_running) => {
									tasks_running.remove(&id);
								}
								Err(poisoned) => {
									error!("Mutex poisoned: tasks_running (recovering)");
									poisoned.into_inner().remove(&id);
								}
							}

							// Re-queue task with incremented retry count
							// (clones the TaskMeta; the Arc'd task object is shared)
							let mut retry_meta = task_meta.clone();
							retry_meta.retry_count += 1;
							retry_meta.next_at = Some(next_at);
							scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
						} else {
							// Max retries exhausted
							error!(
								"Task {} failed after {} retries: {}",
								id, task_meta.retry_count, e
							);
							store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
							tx_finish.send(id).unwrap_or(());
						}
					} else {
						// No retry policy - fail immediately
						error!("Task {} failed: {}", id, e);
						store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
						tx_finish.send(id).unwrap_or(());
					}
				}
			}
		});
	}
1139
1140	/// Get health status of the scheduler
1141	/// Returns information about tasks in each queue and detects anomalies
1142	pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
1143		let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
1144		let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
1145		let running_count = lock!(self.tasks_running, "tasks_running")?.len();
1146		let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
1147
1148		// Check for anomalies
1149		let mut stuck_tasks = Vec::new();
1150		let mut tasks_with_missing_deps = Vec::new();
1151
1152		// Check tasks_waiting for tasks with no dependencies (stuck)
1153		{
1154			let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1155			let _deps_map = lock!(self.task_dependents, "task_dependents")?;
1156
1157			for (id, task_meta) in waiting.iter() {
1158				if task_meta.deps.is_empty() {
1159					stuck_tasks.push(*id);
1160					warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
1161				} else {
1162					// Check if all dependencies still exist
1163					for dep in &task_meta.deps {
1164						// Check if dependency is in any queue or dependents map
1165						let dep_exists =
1166							self.tasks_running.lock().ok().is_some_and(|r| r.contains_key(dep))
1167								|| self
1168									.tasks_waiting
1169									.lock()
1170									.ok()
1171									.is_some_and(|w| w.contains_key(dep))
1172								|| self.tasks_scheduled.lock().ok().is_some_and(|s| {
1173									s.iter().any(|((_, task_id), _)| task_id == dep)
1174								});
1175
1176						if !dep_exists {
1177							tasks_with_missing_deps.push((*id, *dep));
1178							warn!(
1179								"SCHEDULER HEALTH: Task {} depends on non-existent task {}",
1180								id, dep
1181							);
1182						}
1183					}
1184				}
1185			}
1186		}
1187
1188		Ok(SchedulerHealth {
1189			waiting: waiting_count,
1190			scheduled: scheduled_count,
1191			running: running_count,
1192			dependents: dependents_count,
1193			stuck_tasks,
1194			tasks_with_missing_deps,
1195		})
1196	}
1197}
1198
/// Health status of the scheduler, as produced by `Scheduler::health_check`.
///
/// Counts are point-in-time snapshots of the internal queues; the two
/// anomaly lists flag tasks that should not be able to exist in a healthy
/// scheduler.
#[derive(Debug, Clone)]
pub struct SchedulerHealth {
	/// Number of tasks waiting for dependencies
	pub waiting: usize,
	/// Number of tasks scheduled for future execution
	pub scheduled: usize,
	/// Number of tasks currently running
	pub running: usize,
	/// Number of task entries in dependents map
	pub dependents: usize,
	/// IDs of tasks with no dependencies but still in waiting queue
	pub stuck_tasks: Vec<TaskId>,
	/// Pairs of (task_id, missing_dependency_id) where dependency doesn't exist
	pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
}
1215
1216#[cfg(test)]
1217mod tests {
1218	use super::*;
1219	use serde::{Deserialize, Serialize};
1220
	// Shared test state: every completed task pushes its number here, so
	// tests can assert on execution order.
	type State = Arc<Mutex<Vec<u8>>>;

	/// Minimal test task: sleeps 200ms * `num`, then records `num` in the
	/// shared state (see `run` in the `Task` impl below).
	#[derive(Debug, Serialize, Deserialize)]
	struct TestTask {
		num: u8,
	}

	impl TestTask {
		// Tasks are handed to the scheduler as Arc<dyn Task>, hence Arc here.
		pub fn new(num: u8) -> Arc<Self> {
			Arc::new(Self { num })
		}
	}
1233
1234	#[async_trait]
1235	impl Task<State> for TestTask {
1236		fn kind() -> &'static str {
1237			"test"
1238		}
1239
1240		fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1241			let num: u8 = ctx
1242				.parse()
1243				.map_err(|_| Error::Internal("test task context must be u8".into()))?;
1244			let task = TestTask::new(num);
1245			Ok(task)
1246		}
1247
1248		fn serialize(&self) -> String {
1249			self.num.to_string()
1250		}
1251
1252		fn kind_of(&self) -> &'static str {
1253			"test"
1254		}
1255
1256		async fn run(&self, state: &State) -> ClResult<()> {
1257			info!("Running task {}", self.num);
1258			tokio::time::sleep(std::time::Duration::from_millis(200 * u64::from(self.num))).await;
1259			info!("Completed task {}", self.num);
1260			state.lock().unwrap().push(self.num);
1261			Ok(())
1262		}
1263	}
1264
	/// Test task that fails its first `fail_count` run attempts, then
	/// succeeds and records `id` in the shared state.
	#[derive(Debug, Clone)]
	struct FailingTask {
		// Value pushed into the shared state on success.
		id: u8,
		// Number of attempts that deliberately fail before succeeding.
		fail_count: u8,
		// Attempt counter behind an Arc so clones of the task share it
		// (retries of the same instance see the accumulated count).
		attempt: Arc<Mutex<u8>>,
	}

	impl FailingTask {
		pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
			Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
		}
	}
1277
	#[async_trait]
	impl Task<State> for FailingTask {
		fn kind() -> &'static str {
			"failing"
		}

		/// Rebuild from the "id,fail_count" form produced by `serialize`.
		/// NOTE: a rebuilt task starts with a fresh attempt counter.
		fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
			let parts: Vec<&str> = ctx.split(',').collect();
			if parts.len() != 2 {
				return Err(Error::Internal("failing task context must have 2 parts".into()));
			}
			let id: u8 = parts[0]
				.parse()
				.map_err(|_| Error::Internal("failing task id must be u8".into()))?;
			let fail_count: u8 = parts[1]
				.parse()
				.map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
			Ok(FailingTask::new(id, fail_count))
		}

		fn serialize(&self) -> String {
			format!("{},{}", self.id, self.fail_count)
		}

		fn kind_of(&self) -> &'static str {
			"failing"
		}

		/// Fail while `attempt <= fail_count`, then succeed and record `id`.
		async fn run(&self, state: &State) -> ClResult<()> {
			// The guard is held across the whole body; there is no await
			// between here and the early return, so this is safe.
			let mut attempt = self.attempt.lock().unwrap();
			*attempt += 1;
			let current_attempt = *attempt;

			info!("FailingTask {} - attempt {}/{}", self.id, current_attempt, self.fail_count + 1);

			if current_attempt <= self.fail_count {
				error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
				return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
			}

			info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
			state.lock().unwrap().push(self.id);
			Ok(())
		}
	}
1323
	/// End-to-end smoke test: mixes delayed scheduling, the legacy `add` API
	/// and a dependency-gated task, then asserts all five tasks ran.
	/// Timing-sensitive: relies on real sleeps, so total runtime is ~7s.
	#[tokio::test]
	pub async fn test_scheduler() {
		// try_init: another test may already have installed a subscriber.
		let _ = tracing_subscriber::fmt().try_init();

		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		let _task1 = TestTask::new(1);
		let task2 = TestTask::new(1);
		let task3 = TestTask::new(1);

		// task2 delayed 2s, task3 immediate (legacy API), third task gated on both.
		let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
		let task3_id = scheduler.add(task3).await.unwrap();
		scheduler
			.task(TestTask::new(1))
			.depend_on(vec![task2_id, task3_id])
			.schedule()
			.await
			.unwrap();

		tokio::time::sleep(std::time::Duration::from_secs(4)).await;
		// Second wave of delayed tasks after the first three completed.
		let task4 = TestTask::new(1);
		let task5 = TestTask::new(1);
		scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
		scheduler.task(task5).schedule_after(1).schedule().await.unwrap();

		tokio::time::sleep(std::time::Duration::from_secs(3)).await;

		let st = state.lock().unwrap();
		info!("res: {}", st.len());
		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1:1:1");
	}
1360
	/// A task that fails twice must be retried with increasing backoff and
	/// succeed on the third attempt. Timing-sensitive (~6s of real sleeps).
	#[tokio::test]
	pub async fn test_retry_with_backoff() {
		let _ = tracing_subscriber::fmt().try_init();

		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<FailingTask>().unwrap();

		// Create a task that fails twice, then succeeds
		// With retry policy: min=1s, max=3600s, max_attempts=3
		let failing_task = FailingTask::new(42, 2);
		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

		scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();

		// Wait for retries: 1s (1st fail) + 1s (2nd fail) + time for success
		// First attempt: immediate fail
		// Wait 1s (min backoff)
		// Second attempt: fail
		// Wait 2s (min * 2)
		// Third attempt: success
		tokio::time::sleep(std::time::Duration::from_secs(6)).await;

		let st = state.lock().unwrap();
		assert_eq!(st.len(), 1, "Task should have succeeded after retries");
		assert_eq!(st[0], 42);
	}
1390
1391	// ===== Builder Pattern Tests =====
1392
1393	#[tokio::test]
1394	pub async fn test_builder_simple_schedule() {
1395		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1396		let state: State = Arc::new(Mutex::new(Vec::new()));
1397		let scheduler = Scheduler::new(task_store);
1398		scheduler.start(state.clone());
1399		scheduler.register::<TestTask>().unwrap();
1400
1401		// Test basic builder usage: .now()
1402		let task = TestTask::new(1);
1403		let id = scheduler.task(task).now().await.unwrap();
1404
1405		assert!(id > 0, "Task ID should be positive");
1406
1407		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1408
1409		let st = state.lock().unwrap();
1410		assert_eq!(st.len(), 1, "Task should have executed");
1411		assert_eq!(st[0], 1);
1412	}
1413
1414	#[tokio::test]
1415	pub async fn test_builder_with_key() {
1416		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1417		let state: State = Arc::new(Mutex::new(Vec::new()));
1418		let scheduler = Scheduler::new(task_store);
1419		scheduler.start(state.clone());
1420		scheduler.register::<TestTask>().unwrap();
1421
1422		// Test builder with key
1423		let task = TestTask::new(1);
1424		let _id = scheduler.task(task).key("my-task-key").now().await.unwrap();
1425
1426		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1427
1428		let st = state.lock().unwrap();
1429		assert_eq!(st.len(), 1);
1430		assert_eq!(st[0], 1);
1431	}
1432
1433	#[tokio::test]
1434	pub async fn test_builder_with_delay() {
1435		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1436		let state: State = Arc::new(Mutex::new(Vec::new()));
1437		let scheduler = Scheduler::new(task_store);
1438		scheduler.start(state.clone());
1439		scheduler.register::<TestTask>().unwrap();
1440
1441		// Test builder with .after() convenience method
1442		let task = TestTask::new(1);
1443		let _id = scheduler
1444			.task(task)
1445			.after(1)  // 1 second delay
1446			.await
1447			.unwrap();
1448
1449		// Should not have executed yet
1450		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1451		{
1452			let st = state.lock().unwrap();
1453			assert_eq!(st.len(), 0, "Task should not execute yet");
1454		}
1455
1456		// Wait for execution (1 sec delay + 200ms task sleep + buffer)
1457		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1458
1459		{
1460			let st = state.lock().unwrap();
1461			assert_eq!(st.len(), 1, "Task should have executed");
1462			assert_eq!(st[0], 1);
1463		}
1464	}
1465
1466	#[tokio::test]
1467	pub async fn test_builder_with_dependencies() {
1468		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1469		let state: State = Arc::new(Mutex::new(Vec::new()));
1470		let scheduler = Scheduler::new(task_store);
1471		scheduler.start(state.clone());
1472		scheduler.register::<TestTask>().unwrap();
1473
1474		// Create first task (sleeps 200ms)
1475		let task1 = TestTask::new(1);
1476		let id1 = scheduler.task(task1).now().await.unwrap();
1477
1478		// Create second task (sleeps 400ms)
1479		let task2 = TestTask::new(1);
1480		let id2 = scheduler.task(task2).now().await.unwrap();
1481
1482		// Create third task that depends on first two (sleeps 600ms)
1483		let task3 = TestTask::new(1);
1484		let _id3 = scheduler.task(task3).depend_on(vec![id1, id2]).schedule().await.unwrap();
1485
1486		// Wait for all tasks: task1 200ms, task2 400ms, task3 600ms = ~1200ms
1487		tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
1488
1489		let st = state.lock().unwrap();
1490		// Should have all three tasks in execution order: 1 finishes first (200ms), then 2 (200ms), then 3 (200ms after both)
1491		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1492		assert_eq!(str_vec.join(":"), "1:1:1");
1493	}
1494
1495	#[tokio::test]
1496	pub async fn test_builder_with_retry() {
1497		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1498		let state: State = Arc::new(Mutex::new(Vec::new()));
1499		let scheduler = Scheduler::new(task_store);
1500		scheduler.start(state.clone());
1501		scheduler.register::<FailingTask>().unwrap();
1502
1503		// Create task using builder with retry policy
1504		let failing_task = FailingTask::new(55, 1); // Fails once, succeeds second time
1505		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1506
1507		let _id = scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1508
1509		// Wait for retry cycle: 1 fail + 1s wait + 1 success
1510		tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1511
1512		let st = state.lock().unwrap();
1513		assert_eq!(st.len(), 1);
1514		assert_eq!(st[0], 55);
1515	}
1516
	/// Integration check for `.with_automatic_retry()`. Deliberately makes no
	/// outcome assertion: the default policy's minimum backoff is too long
	/// for a unit test, so only builder chaining is exercised here.
	#[tokio::test]
	pub async fn test_builder_with_automatic_retry() {
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<FailingTask>().unwrap();

		// Create task using builder with automatic retry (default policy)
		let failing_task = FailingTask::new(66, 1);
		let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();

		// Wait for retry cycle with default policy (min=60s would be too long for test)
		// but we already tested retry logic thoroughly, just verify builder integration
		tokio::time::sleep(std::time::Duration::from_millis(500)).await;

		// The important part is that this compiles and integrates correctly
		let st = state.lock().unwrap();
		// With default policy (min=60s), task shouldn't succeed in test timeframe
		// Just verify builder chaining works
		let _ = st.len(); // Verify state is accessible, but don't assert on timeout-dependent result
	}
1539
1540	#[tokio::test]
1541	pub async fn test_builder_fluent_chaining() {
1542		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1543		let state: State = Arc::new(Mutex::new(Vec::new()));
1544		let scheduler = Scheduler::new(task_store);
1545		scheduler.start(state.clone());
1546		scheduler.register::<TestTask>().unwrap();
1547
1548		// Create first dependencies
1549		let dep1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1550		let dep2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1551
1552		// Test fluent chaining with multiple methods
1553		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1554
1555		let task = TestTask::new(1);
1556		let _id = scheduler
1557			.task(task)
1558			.key("complex-task")
1559			.schedule_after(0)  // Schedule immediately
1560			.depend_on(vec![dep1, dep2])
1561			.with_retry(retry_policy)
1562			.schedule()
1563			.await
1564			.unwrap();
1565
1566		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1567
1568		let st = state.lock().unwrap();
1569		// Should have all tasks: 20:10 (immediate deps) then 30 (after deps)
1570		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1571		assert_eq!(str_vec.join(":"), "1:1:1");
1572	}
1573
1574	#[tokio::test]
1575	pub async fn test_builder_backward_compatibility() {
1576		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1577		let state: State = Arc::new(Mutex::new(Vec::new()));
1578		let scheduler = Scheduler::new(task_store);
1579		scheduler.start(state.clone());
1580		scheduler.register::<TestTask>().unwrap();
1581
1582		// Test that old API still works
1583		let _id1 = scheduler.add(TestTask::new(1)).await.unwrap();
1584
1585		// Test that new builder API works
1586		let _id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1587
1588		tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1589
1590		let st = state.lock().unwrap();
1591		// Both old and new API should have executed
1592		assert_eq!(st.len(), 2);
1593		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1594		assert_eq!(str_vec.join(":"), "1:1");
1595	}
1596
1597	// ===== Phase 2: Integration Tests - Real-world scenarios =====
1598
1599	#[tokio::test]
1600	pub async fn test_builder_pipeline_scenario() {
1601		// Simulates: Task 1 -> Task 2 (depends on 1) -> Task 3 (depends on 2)
1602		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1603		let state: State = Arc::new(Mutex::new(Vec::new()));
1604		let scheduler = Scheduler::new(task_store);
1605		scheduler.start(state.clone());
1606		scheduler.register::<TestTask>().unwrap();
1607
1608		// Stage 1: Create initial task
1609		let id1 = scheduler.task(TestTask::new(1)).key("stage-1").now().await.unwrap();
1610
1611		// Stage 2: Create task that depends on stage 1
1612		let id2 = scheduler.task(TestTask::new(1)).key("stage-2").after_task(id1).await.unwrap();
1613
1614		// Stage 3: Create task that depends on stage 2
1615		let _id3 = scheduler.task(TestTask::new(1)).key("stage-3").after_task(id2).await.unwrap();
1616
1617		// Wait for pipeline: 1(200ms) + 2(200ms) + 3(200ms) = 600ms
1618		tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1619
1620		let st = state.lock().unwrap();
1621		// Should execute in order: 1, 2, 3
1622		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1623		assert_eq!(str_vec.join(":"), "1:1:1");
1624	}
1625
1626	#[tokio::test]
1627	pub async fn test_builder_multi_dependency_join() {
1628		// Simulates: Task 1 parallel with Task 2, then Task 3 waits for both
1629		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1630		let state: State = Arc::new(Mutex::new(Vec::new()));
1631		let scheduler = Scheduler::new(task_store);
1632		scheduler.start(state.clone());
1633		scheduler.register::<TestTask>().unwrap();
1634
1635		// Parallel tasks
1636		let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1637		let id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1638
1639		// Join task - waits for both
1640		let _id3 = scheduler
1641			.task(TestTask::new(1))
1642			.depend_on(vec![id1, id2])
1643			.schedule()
1644			.await
1645			.unwrap();
1646
1647		tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1648
1649		let st = state.lock().unwrap();
1650		// 1 and 2 execute in parallel, then 3 executes after both
1651		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1652		assert_eq!(str_vec.join(":"), "1:1:1");
1653	}
1654
	/// A task that has both a future schedule time and a dependency must wait
	/// for BOTH: after the dependency completes it still must not run until
	/// the scheduled timestamp. Timing-sensitive two-phase assertion.
	#[tokio::test]
	pub async fn test_builder_scheduled_task_with_dependencies() {
		// Simulates: Task depends on earlier task AND is scheduled for future time
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Immediate task
		let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();

		// Task that waits for dependency AND scheduled delay
		let ts = Timestamp::from_now(1);
		let _task_id = scheduler
			.task(TestTask::new(1))
			.schedule_at(ts)
			.depend_on(vec![dep_id])
			.schedule()
			.await
			.unwrap();

		// Wait for dependency to complete but before scheduled time
		tokio::time::sleep(std::time::Duration::from_millis(300)).await;
		{
			let st = state.lock().unwrap();
			assert_eq!(st.len(), 1); // Only dependency executed
		}

		// Wait for scheduled time (1s total from initial schedule)
		tokio::time::sleep(std::time::Duration::from_millis(800)).await;

		{
			let st = state.lock().unwrap();
			let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
			assert_eq!(str_vec.join(":"), "1:1");
		}
	}
1693
	/// Combined scenario: an immediate task, a keyed+scheduled+dependent
	/// task, and a keyed retryable task, all in one scheduler run.
	#[tokio::test]
	pub async fn test_builder_mixed_features() {
		// Simulates: Complex real-world scenario with key, scheduling, deps, and retry
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();
		scheduler.register::<FailingTask>().unwrap();

		// Create initial tasks
		let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();

		// Create complex task: scheduled + depends on id1 + has key
		let _id2 = scheduler
			.task(TestTask::new(1))
			.key("critical-task")
			.schedule_after(0)
			.depend_on(vec![id1])
			.schedule()
			.await
			.unwrap();

		// Create task with retry
		let _id3 = scheduler
			.task(FailingTask::new(1, 0))  // Fails 0 times, succeeds immediately
			.key("retryable-task")
			.with_retry(RetryPolicy {
				wait_min_max: (1, 3600),
				times: 3,
			})
			.schedule()
			.await
			.unwrap();

		// Wait for tasks: id1 (200ms) + id2 (200ms after id1) + id3 (200ms) = ~600ms
		tokio::time::sleep(std::time::Duration::from_millis(1200)).await;

		let st = state.lock().unwrap();
		// All three tasks should execute
		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1");
	}
1737
1738	#[tokio::test]
1739	pub async fn test_builder_builder_reuse_not_possible() {
1740		// Verify that builder is consumed (moved) and can't be reused
1741		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1742		let _state: State = Arc::new(Mutex::new(Vec::new()));
1743		let scheduler = Scheduler::new(task_store);
1744
1745		let task = TestTask::new(1);
1746		let builder = scheduler.task(task);
1747
1748		// This would not compile if uncommented (builder is moved):
1749		// let _id1 = builder.now().await.unwrap();
1750		// let _id2 = builder.now().await.unwrap();  // Error: use of moved value
1751
1752		// Can only call terminal method once
1753		let _id = builder.now().await.unwrap();
1754		// builder is now consumed, can't use again
1755
1756		// Test passes if it compiles (verifying move semantics)
1757	}
1758
1759	#[tokio::test]
1760	pub async fn test_builder_different_task_types() {
1761		// Test builder works with different task implementations
1762		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1763		let state: State = Arc::new(Mutex::new(Vec::new()));
1764		let scheduler = Scheduler::new(task_store);
1765		scheduler.start(state.clone());
1766		scheduler.register::<TestTask>().unwrap();
1767		scheduler.register::<FailingTask>().unwrap();
1768
1769		// Mix of different task types
1770		let _id1 = scheduler.task(TestTask::new(1)).key("test-task").now().await.unwrap();
1771
1772		let _id2 = scheduler
1773			.task(FailingTask::new(1, 0))  // Won't fail
1774			.key("failing-task")
1775			.now()
1776			.await
1777			.unwrap();
1778
1779		let _id3 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1780
1781		tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1782
1783		let st = state.lock().unwrap();
1784		assert_eq!(st.len(), 3);
1785		let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1786		// All three tasks should execute
1787		assert_eq!(str_vec.join(":"), "1:1:1");
1788	}
1789
1790	// ===== Phase 3: Cron Placeholder Tests =====
1791	// These tests verify that cron methods compile and integrate
1792	// Actual cron functionality will be implemented in Phase 3
1793
1794	#[tokio::test]
1795	pub async fn test_builder_cron_placeholder_syntax() {
1796		// Verify cron placeholder methods compile and chain properly
1797		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1798		let state: State = Arc::new(Mutex::new(Vec::new()));
1799		let scheduler = Scheduler::new(task_store);
1800		scheduler.start(state.clone());
1801		scheduler.register::<TestTask>().unwrap();
1802
1803		// Test that cron methods compile (they're no-ops in Phase 2)
1804		let task = TestTask::new(1);
1805		let _id = scheduler
1806			.task(task)
1807			.key("cron-task")
1808			.cron("0 9 * * *")  // 9 AM daily
1809			.schedule()
1810			.await
1811			.unwrap();
1812
1813		// Cron scheduling - task will execute at the next scheduled time
1814		// For cron "0 9 * * *", that's tomorrow at 9 AM, so task won't execute in this test
1815		// This test just verifies the methods compile and chain properly
1816		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1817
1818		let st = state.lock().unwrap();
1819		// Task is scheduled for future (9 AM), so it won't have executed yet
1820		// The important thing is that the cron methods compile and integrate
1821		assert_eq!(st.len(), 0); // Not executed yet since scheduled for future
1822	}
1823
1824	#[tokio::test]
1825	pub async fn test_builder_daily_at_placeholder() {
1826		// Verify daily_at placeholder compiles and integrates
1827		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1828		let state: State = Arc::new(Mutex::new(Vec::new()));
1829		let scheduler = Scheduler::new(task_store);
1830		scheduler.start(state.clone());
1831		scheduler.register::<TestTask>().unwrap();
1832
1833		// Test that daily_at placeholder compiles
1834		let task = TestTask::new(1);
1835		let _id = scheduler
1836			.task(task)
1837			.key("daily-task")
1838			.daily_at(14, 30)  // 2:30 PM daily
1839			.schedule()
1840			.await
1841			.unwrap();
1842
1843		// Daily_at scheduling - task will execute at the specified time (2:30 PM daily)
1844		// Task is scheduled for future, so it won't execute in this test
1845		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1846
1847		let st = state.lock().unwrap();
1848		// Task is scheduled for future (2:30 PM), not executed yet
1849		// The important thing is that daily_at compiles and integrates properly
1850		assert_eq!(st.len(), 0);
1851	}
1852
1853	#[tokio::test]
1854	pub async fn test_builder_weekly_at_placeholder() {
1855		// Verify weekly_at placeholder compiles and integrates
1856		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1857		let state: State = Arc::new(Mutex::new(Vec::new()));
1858		let scheduler = Scheduler::new(task_store);
1859		scheduler.start(state.clone());
1860		scheduler.register::<TestTask>().unwrap();
1861
1862		// Test that weekly_at placeholder compiles
1863		let task = TestTask::new(1);
1864		let _id = scheduler
1865			.task(task)
1866			.key("weekly-task")
1867			.weekly_at(1, 9, 0)  // Monday at 9 AM
1868			.schedule()
1869			.await
1870			.unwrap();
1871
1872		// Weekly_at scheduling - task will execute on Monday at 9 AM
1873		// Task is scheduled for future, so it won't execute in this test
1874		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1875
1876		let st = state.lock().unwrap();
1877		// Task is scheduled for future (Monday 9 AM), not executed yet
1878		// The important thing is that weekly_at compiles and integrates properly
1879		assert_eq!(st.len(), 0);
1880	}
1881
1882	#[tokio::test]
1883	pub async fn test_builder_cron_with_retry() {
1884		// Verify cron methods chain with retry (future combined usage)
1885		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1886		let state: State = Arc::new(Mutex::new(Vec::new()));
1887		let scheduler = Scheduler::new(task_store);
1888		scheduler.start(state.clone());
1889		scheduler.register::<TestTask>().unwrap();
1890
1891		// Test future usage pattern: cron + retry
1892		let task = TestTask::new(1);
1893		let _id = scheduler
1894			.task(task)
1895			.key("reliable-scheduled-task")
1896			.daily_at(2, 0)  // 2 AM daily
1897			.with_retry(RetryPolicy {
1898				wait_min_max: (60, 3600),
1899				times: 5,
1900			})
1901			.schedule()
1902			.await
1903			.unwrap();
1904
1905		// Verify cron+retry chain compiles properly
1906		// Task is scheduled for 2 AM, so won't execute in this test
1907		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1908
1909		let st = state.lock().unwrap();
1910		// Task scheduled for future (2 AM), not executed yet
1911		// The important thing is that chaining cron + retry works
1912		assert_eq!(st.len(), 0);
1913	}
1914
1915	// ===== Cron Schedule Tests =====
1916
1917	#[test]
1918	fn test_cron_to_string() {
1919		// Test that to_cron_string returns the original expression
1920		let cron = CronSchedule::parse("*/5 * * * *").unwrap();
1921		assert_eq!(cron.to_cron_string(), "*/5 * * * *");
1922	}
1923
	#[tokio::test]
	pub async fn test_running_task_not_double_scheduled() {
		// Regression test: re-adding a task id that is currently executing
		// must not place a second copy in the scheduled queue, which would
		// cause a duplicate execution once the running instance finishes.
		let _ = tracing_subscriber::fmt().try_init();

		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Create a long-running task so it is still executing when we try to
		// re-add it below. TestTask(5) takes ~1 second (5 * 200ms).
		let task = TestTask::new(5);
		let task_id = scheduler.add(task.clone()).await.unwrap();

		// Wait a bit for task to start running
		tokio::time::sleep(std::time::Duration::from_millis(100)).await;

		// Precondition: the task is in tasks_running. Scoped block so the
		// mutex guard is dropped before the scheduler needs the lock again.
		{
			let running = scheduler.tasks_running.lock().unwrap();
			assert!(running.contains_key(&task_id), "Task should be in running queue");
		}

		// Try to add the same task again via add_queue while it is running.
		let task_meta = TaskMeta {
			task: task.clone(),
			next_at: Some(Timestamp::now()),
			deps: vec![],
			retry_count: 0,
			retry: None,
			cron: None,
		};
		let result = scheduler.add_queue(task_id, task_meta).await;

		// add_queue should report success even though no new queue entry is
		// created for an already-running task.
		assert!(result.is_ok(), "add_queue should succeed");

		// Verify task is NOT in tasks_scheduled (only in running). The
		// scheduled queue is keyed by (timestamp, task_id) pairs.
		{
			let sched_queue = scheduler.tasks_scheduled.lock().unwrap();
			let in_scheduled = sched_queue.iter().any(|((_, id), _)| *id == task_id);
			assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
		}

		// Wait for original task to complete (~1s runtime plus margin).
		tokio::time::sleep(std::time::Duration::from_secs(2)).await;

		// Exactly one recorded execution proves the re-add did not duplicate
		// the task.
		let st = state.lock().unwrap();
		assert_eq!(st.len(), 1, "Only one task execution should have occurred");
		assert_eq!(st[0], 5);
	}
1976
1977	#[tokio::test]
1978	pub async fn test_running_task_metadata_updated() {
1979		let _ = tracing_subscriber::fmt().try_init();
1980
1981		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1982		let state: State = Arc::new(Mutex::new(Vec::new()));
1983		let scheduler = Scheduler::new(task_store);
1984		scheduler.start(state.clone());
1985		scheduler.register::<TestTask>().unwrap();
1986
1987		// Create a task without cron
1988		let task = TestTask::new(5); // Takes 1 second (5 * 200ms)
1989		let task_id = scheduler.add(task.clone()).await.unwrap();
1990
1991		// Wait a bit for task to start running
1992		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1993
1994		// Verify task is running and has no cron
1995		{
1996			let running = scheduler.tasks_running.lock().unwrap();
1997			let meta = running.get(&task_id).expect("Task should be running");
1998			assert!(meta.cron.is_none(), "Task should have no cron initially");
1999		}
2000
2001		// Try to update the running task with a cron schedule
2002		let cron = CronSchedule::parse("*/5 * * * *").unwrap();
2003		let task_meta_with_cron = TaskMeta {
2004			task: task.clone(),
2005			next_at: Some(Timestamp::now()),
2006			deps: vec![],
2007			retry_count: 0,
2008			retry: None,
2009			cron: Some(cron.clone()),
2010		};
2011		let result = scheduler.add_queue(task_id, task_meta_with_cron).await;
2012
2013		// Should succeed
2014		assert!(result.is_ok(), "add_queue should succeed");
2015
2016		// Verify the running task now has the cron schedule
2017		{
2018			let running = scheduler.tasks_running.lock().unwrap();
2019			let meta = running.get(&task_id).expect("Task should still be running");
2020			assert!(meta.cron.is_some(), "Task should now have cron after update");
2021		}
2022
2023		// Wait for task to complete
2024		tokio::time::sleep(std::time::Duration::from_secs(2)).await;
2025	}
2026}
2027
2028// vim: ts=4