Skip to main content

cloudillo_core/
scheduler.rs

1//! Scheduler subsystem. Handles async tasks, dependencies, fallbacks, repetitions, persistence..
2
3use async_trait::async_trait;
4use flume;
5use itertools::Itertools;
6use std::{
7	collections::{BTreeMap, HashMap},
8	fmt::Debug,
9	sync::{Arc, Mutex, RwLock},
10};
11
12use chrono::{DateTime, Utc};
13use croner::Cron;
14use std::str::FromStr;
15
16use crate::prelude::*;
17use cloudillo_types::{lock, meta_adapter};
18
/// Unique identifier of a scheduled task (assigned by the task store).
pub type TaskId = u64;
20
/// Kind of task scheduling: recurring vs one-shot.
// NOTE(review): not referenced in this chunk — presumably used elsewhere in the crate.
pub enum TaskType {
	Periodic,
	Once,
}
25
/// Cron schedule wrapper using the croner crate
/// Stores the expression string for serialization
#[derive(Debug, Clone)]
pub struct CronSchedule {
	/// The original cron expression string (kept for round-tripping to storage)
	expr: Box<str>,
	/// Parsed cron object, derived deterministically from `expr` in `parse`
	cron: Cron,
}
35
36impl CronSchedule {
37	/// Parse a cron expression (5 fields: minute hour day month weekday)
38	pub fn parse(expr: &str) -> ClResult<Self> {
39		let cron = Cron::from_str(expr)
40			.map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
41		Ok(Self { expr: expr.into(), cron })
42	}
43
44	/// Calculate the next execution time after the given timestamp
45	///
46	/// Returns an error if no next occurrence can be found (should be rare
47	/// for valid expressions within reasonable time bounds).
48	pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
49		let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
50
51		self.cron
52			.find_next_occurrence(&dt, false)
53			.map(|next| Timestamp(next.timestamp()))
54			.map_err(|e| {
55				tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
56				Error::ValidationError(format!("cron next_execution failed: {}", e))
57			})
58	}
59
60	/// Convert back to cron expression string
61	pub fn to_cron_string(&self) -> String {
62		self.expr.to_string()
63	}
64}
65
// Equality is defined on the expression string only: `cron` is derived
// deterministically from `expr` in `CronSchedule::parse`, so comparing the
// text is sufficient (and `Cron` itself need not implement `PartialEq`).
impl PartialEq for CronSchedule {
	fn eq(&self, other: &Self) -> bool {
		self.expr == other.expr
	}
}

impl Eq for CronSchedule {}
73
/// A unit of schedulable work, parameterized over shared application state `S`.
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
	/// Static type tag used to register and look up builders (see `Scheduler::register`).
	fn kind() -> &'static str
	where
		Self: Sized;
	/// Reconstruct a task from its persisted `context` string (inverse of `serialize`).
	fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
	where
		Self: Sized;
	/// Serialize the task's parameters for persistence.
	fn serialize(&self) -> String;
	/// Execute the task against the shared state.
	async fn run(&self, state: &S) -> ClResult<()>;

	/// Object-safe counterpart of `kind()`, callable through `dyn Task<S>`.
	fn kind_of(&self) -> &'static str;
}
87
/// Lifecycle state of a persisted task.
///
/// Derives extended with `Clone, Copy, PartialEq, Eq`: the enum is a plain
/// fieldless tag that is matched and compared throughout the scheduler, so
/// copy semantics and equality are both natural and backward-compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
	/// Not yet executed (or awaiting its next recurrence)
	Pending,
	/// Finished successfully
	Completed,
	/// Errored (also used as the fallback for unknown persisted status codes)
	Failed,
}
94
/// Snapshot of a persisted task as loaded from a `TaskStore`.
pub struct TaskData {
	id: TaskId,
	// Type tag matching a registered builder (`Task::kind`)
	kind: Box<str>,
	status: TaskStatus,
	// Serialized task parameters (`Task::serialize` output)
	input: Box<str>,
	// Tasks that must complete before this one runs
	deps: Box<[TaskId]>,
	// Encoded retry state: "count,min,max,times" (parsed in `Scheduler::load`)
	retry_data: Option<Box<str>>,
	// Cron expression string, if this is a recurring task
	cron_data: Option<Box<str>>,
	// Next scheduled execution time, if any
	next_at: Option<Timestamp>,
}
105
/// Persistence backend for scheduler tasks.
#[async_trait]
pub trait TaskStore<S: Clone>: Send + Sync {
	/// Persist a new task (optionally under a deduplication `key`) and return its id.
	async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;
	/// Mark a task as finished, storing its output.
	async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;
	/// Load all persisted tasks (used at startup).
	async fn load(&self) -> ClResult<Vec<TaskData>>;
	/// Record a failed attempt and optionally the next retry time.
	async fn update_task_error(
		&self,
		task_id: TaskId,
		output: &str,
		next_at: Option<Timestamp>,
	) -> ClResult<()>;
	/// Look up a task by its deduplication key.
	async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;
	/// Overwrite a task's stored parameters/schedule.
	async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;
}
120
// InMemoryTaskStore
//*******************
/// Non-persistent `TaskStore`: only hands out monotonically increasing ids;
/// every other operation is a no-op, so tasks are lost on restart.
pub struct InMemoryTaskStore {
	// Last id handed out; incremented under the mutex in `add`
	last_id: Mutex<TaskId>,
}

impl InMemoryTaskStore {
	pub fn new() -> Arc<Self> {
		Arc::new(Self { last_id: Mutex::new(0) })
	}
}
132
133#[async_trait]
134impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
135	async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
136		let mut last_id = lock!(self.last_id)?;
137		*last_id += 1;
138		Ok(*last_id)
139	}
140
141	async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
142		Ok(())
143	}
144
145	async fn load(&self) -> ClResult<Vec<TaskData>> {
146		Ok(vec![])
147	}
148
149	async fn update_task_error(
150		&self,
151		_task_id: TaskId,
152		_output: &str,
153		_next_at: Option<Timestamp>,
154	) -> ClResult<()> {
155		Ok(())
156	}
157
158	async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
159		// In-memory store doesn't support persistence or keys
160		Ok(None)
161	}
162
163	async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
164		// In-memory store doesn't support persistence
165		Ok(())
166	}
167}
168
// MetaAdapterTaskStore
//**********************
/// `TaskStore` backed by a `MetaAdapter` (database persistence).
pub struct MetaAdapterTaskStore {
	meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
}

impl MetaAdapterTaskStore {
	pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
		Arc::new(Self { meta_adapter })
	}
}
180
181#[async_trait]
182impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
183	async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
184		let id = self
185			.meta_adapter
186			.create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
187			.await?;
188
189		// Store cron schedule if present
190		if let Some(cron) = &task.cron {
191			self.meta_adapter
192				.update_task(
193					id,
194					&meta_adapter::TaskPatch {
195						cron: Patch::Value(cron.to_cron_string()),
196						..Default::default()
197					},
198				)
199				.await?;
200		}
201
202		Ok(id)
203	}
204
205	async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
206		self.meta_adapter.update_task_finished(id, output).await
207	}
208
209	async fn load(&self) -> ClResult<Vec<TaskData>> {
210		let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
211		let tasks = tasks
212			.into_iter()
213			.map(|t| TaskData {
214				id: t.task_id,
215				kind: t.kind,
216				status: match t.status {
217					'P' => TaskStatus::Pending,
218					'F' => TaskStatus::Completed,
219					'E' => TaskStatus::Failed,
220					_ => TaskStatus::Failed,
221				},
222				input: t.input,
223				deps: t.deps,
224				retry_data: t.retry,
225				cron_data: t.cron,
226				next_at: t.next_at,
227			})
228			.collect();
229		Ok(tasks)
230	}
231
232	async fn update_task_error(
233		&self,
234		task_id: TaskId,
235		output: &str,
236		next_at: Option<Timestamp>,
237	) -> ClResult<()> {
238		self.meta_adapter.update_task_error(task_id, output, next_at).await
239	}
240
241	async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
242		let task_opt = self.meta_adapter.find_task_by_key(key).await?;
243
244		match task_opt {
245			Some(t) => Ok(Some((
246				t.task_id,
247				TaskData {
248					id: t.task_id,
249					kind: t.kind,
250					status: match t.status {
251						'P' => TaskStatus::Pending,
252						'F' => TaskStatus::Completed,
253						'E' => TaskStatus::Failed,
254						_ => TaskStatus::Failed,
255					},
256					input: t.input,
257					deps: t.deps,
258					retry_data: t.retry,
259					cron_data: t.cron,
260					next_at: t.next_at,
261				},
262			))),
263			None => Ok(None),
264		}
265	}
266
267	async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
268		use cloudillo_types::types::Patch;
269
270		// Build TaskPatch from TaskMeta
271		let mut patch = meta_adapter::TaskPatch {
272			input: Patch::Value(task.task.serialize()),
273			next_at: match task.next_at {
274				Some(ts) => Patch::Value(ts),
275				None => Patch::Null,
276			},
277			..Default::default()
278		};
279
280		// Update deps
281		if !task.deps.is_empty() {
282			patch.deps = Patch::Value(task.deps.clone());
283		}
284
285		// Update retry policy
286		if let Some(ref retry) = task.retry {
287			let retry_str = format!(
288				"{},{},{},{}",
289				task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
290			);
291			patch.retry = Patch::Value(retry_str);
292		}
293
294		// Update cron schedule
295		if let Some(ref cron) = task.cron {
296			patch.cron = Patch::Value(cron.to_cron_string());
297		}
298
299		self.meta_adapter.update_task(id, &patch).await
300	}
301}
302
// Task metadata
/// Factory that rebuilds a task of a registered kind from its persisted input string.
type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
305
/// Retry policy: exponential backoff between `wait_min_max` bounds, up to
/// `times` attempts.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
	// (minimum, maximum) backoff in seconds
	wait_min_max: (u64, u64),
	// Maximum number of retry attempts
	times: u16,
}

impl Default for RetryPolicy {
	/// 60 s minimum, 1 h maximum, 10 attempts.
	fn default() -> Self {
		Self { wait_min_max: (60, 3600), times: 10 }
	}
}

impl RetryPolicy {
	/// Create a new RetryPolicy with custom min/max backoff and number of retries
	pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
		Self { wait_min_max, times }
	}

	/// Calculate exponential backoff in seconds: min * (2^attempt), capped at max.
	///
	/// Uses checked/saturating arithmetic: the previous `min * (1u64 << attempt)`
	/// panicked in debug builds (and wrapped in release) for `attempt >= 64`,
	/// and the multiply could overflow even earlier for large `min`.
	pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
		let (min, max) = self.wait_min_max;
		// 2^attempt, saturating to u64::MAX when the shift would overflow
		let factor = 1u64.checked_shl(u32::from(attempt_count)).unwrap_or(u64::MAX);
		min.saturating_mul(factor).min(max)
	}

	/// Check if we should continue retrying
	pub fn should_retry(&self, attempt_count: u16) -> bool {
		attempt_count < self.times
	}
}
336
// TaskSchedulerBuilder - Fluent API for task scheduling
//************************************************************
/// Builder returned by `Scheduler::task`; accumulates scheduling options and
/// is consumed by a terminal method (`schedule`, `now`, `at`, `after`, ...).
pub struct TaskSchedulerBuilder<'a, S: Clone> {
	scheduler: &'a Scheduler<S>,
	task: Arc<dyn Task<S>>,
	// Optional deduplication key (see `Scheduler::_schedule_task`)
	key: Option<String>,
	// Absolute time of the next execution, if time-scheduled
	next_at: Option<Timestamp>,
	// Tasks that must complete before this one runs
	deps: Vec<TaskId>,
	retry: Option<RetryPolicy>,
	cron: Option<CronSchedule>,
}
348
349impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
350	/// Create a new builder for scheduling a task
351	fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
352		Self {
353			scheduler,
354			task,
355			key: None,
356			next_at: None,
357			deps: Vec::new(),
358			retry: None,
359			cron: None,
360		}
361	}
362
363	/// Set a string key for task identification
364	pub fn key(mut self, key: impl Into<String>) -> Self {
365		self.key = Some(key.into());
366		self
367	}
368
369	/// Schedule for a specific absolute timestamp
370	pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
371		self.next_at = Some(timestamp);
372		self
373	}
374
375	/// Schedule after a relative delay (in seconds)
376	pub fn schedule_after(mut self, seconds: i64) -> Self {
377		self.next_at = Some(Timestamp::from_now(seconds));
378		self
379	}
380
381	/// Add task dependencies - task waits for all of these to complete
382	pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
383		self.deps = deps;
384		self
385	}
386
387	/// Add a single task dependency
388	pub fn depends_on(mut self, dep: TaskId) -> Self {
389		self.deps.push(dep);
390		self
391	}
392
393	/// Enable automatic retry with exponential backoff
394	pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
395		self.retry = Some(policy);
396		self
397	}
398
399	// ===== Cron Scheduling Methods =====
400
401	/// Schedule task with cron expression
402	/// Example: `.cron("0 9 * * *")` for 9 AM daily
403	pub fn cron(mut self, expr: impl Into<String>) -> Self {
404		if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
405			// Calculate initial next_at from cron schedule
406			// Use .ok() - cron was just parsed successfully, should never fail
407			self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
408			self.cron = Some(cron_schedule);
409		}
410		self
411	}
412
413	/// Schedule task daily at specified time
414	/// Example: `.daily_at(2, 30)` for 2:30 AM daily
415	pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
416		if hour <= 23 && minute <= 59 {
417			let expr = format!("{} {} * * *", minute, hour);
418			if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
419				// Calculate initial next_at from cron schedule
420				// Use .ok() - cron was just parsed successfully, should never fail
421				self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
422				self.cron = Some(cron_schedule);
423			}
424		}
425		self
426	}
427
428	/// Schedule task weekly at specified day and time
429	/// Example: `.weekly_at(1, 14, 30)` for Mondays at 2:30 PM
430	/// weekday: 0=Sunday, 1=Monday, ..., 6=Saturday
431	pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
432		if weekday <= 6 && hour <= 23 && minute <= 59 {
433			let expr = format!("{} {} * * {}", minute, hour, weekday);
434			if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
435				// Calculate initial next_at from cron schedule
436				// Use .ok() - cron was just parsed successfully, should never fail
437				self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
438				self.cron = Some(cron_schedule);
439			}
440		}
441		self
442	}
443
444	/// Execute the scheduled task immediately
445	pub async fn now(self) -> ClResult<TaskId> {
446		self.schedule().await
447	}
448
449	/// Execute the scheduled task at a specific timestamp
450	pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
451		self.next_at = Some(ts);
452		self.schedule().await
453	}
454
455	/// Execute the scheduled task after a delay (in seconds)
456	pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
457		self.next_at = Some(Timestamp::from_now(seconds));
458		self.schedule().await
459	}
460
461	/// Execute the scheduled task after another task completes
462	pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
463		self.deps.push(dep);
464		self.schedule().await
465	}
466
467	/// Execute the scheduled task with automatic retry using default policy
468	pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
469		self.retry = Some(RetryPolicy::default());
470		self.schedule().await
471	}
472
473	/// Execute the task with all configured options - main terminal method
474	pub async fn schedule(self) -> ClResult<TaskId> {
475		self.scheduler
476			._schedule_task(
477				self.task,
478				self.key.as_deref(),
479				self.next_at,
480				if self.deps.is_empty() { None } else { Some(self.deps) },
481				self.retry,
482				self.cron,
483			)
484			.await
485	}
486}
487
/// In-memory metadata for a queued or running task.
#[derive(Debug, Clone)]
pub struct TaskMeta<S: Clone> {
	pub task: Arc<dyn Task<S>>,
	/// Next execution time, if time-scheduled
	pub next_at: Option<Timestamp>,
	/// Remaining dependencies (ids are removed as they complete)
	pub deps: Vec<TaskId>,
	// Number of retry attempts made so far
	retry_count: u16,
	pub retry: Option<RetryPolicy>,
	pub cron: Option<CronSchedule>,
}
497
// Scheduler
/// Central task scheduler. Cheap to clone: all state lives behind `Arc`s.
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct Scheduler<S: Clone> {
	// Registered task factories, keyed by `Task::kind`
	task_builders: Arc<RwLock<HashMap<&'static str, Box<TaskBuilder<S>>>>>,
	store: Arc<dyn TaskStore<S>>,
	// Tasks currently executing
	tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
	// Tasks blocked on dependencies
	tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
	// Reverse dependency index: task id -> ids of tasks waiting on it
	task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
	// Time-ordered queue, keyed by (due time, id) so the earliest entry is first
	tasks_scheduled: Arc<Mutex<BTreeMap<(Timestamp, TaskId), TaskMeta<S>>>>,
	// Completion notifications from spawned tasks
	tx_finish: flume::Sender<TaskId>,
	rx_finish: flume::Receiver<TaskId>,
	// Wakes the timer loop when the scheduled queue changes
	notify_schedule: Arc<tokio::sync::Notify>,
}
512
513impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
514	pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
515		let (tx_finish, rx_finish) = flume::unbounded();
516
517		let scheduler = Self {
518			task_builders: Arc::new(RwLock::new(HashMap::new())),
519			store,
520			tasks_running: Arc::new(Mutex::new(HashMap::new())),
521			tasks_waiting: Arc::new(Mutex::new(HashMap::new())),
522			task_dependents: Arc::new(Mutex::new(HashMap::new())),
523			tasks_scheduled: Arc::new(Mutex::new(BTreeMap::new())),
524			tx_finish,
525			rx_finish,
526			notify_schedule: Arc::new(tokio::sync::Notify::new()),
527		};
528
529		//scheduler.run(rx_finish)?;
530
531		Arc::new(scheduler)
532	}
533
	/// Start the scheduler's background workers.
	///
	/// Spawns three tokio tasks:
	/// 1. a completion handler consuming `rx_finish` that reschedules recurring
	///    (cron) tasks and releases their dependents,
	/// 2. a timer loop that spawns tasks whose scheduled time has arrived,
	/// 3. a one-shot loader that restores persisted tasks from the store.
	pub fn start(&self, state: S) {
		// Handle finished tasks and dependencies
		let schedule = self.clone();
		let stat = state.clone();
		let rx_finish = self.rx_finish.clone();

		tokio::spawn(async move {
			while let Ok(id) = rx_finish.recv_async().await {
				debug!("Completed task {} (notified)", id);

				// Get task metadata WITHOUT removing - we only remove after successful transition
				let task_meta_opt = {
					let tasks_running = match schedule.tasks_running.lock() {
						Ok(guard) => guard,
						Err(poisoned) => {
							error!("Mutex poisoned: tasks_running (recovering)");
							poisoned.into_inner()
						}
					};
					tasks_running.get(&id).cloned()
				};

				if let Some(task_meta) = task_meta_opt {
					// Track if transition was successful
					let mut transition_ok = false;

					// Check if this is a recurring task with cron schedule
					if let Some(ref cron) = task_meta.cron {
						// Calculate next execution time
						let next_at = match cron.next_execution(Timestamp::now()) {
							Ok(ts) => ts,
							Err(e) => {
								error!(
									"Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
									id, e
								);
								// Mark as finished since we can't reschedule
								if let Err(e) = schedule.store.finished(id, "").await {
									error!("Failed to mark task {} as finished: {}", id, e);
								}
								// NOTE: task stays in tasks_running in this branch
								continue;
							}
						};
						info!(
							"Recurring task {} completed, scheduling next execution at {}",
							id, next_at
						);

						// Update task with new next_at
						let mut updated_meta = task_meta.clone();
						updated_meta.next_at = Some(next_at);

						// Update database with new next_at (keep status as Pending)
						if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
							error!("Failed to update recurring task {} next_at: {}", id, e);
						}

						// Remove from running BEFORE add_queue (so add_queue doesn't see it as running)
						match schedule.tasks_running.lock() {
							Ok(mut tasks_running) => {
								tasks_running.remove(&id);
							}
							Err(poisoned) => {
								error!("Mutex poisoned: tasks_running (recovering)");
								poisoned.into_inner().remove(&id);
							}
						}

						// Re-add to scheduler with new execution time
						match schedule.add_queue(id, updated_meta).await {
							Ok(_) => transition_ok = true,
							Err(e) => {
								error!(
									"Failed to reschedule recurring task {}: {} - task lost!",
									id, e
								);
							}
						}
					} else {
						// One-time task - mark as finished
						match schedule.store.finished(id, "").await {
							Ok(_) => transition_ok = true,
							Err(e) => {
								error!(
									"Failed to mark task {} as finished: {} - task remains in running queue",
									id, e
								);
							}
						}
					}

					// Only remove from running queue after successful transition
					// (for the cron branch the entry was already removed above, so
					// this second remove is a no-op there)
					if transition_ok {
						match schedule.tasks_running.lock() {
							Ok(mut tasks_running) => {
								tasks_running.remove(&id);
							}
							Err(poisoned) => {
								error!("Mutex poisoned: tasks_running (recovering)");
								poisoned.into_inner().remove(&id);
							}
						}
					}

					// Handle dependencies of finished task using atomic release method
					match schedule.release_dependents(id) {
						Ok(ready_to_spawn) => {
							for (dep_id, dep_task_meta) in ready_to_spawn {
								// Add to running queue before spawning
								match schedule.tasks_running.lock() {
									Ok(mut tasks_running) => {
										tasks_running.insert(dep_id, dep_task_meta.clone());
									}
									Err(poisoned) => {
										error!("Mutex poisoned: tasks_running (recovering)");
										poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
									}
								}
								schedule.spawn_task(
									stat.clone(),
									dep_task_meta.task.clone(),
									dep_id,
									dep_task_meta,
								);
							}
						}
						Err(e) => {
							error!("Failed to release dependents of task {}: {}", id, e);
						}
					}
				} else {
					warn!("Completed task {} not found in running queue", id);
				}
			}
		});

		// Handle scheduled tasks: sleep until the earliest due time (or until
		// notified of a queue change), then spawn everything that is due.
		let schedule = self.clone();
		tokio::spawn(async move {
			loop {
				let is_empty = match schedule.tasks_scheduled.lock() {
					Ok(guard) => guard.is_empty(),
					Err(poisoned) => {
						error!("Mutex poisoned: tasks_scheduled (recovering)");
						poisoned.into_inner().is_empty()
					}
				};
				if is_empty {
					// Nothing scheduled: park until add_queue notifies us
					schedule.notify_schedule.notified().await;
				}
				let time = Timestamp::now();
				// Inner loop drains all due entries; breaks with the next future
				// due time (Some) or None when the queue is exhausted
				if let Some((timestamp, _id)) = loop {
					let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
						Ok(guard) => guard,
						Err(poisoned) => {
							error!("Mutex poisoned: tasks_scheduled (recovering)");
							poisoned.into_inner()
						}
					};
					if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
						let (timestamp, id) = (timestamp, id);
						if timestamp <= Timestamp::now() {
							debug!("Spawning task id {} (from schedule)", id);
							if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
								let mut tasks_running = match schedule.tasks_running.lock() {
									Ok(guard) => guard,
									Err(poisoned) => {
										error!("Mutex poisoned: tasks_running (recovering)");
										poisoned.into_inner()
									}
								};
								tasks_running.insert(id, task.clone());
								schedule.spawn_task(state.clone(), task.task.clone(), id, task);
							} else {
								error!("Task disappeared while being removed from schedule");
								break None;
							}
						} else {
							break Some((timestamp, id));
						}
					} else {
						break None;
					}
				} {
					// Sleep until the next due time, but wake early if the queue changes
					let wait = tokio::time::Duration::from_secs((timestamp.0 - time.0) as u64);
					tokio::select! {
						_ = tokio::time::sleep(wait) => (), _ = schedule.notify_schedule.notified() => ()
					};
				}
			}
		});

		// Restore persisted tasks in the background; errors are intentionally ignored
		let schedule = self.clone();
		tokio::spawn(async move {
			let _ignore_err = schedule.load().await;
		});
	}
731
732	fn register_builder(
733		&self,
734		name: &'static str,
735		builder: &'static TaskBuilder<S>,
736	) -> ClResult<&Self> {
737		let mut task_builders = self
738			.task_builders
739			.write()
740			.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
741		task_builders.insert(name, Box::new(builder));
742		Ok(self)
743	}
744
	/// Register a task type so persisted tasks of this kind can be rebuilt on `load`.
	pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
		info!("Registering task type {}", T::kind());
		self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
		Ok(self)
	}
750
	/// Create a builder for scheduling a task using the fluent API
	/// (finish with a terminal method: `schedule`, `now`, `at`, `after`, ...)
	pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
		TaskSchedulerBuilder::new(self, task)
	}
755
	/// Internal method to schedule a task with all options
	/// This is the core implementation used by the builder pattern
	///
	/// If `key` is given and a task with that key already exists in the store,
	/// the existing task is updated and re-queued instead of creating a
	/// duplicate, and the existing id is returned.
	async fn _schedule_task(
		&self,
		task: Arc<dyn Task<S>>,
		key: Option<&str>,
		next_at: Option<Timestamp>,
		deps: Option<Vec<TaskId>>,
		retry: Option<RetryPolicy>,
		cron: Option<CronSchedule>,
	) -> ClResult<TaskId> {
		let task_meta = TaskMeta {
			task: task.clone(),
			next_at,
			deps: deps.clone().unwrap_or_default(),
			retry_count: 0,
			retry,
			cron,
		};

		// Check if a task with this key already exists (key-based deduplication)
		if let Some(key) = key {
			if let Some((existing_id, existing_data)) = self.store.find_by_key(key).await? {
				let new_serialized = task.serialize();
				let existing_serialized = existing_data.input.as_ref();

				// Compare serialized parameters
				if new_serialized == existing_serialized {
					info!(
						"Recurring task '{}' already exists with identical parameters (id={})",
						key, existing_id
					);
					// Update DB with current cron/next_at (may differ from what's stored)
					self.store.update_task(existing_id, &task_meta).await?;
					// Ensure the existing task is queued (may be loaded from DB but not yet in queue)
					self.add_queue(existing_id, task_meta).await?;
					return Ok(existing_id);
				} else {
					info!(
						"Updating recurring task '{}' (id={}) - parameters changed",
						key, existing_id
					);
					info!("  Old params: {}", existing_serialized);
					info!("  New params: {}", new_serialized);

					// Remove from all queues (if present)
					self.remove_from_queues(existing_id)?;

					// Update the task in database with new parameters
					self.store.update_task(existing_id, &task_meta).await?;

					// Re-add to appropriate queue with updated parameters
					self.add_queue(existing_id, task_meta).await?;

					return Ok(existing_id);
				}
			}
		}

		// No existing task - create new one
		let id = self.store.add(&task_meta, key).await?;
		self.add_queue(id, task_meta).await
	}
819
820	pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
821		self.task(task).now().await
822	}
823
	/// Place a task into the correct internal queue and wake the timer loop
	/// when needed:
	/// - already running: just refresh its metadata (it re-queues on completion),
	/// - has unmet dependencies: `tasks_waiting` + reverse index,
	/// - otherwise: `tasks_scheduled` (at `Timestamp(0)` when already due).
	pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
		// If task is already running, update its metadata (especially for cron updates)
		// but don't add to scheduled queue (it will reschedule on completion)
		{
			let mut running = lock!(self.tasks_running, "tasks_running")?;
			if let Some(existing_meta) = running.get_mut(&id) {
				debug!(
					"Task {} is already running, updating metadata (will reschedule on completion)",
					id
				);
				// Update the running task's metadata so it has the latest cron schedule
				*existing_meta = task_meta;
				return Ok(id);
			}
		}

		// Remove from other queues if present (prevents duplicate entries with different timestamps)
		{
			let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
			if let Some(key) = scheduled
				.iter()
				.find(|((_, tid), _)| *tid == id)
				.map(|((ts, tid), _)| (*ts, *tid))
			{
				scheduled.remove(&key);
				debug!("Removed existing scheduled entry for task {} before re-queueing", id);
			}
		}
		{
			let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
			if waiting.remove(&id).is_some() {
				debug!("Removed existing waiting entry for task {} before re-queueing", id);
			}
		}

		let deps = task_meta.deps.clone();

		// VALIDATION: Tasks with dependencies should NEVER be in tasks_scheduled
		if !deps.is_empty() && task_meta.next_at.is_some() {
			warn!("Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue", id);
			// Force to tasks_waiting instead
			lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
			debug!("Task {} is waiting for {:?}", id, &deps);
			for dep in deps {
				lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
			}
			return Ok(id);
		}

		// No deps and due time absent or already passed: key at Timestamp(0) so
		// the timer loop picks it up immediately
		if deps.is_empty() && task_meta.next_at.unwrap_or(Timestamp(0)) < Timestamp::now() {
			debug!("Spawning task {}", id);
			lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), task_meta);
			self.notify_schedule.notify_one();
		} else if let Some(next_at) = task_meta.next_at {
			debug!("Scheduling task {} for {}", id, next_at);
			lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((next_at, id), task_meta);
			self.notify_schedule.notify_one();
		} else {
			lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
			debug!("Task {} is waiting for {:?}", id, &deps);
			for dep in deps {
				lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
			}
		}
		Ok(id)
	}
890
891	/// Remove a task from all internal queues (waiting, scheduled, running)
892	/// Returns the removed TaskMeta if found
893	fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
894		// Try tasks_waiting
895		if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
896			debug!("Removed task {} from waiting queue for update", task_id);
897			return Ok(Some(task_meta));
898		}
899
900		// Try tasks_scheduled (need to find by task_id in BTreeMap)
901		{
902			let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
903			if let Some(key) = scheduled
904				.iter()
905				.find(|((_, id), _)| *id == task_id)
906				.map(|((ts, id), _)| (*ts, *id))
907			{
908				if let Some(task_meta) = scheduled.remove(&key) {
909					debug!("Removed task {} from scheduled queue for update", task_id);
910					return Ok(Some(task_meta));
911				}
912			}
913		}
914
915		// Try tasks_running (should rarely happen, but handle it)
916		if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
917			warn!("Removed task {} from running queue during update", task_id);
918			return Ok(Some(task_meta));
919		}
920
921		Ok(None)
922	}
923
	/// Release all dependent tasks of a completed task
	/// This method safely handles dependency cleanup and spawning
	///
	/// Returns the (id, meta) pairs whose dependency sets became empty; the
	/// caller is responsible for inserting them into the running queue and
	/// spawning them.
	fn release_dependents(
		&self,
		completed_task_id: TaskId,
	) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
		// Get list of dependents (atomic removal to prevent re-processing)
		let dependents = {
			let mut deps_map = lock!(self.task_dependents, "task_dependents")?;
			deps_map.remove(&completed_task_id).unwrap_or_default()
		};

		if dependents.is_empty() {
			return Ok(Vec::new()); // No dependents to release
		}

		debug!("Releasing {} dependents of completed task {}", dependents.len(), completed_task_id);

		let mut ready_to_spawn = Vec::new();

		// For each dependent, check and remove dependency
		for dependent_id in dependents {
			// Try tasks_waiting first (most common case for dependent tasks)
			{
				let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
				if let Some(task_meta) = waiting.get_mut(&dependent_id) {
					// Remove the completed task from dependencies
					task_meta.deps.retain(|x| *x != completed_task_id);

					// If all dependencies are cleared, remove and queue for spawning
					if task_meta.deps.is_empty() {
						if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
							debug!(
								"Dependent task {} ready to spawn (all dependencies cleared)",
								dependent_id
							);
							ready_to_spawn.push((dependent_id, task_to_spawn));
						}
					} else {
						debug!(
							"Task {} still has {} remaining dependencies",
							dependent_id,
							task_meta.deps.len()
						);
					}
					continue;
				}
			}

			// Try tasks_scheduled if not in waiting (shouldn't happen with validation, but be defensive)
			{
				let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
				if let Some(scheduled_key) = scheduled
					.iter()
					.find(|((_, id), _)| *id == dependent_id)
					.map(|((ts, id), _)| (*ts, *id))
				{
					// Only prune the dependency here; scheduled tasks stay in the
					// time queue and are spawned by the timer loop
					if let Some(task_meta) = scheduled.get_mut(&scheduled_key) {
						task_meta.deps.retain(|x| *x != completed_task_id);
						let remaining = task_meta.deps.len();
						if remaining == 0 {
							debug!(
								"Task {} in scheduled queue has no remaining dependencies",
								dependent_id
							);
						} else {
							debug!(
								"Task {} in scheduled queue has {} remaining dependencies",
								dependent_id, remaining
							);
						}
					}
					continue;
				}
			}

			// Task not found in any queue
			warn!(
				"Dependent task {} of completed task {} not found in any queue",
				dependent_id, completed_task_id
			);
		}

		Ok(ready_to_spawn)
	}
1009
1010	async fn load(&self) -> ClResult<()> {
1011		let tasks = self.store.load().await?;
1012		debug!("Loaded {} tasks from store", tasks.len());
1013		for t in tasks {
1014			if let TaskStatus::Pending = t.status {
1015				debug!("Loading task {} {}", t.id, t.kind);
1016				let task = {
1017					let builder_map = self
1018						.task_builders
1019						.read()
1020						.map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
1021					let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
1022						format!("task builder not registered: {}", t.kind),
1023					))?;
1024					builder(t.id, &t.input)?
1025				};
1026				let (retry_count, retry) = match t.retry_data {
1027					Some(retry_str) => {
1028						let (retry_count, retry_min, retry_max, retry_times) = retry_str
1029							.split(',')
1030							.collect_tuple()
1031							.ok_or(Error::Internal("invalid retry policy format".into()))?;
1032						let retry_count: u16 = retry_count
1033							.parse()
1034							.map_err(|_| Error::Internal("retry count must be u16".into()))?;
1035						let retry = RetryPolicy {
1036							wait_min_max: (
1037								retry_min
1038									.parse()
1039									.map_err(|_| Error::Internal("retry_min must be u64".into()))?,
1040								retry_max
1041									.parse()
1042									.map_err(|_| Error::Internal("retry_max must be u64".into()))?,
1043							),
1044							times: retry_times
1045								.parse()
1046								.map_err(|_| Error::Internal("retry times must be u64".into()))?,
1047						};
1048						debug!("Loaded retry policy: {:?}", retry);
1049						(retry_count, Some(retry))
1050					}
1051					_ => (0, None),
1052				};
1053				// Parse cron data if present
1054				let cron =
1055					t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
1056
1057				let task_meta = TaskMeta {
1058					task,
1059					next_at: t.next_at,
1060					deps: t.deps.into(),
1061					retry_count,
1062					retry,
1063					cron,
1064				};
1065				self.add_queue(t.id, task_meta).await?;
1066			}
1067		}
1068		Ok(())
1069	}
1070
	/// Spawn `task` on the tokio runtime and handle its completion.
	///
	/// On success the task id is sent on `tx_finish` so the scheduler loop can
	/// release dependents. On failure, if a retry policy permits another
	/// attempt, the task is re-queued with backoff (and NO finish event is
	/// sent); otherwise the error is persisted and the finish event is sent.
	fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
		let tx_finish = self.tx_finish.clone();
		let store = self.store.clone();
		let scheduler = self.clone();
		//let state = self.state.clone();
		tokio::spawn(async move {
			match task.run(&state).await {
				Ok(()) => {
					debug!("Task {} completed successfully", id);
					// Send failure is ignored: the receiver may be gone on shutdown.
					tx_finish.send(id).unwrap_or(());
				}
				Err(e) => {
					if let Some(retry_policy) = &task_meta.retry {
						if retry_policy.should_retry(task_meta.retry_count) {
							let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
							let next_at = Timestamp::from_now(backoff as i64);

							info!(
								"Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
								id, task_meta.retry_count + 1, retry_policy.times, backoff, e
							);

							// Update database with error and reschedule
							// (persistence errors are deliberately best-effort here)
							store
								.update_task_error(id, &e.to_string(), Some(next_at))
								.await
								.unwrap_or(());

							// Remove from running tasks (we're not sending finish event).
							// This must happen BEFORE add_queue so the task is never
							// visible as both running and scheduled at the same time.
							match scheduler.tasks_running.lock() {
								Ok(mut tasks_running) => {
									tasks_running.remove(&id);
								}
								Err(poisoned) => {
									// Recover from a poisoned lock instead of losing the retry.
									error!("Mutex poisoned: tasks_running (recovering)");
									poisoned.into_inner().remove(&id);
								}
							}

							// Re-queue task with incremented retry count
							let mut retry_meta = task_meta.clone();
							retry_meta.retry_count += 1;
							retry_meta.next_at = Some(next_at);
							scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
						} else {
							// Max retries exhausted
							error!(
								"Task {} failed after {} retries: {}",
								id, task_meta.retry_count, e
							);
							store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
							tx_finish.send(id).unwrap_or(());
						}
					} else {
						// No retry policy - fail immediately
						error!("Task {} failed: {}", id, e);
						store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
						tx_finish.send(id).unwrap_or(());
					}
				}
			}
		});
	}
1134
1135	/// Get health status of the scheduler
1136	/// Returns information about tasks in each queue and detects anomalies
1137	pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
1138		let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
1139		let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
1140		let running_count = lock!(self.tasks_running, "tasks_running")?.len();
1141		let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
1142
1143		// Check for anomalies
1144		let mut stuck_tasks = Vec::new();
1145		let mut tasks_with_missing_deps = Vec::new();
1146
1147		// Check tasks_waiting for tasks with no dependencies (stuck)
1148		{
1149			let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1150			let _deps_map = lock!(self.task_dependents, "task_dependents")?;
1151
1152			for (id, task_meta) in waiting.iter() {
1153				if task_meta.deps.is_empty() {
1154					stuck_tasks.push(*id);
1155					warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
1156				} else {
1157					// Check if all dependencies still exist
1158					for dep in &task_meta.deps {
1159						// Check if dependency is in any queue or dependents map
1160						let dep_exists = self
1161							.tasks_running
1162							.lock()
1163							.ok()
1164							.map(|r| r.contains_key(dep))
1165							.unwrap_or(false) || self
1166							.tasks_waiting
1167							.lock()
1168							.ok()
1169							.map(|w| w.contains_key(dep))
1170							.unwrap_or(false) || self
1171							.tasks_scheduled
1172							.lock()
1173							.ok()
1174							.map(|s| s.iter().any(|((_, task_id), _)| task_id == dep))
1175							.unwrap_or(false);
1176
1177						if !dep_exists {
1178							tasks_with_missing_deps.push((*id, *dep));
1179							warn!(
1180								"SCHEDULER HEALTH: Task {} depends on non-existent task {}",
1181								id, dep
1182							);
1183						}
1184					}
1185				}
1186			}
1187		}
1188
1189		Ok(SchedulerHealth {
1190			waiting: waiting_count,
1191			scheduled: scheduled_count,
1192			running: running_count,
1193			dependents: dependents_count,
1194			stuck_tasks,
1195			tasks_with_missing_deps,
1196		})
1197	}
1198}
1199
/// Health status of the scheduler
///
/// Produced by `Scheduler::health_check`. Counts are point-in-time snapshots
/// taken under each queue's lock, so they may be mutually inconsistent under
/// concurrent activity.
#[derive(Debug, Clone)]
pub struct SchedulerHealth {
	/// Number of tasks waiting for dependencies
	pub waiting: usize,
	/// Number of tasks scheduled for future execution
	pub scheduled: usize,
	/// Number of tasks currently running
	pub running: usize,
	/// Number of task entries in dependents map
	pub dependents: usize,
	/// IDs of tasks with no dependencies but still in waiting queue
	pub stuck_tasks: Vec<TaskId>,
	/// Pairs of (task_id, missing_dependency_id) where dependency doesn't exist
	pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
}
1216
1217#[cfg(test)]
1218mod tests {
1219	use super::*;
1220	use serde::{Deserialize, Serialize};
1221
	// Shared test state: every completed task pushes its number onto this vector.
	type State = Arc<Mutex<Vec<u8>>>;
1223
	/// Minimal test task: sleeps 200ms * `num`, then records `num` in the state.
	#[derive(Debug, Serialize, Deserialize)]
	struct TestTask {
		num: u8,
	}
1228
1229	impl TestTask {
1230		pub fn new(num: u8) -> Arc<Self> {
1231			Arc::new(Self { num })
1232		}
1233	}
1234
	#[async_trait]
	impl Task<State> for TestTask {
		// Stable kind string used for builder registration and persistence.
		fn kind() -> &'static str {
			"test"
		}

		// Rebuild a task from its serialized context (the number as a string).
		fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
			let num: u8 = ctx
				.parse()
				.map_err(|_| Error::Internal("test task context must be u8".into()))?;
			let task = TestTask::new(num);
			Ok(task)
		}

		// Inverse of `build`: persist just the number.
		fn serialize(&self) -> String {
			self.num.to_string()
		}

		fn kind_of(&self) -> &'static str {
			"test"
		}

		// Sleeps 200ms * num, then appends `num` to the shared state vector.
		async fn run(&self, state: &State) -> ClResult<()> {
			info!("Running task {}", self.num);
			tokio::time::sleep(std::time::Duration::from_millis(200 * self.num as u64)).await;
			info!("Completed task {}", self.num);
			state.lock().unwrap().push(self.num);
			Ok(())
		}
	}
1265
	/// Test task that fails its first `fail_count` attempts, then succeeds.
	/// `attempt` is shared across retries so the same instance counts attempts.
	#[derive(Debug, Serialize, Deserialize, Clone)]
	struct FailingTask {
		id: u8,
		fail_count: u8,
		attempt: Arc<Mutex<u8>>,
	}
1272
1273	impl FailingTask {
1274		pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
1275			Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
1276		}
1277	}
1278
	#[async_trait]
	impl Task<State> for FailingTask {
		fn kind() -> &'static str {
			"failing"
		}

		// Rebuild from "id,fail_count". NOTE: the attempt counter restarts at 0
		// on deserialization — persisted retries begin a fresh failure cycle.
		fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
			let parts: Vec<&str> = ctx.split(',').collect();
			if parts.len() != 2 {
				return Err(Error::Internal("failing task context must have 2 parts".into()));
			}
			let id: u8 = parts[0]
				.parse()
				.map_err(|_| Error::Internal("failing task id must be u8".into()))?;
			let fail_count: u8 = parts[1]
				.parse()
				.map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
			Ok(FailingTask::new(id, fail_count))
		}

		fn serialize(&self) -> String {
			format!("{},{}", self.id, self.fail_count)
		}

		fn kind_of(&self) -> &'static str {
			"failing"
		}

		// Fails while current attempt <= fail_count; succeeds afterwards and
		// records `id` in the shared state.
		async fn run(&self, state: &State) -> ClResult<()> {
			let mut attempt = self.attempt.lock().unwrap();
			*attempt += 1;
			let current_attempt = *attempt;

			info!("FailingTask {} - attempt {}/{}", self.id, current_attempt, self.fail_count + 1);

			if current_attempt <= self.fail_count {
				error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
				return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
			}

			info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
			state.lock().unwrap().push(self.id);
			Ok(())
		}
	}
1324
	// End-to-end smoke test: immediate, delayed and dependent tasks must all
	// execute exactly once. Sleep durations are load-bearing — do not change.
	#[tokio::test]
	pub async fn test_scheduler() {
		let _ = tracing_subscriber::fmt().try_init();

		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		let _task1 = TestTask::new(1);
		let task2 = TestTask::new(1);
		let task3 = TestTask::new(1);

		let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
		let task3_id = scheduler.add(task3).await.unwrap();
		scheduler
			.task(TestTask::new(1))
			.depend_on(vec![task2_id, task3_id])
			.schedule()
			.await
			.unwrap();

		tokio::time::sleep(std::time::Duration::from_secs(4)).await;
		let task4 = TestTask::new(1);
		let task5 = TestTask::new(1);
		scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
		scheduler.task(task5).schedule_after(1).schedule().await.unwrap();

		tokio::time::sleep(std::time::Duration::from_secs(3)).await;

		// Five tasks total, each pushing "1" once
		let st = state.lock().unwrap();
		info!("res: {}", st.len());
		let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1:1:1");
	}
1361
	// Verifies the retry/backoff loop: a task that fails twice must eventually
	// succeed on its third attempt within the retry policy's limits.
	#[tokio::test]
	pub async fn test_retry_with_backoff() {
		let _ = tracing_subscriber::fmt().try_init();

		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<FailingTask>().unwrap();

		// Create a task that fails twice, then succeeds
		// With retry policy: min=1s, max=3600s, max_attempts=3
		let failing_task = FailingTask::new(42, 2);
		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

		scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();

		// Wait for retries: 1s (1st fail) + 1s (2nd fail) + time for success
		// First attempt: immediate fail
		// Wait 1s (min backoff)
		// Second attempt: fail
		// Wait 2s (min * 2)
		// Third attempt: success
		tokio::time::sleep(std::time::Duration::from_secs(6)).await;

		let st = state.lock().unwrap();
		assert_eq!(st.len(), 1, "Task should have succeeded after retries");
		assert_eq!(st[0], 42);
	}
1391
1392	// ===== Builder Pattern Tests =====
1393
	// Builder happy path: `.now()` schedules immediately and returns a valid id.
	#[tokio::test]
	pub async fn test_builder_simple_schedule() {
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Test basic builder usage: .now()
		let task = TestTask::new(1);
		let id = scheduler.task(task).now().await.unwrap();

		assert!(id > 0, "Task ID should be positive");

		// 500ms is enough for the 200ms task to finish
		tokio::time::sleep(std::time::Duration::from_millis(500)).await;

		let st = state.lock().unwrap();
		assert_eq!(st.len(), 1, "Task should have executed");
		assert_eq!(st[0], 1);
	}
1414
	// Builder with a task key: keyed scheduling must still execute normally.
	#[tokio::test]
	pub async fn test_builder_with_key() {
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Test builder with key
		let task = TestTask::new(1);
		let _id = scheduler.task(task).key("my-task-key").now().await.unwrap();

		tokio::time::sleep(std::time::Duration::from_millis(500)).await;

		let st = state.lock().unwrap();
		assert_eq!(st.len(), 1);
		assert_eq!(st[0], 1);
	}
1433
	// Builder `.after(secs)`: the task must NOT run before the delay elapses,
	// and MUST have run shortly after. Timing windows are load-bearing.
	#[tokio::test]
	pub async fn test_builder_with_delay() {
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Test builder with .after() convenience method
		let task = TestTask::new(1);
		let _id = scheduler
			.task(task)
			.after(1)  // 1 second delay
			.await
			.unwrap();

		// Should not have executed yet
		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
		{
			let st = state.lock().unwrap();
			assert_eq!(st.len(), 0, "Task should not execute yet");
		}

		// Wait for execution (1 sec delay + 200ms task sleep + buffer)
		tokio::time::sleep(std::time::Duration::from_millis(800)).await;

		{
			let st = state.lock().unwrap();
			assert_eq!(st.len(), 1, "Task should have executed");
			assert_eq!(st[0], 1);
		}
	}
1466
	// Builder `.depend_on`: a task with two dependencies runs only after both.
	// NOTE: all three tasks are TestTask::new(1), i.e. each sleeps 200ms.
	#[tokio::test]
	pub async fn test_builder_with_dependencies() {
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Create first task (sleeps 200ms)
		let task1 = TestTask::new(1);
		let id1 = scheduler.task(task1).now().await.unwrap();

		// Create second task (also sleeps 200ms, runs in parallel with the first)
		let task2 = TestTask::new(1);
		let id2 = scheduler.task(task2).now().await.unwrap();

		// Create third task that depends on the first two (sleeps 200ms after both)
		let task3 = TestTask::new(1);
		let _id3 = scheduler.task(task3).depend_on(vec![id1, id2]).schedule().await.unwrap();

		// Wait for all tasks: ~200ms parallel deps + 200ms dependent + buffer
		tokio::time::sleep(std::time::Duration::from_millis(1500)).await;

		let st = state.lock().unwrap();
		// Both dependencies finish first, then the dependent task
		let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1");
	}
1495
	// Builder `.with_retry`: a once-failing task succeeds on its second attempt.
	#[tokio::test]
	pub async fn test_builder_with_retry() {
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<FailingTask>().unwrap();

		// Create task using builder with retry policy
		let failing_task = FailingTask::new(55, 1); // Fails once, succeeds second time
		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

		let _id = scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();

		// Wait for retry cycle: 1 fail + 1s wait + 1 success
		tokio::time::sleep(std::time::Duration::from_secs(3)).await;

		let st = state.lock().unwrap();
		assert_eq!(st.len(), 1);
		assert_eq!(st[0], 55);
	}
1517
	// Builder `.with_automatic_retry()`: integration/compile check only —
	// the default policy's min backoff (60s) is too long to observe here.
	#[tokio::test]
	pub async fn test_builder_with_automatic_retry() {
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<FailingTask>().unwrap();

		// Create task using builder with automatic retry (default policy)
		let failing_task = FailingTask::new(66, 1);
		let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();

		// Wait for retry cycle with default policy (min=60s would be too long for test)
		// but we already tested retry logic thoroughly, just verify builder integration
		tokio::time::sleep(std::time::Duration::from_millis(500)).await;

		// The important part is that this compiles and integrates correctly
		let st = state.lock().unwrap();
		// With default policy (min=60s), task shouldn't succeed in test timeframe
		// Just verify builder chaining works
		let _ = st.len(); // Verify state is accessible, but don't assert on timeout-dependent result
	}
1540
	// Fluent chaining: key + schedule_after + depend_on + with_retry combined.
	#[tokio::test]
	pub async fn test_builder_fluent_chaining() {
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Create first dependencies
		let dep1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
		let dep2 = scheduler.task(TestTask::new(1)).now().await.unwrap();

		// Test fluent chaining with multiple methods
		let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

		let task = TestTask::new(1);
		let _id = scheduler
			.task(task)
			.key("complex-task")
			.schedule_after(0)  // Schedule immediately
			.depend_on(vec![dep1, dep2])
			.with_retry(retry_policy)
			.schedule()
			.await
			.unwrap();

		tokio::time::sleep(std::time::Duration::from_millis(800)).await;

		let st = state.lock().unwrap();
		// All three tasks ran: the two immediate deps, then the dependent task
		let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1");
	}
1574
	// Old `scheduler.add(...)` API and new builder API must both still work.
	#[tokio::test]
	pub async fn test_builder_backward_compatibility() {
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Test that old API still works
		let _id1 = scheduler.add(TestTask::new(1)).await.unwrap();

		// Test that new builder API works
		let _id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();

		tokio::time::sleep(std::time::Duration::from_millis(800)).await;

		let st = state.lock().unwrap();
		// Both old and new API should have executed
		assert_eq!(st.len(), 2);
		let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1");
	}
1597
1598	// ===== Phase 2: Integration Tests - Real-world scenarios =====
1599
	// Linear pipeline via `.after_task`: 1 -> 2 -> 3, executed strictly in order.
	#[tokio::test]
	pub async fn test_builder_pipeline_scenario() {
		// Simulates: Task 1 -> Task 2 (depends on 1) -> Task 3 (depends on 2)
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Stage 1: Create initial task
		let id1 = scheduler.task(TestTask::new(1)).key("stage-1").now().await.unwrap();

		// Stage 2: Create task that depends on stage 1
		let id2 = scheduler.task(TestTask::new(1)).key("stage-2").after_task(id1).await.unwrap();

		// Stage 3: Create task that depends on stage 2
		let _id3 = scheduler.task(TestTask::new(1)).key("stage-3").after_task(id2).await.unwrap();

		// Wait for pipeline: 1(200ms) + 2(200ms) + 3(200ms) = 600ms
		tokio::time::sleep(std::time::Duration::from_millis(1200)).await;

		let st = state.lock().unwrap();
		// Should execute in order: 1, 2, 3
		let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1");
	}
1626
	// Fan-in join: two parallel tasks, then one task that waits for both.
	#[tokio::test]
	pub async fn test_builder_multi_dependency_join() {
		// Simulates: Task 1 parallel with Task 2, then Task 3 waits for both
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Parallel tasks
		let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
		let id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();

		// Join task - waits for both
		let _id3 = scheduler
			.task(TestTask::new(1))
			.depend_on(vec![id1, id2])
			.schedule()
			.await
			.unwrap();

		tokio::time::sleep(std::time::Duration::from_secs(1)).await;

		let st = state.lock().unwrap();
		// 1 and 2 execute in parallel, then 3 executes after both
		let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1");
	}
1655
	// A task gated by BOTH a dependency and a future timestamp must wait for
	// whichever comes later (here: the scheduled time).
	#[tokio::test]
	pub async fn test_builder_scheduled_task_with_dependencies() {
		// Simulates: Task depends on earlier task AND is scheduled for future time
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Immediate task
		let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();

		// Task that waits for dependency AND scheduled delay
		let ts = Timestamp::from_now(1);
		let _task_id = scheduler
			.task(TestTask::new(1))
			.schedule_at(ts)
			.depend_on(vec![dep_id])
			.schedule()
			.await
			.unwrap();

		// Wait for dependency to complete but before scheduled time
		tokio::time::sleep(std::time::Duration::from_millis(300)).await;
		{
			let st = state.lock().unwrap();
			assert_eq!(st.len(), 1); // Only dependency executed
		}

		// Wait for scheduled time (1s total from initial schedule)
		tokio::time::sleep(std::time::Duration::from_millis(800)).await;

		{
			let st = state.lock().unwrap();
			let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
			assert_eq!(str_vec.join(":"), "1:1");
		}
	}
1694
	// Combined real-world scenario: keyed + scheduled + dependent + retried
	// tasks all running in the same scheduler instance.
	#[tokio::test]
	pub async fn test_builder_mixed_features() {
		// Simulates: Complex real-world scenario with key, scheduling, deps, and retry
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();
		scheduler.register::<FailingTask>().unwrap();

		// Create initial tasks
		let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();

		// Create complex task: scheduled + depends on id1 + has key
		let _id2 = scheduler
			.task(TestTask::new(1))
			.key("critical-task")
			.schedule_after(0)
			.depend_on(vec![id1])
			.schedule()
			.await
			.unwrap();

		// Create task with retry
		let _id3 = scheduler
			.task(FailingTask::new(1, 0))  // Fails 0 times, succeeds immediately
			.key("retryable-task")
			.with_retry(RetryPolicy {
				wait_min_max: (1, 3600),
				times: 3,
			})
			.schedule()
			.await
			.unwrap();

		// Wait for tasks: id1 (200ms) + id2 (200ms after id1) + id3 (200ms) = ~600ms
		tokio::time::sleep(std::time::Duration::from_millis(1200)).await;

		let st = state.lock().unwrap();
		// All three tasks should execute
		let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
		assert_eq!(str_vec.join(":"), "1:1:1");
	}
1738
1739	#[tokio::test]
1740	pub async fn test_builder_builder_reuse_not_possible() {
1741		// Verify that builder is consumed (moved) and can't be reused
1742		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1743		let _state: State = Arc::new(Mutex::new(Vec::new()));
1744		let scheduler = Scheduler::new(task_store);
1745
1746		let task = TestTask::new(1);
1747		let builder = scheduler.task(task);
1748
1749		// This would not compile if uncommented (builder is moved):
1750		// let _id1 = builder.now().await.unwrap();
1751		// let _id2 = builder.now().await.unwrap();  // Error: use of moved value
1752
1753		// Can only call terminal method once
1754		let _id = builder.now().await.unwrap();
1755		// builder is now consumed, can't use again
1756
1757		// Test passes if it compiles (verifying move semantics)
1758	}
1759
	// The builder must accept heterogeneous Task implementations in one scheduler.
	#[tokio::test]
	pub async fn test_builder_different_task_types() {
		// Test builder works with different task implementations
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();
		scheduler.register::<FailingTask>().unwrap();

		// Mix of different task types
		let _id1 = scheduler.task(TestTask::new(1)).key("test-task").now().await.unwrap();

		let _id2 = scheduler
			.task(FailingTask::new(1, 0))  // Won't fail
			.key("failing-task")
			.now()
			.await
			.unwrap();

		let _id3 = scheduler.task(TestTask::new(1)).now().await.unwrap();

		tokio::time::sleep(std::time::Duration::from_secs(1)).await;

		let st = state.lock().unwrap();
		assert_eq!(st.len(), 3);
		let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
		// All three tasks should execute
		assert_eq!(str_vec.join(":"), "1:1:1");
	}
1790
1791	// ===== Phase 3: Cron Placeholder Tests =====
1792	// These tests verify that cron methods compile and integrate
1793	// Actual cron functionality will be implemented in Phase 3
1794
	// `.cron(...)` chains correctly and schedules for the future — the task
	// must NOT have executed within the test window.
	#[tokio::test]
	pub async fn test_builder_cron_placeholder_syntax() {
		// Verify cron placeholder methods compile and chain properly
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Test that cron methods compile (they're no-ops in Phase 2)
		let task = TestTask::new(1);
		let _id = scheduler
			.task(task)
			.key("cron-task")
			.cron("0 9 * * *")  // 9 AM daily
			.schedule()
			.await
			.unwrap();

		// Cron scheduling - task will execute at the next scheduled time
		// For cron "0 9 * * *", that's tomorrow at 9 AM, so task won't execute in this test
		// This test just verifies the methods compile and chain properly
		tokio::time::sleep(std::time::Duration::from_millis(500)).await;

		let st = state.lock().unwrap();
		// Task is scheduled for future (9 AM), so it won't have executed yet
		// The important thing is that the cron methods compile and integrate
		assert_eq!(st.len(), 0); // Not executed yet since scheduled for future
	}
1824
	// `.daily_at(h, m)` chains correctly and schedules for the future —
	// no execution expected inside the test window.
	#[tokio::test]
	pub async fn test_builder_daily_at_placeholder() {
		// Verify daily_at placeholder compiles and integrates
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Test that daily_at placeholder compiles
		let task = TestTask::new(1);
		let _id = scheduler
			.task(task)
			.key("daily-task")
			.daily_at(14, 30)  // 2:30 PM daily
			.schedule()
			.await
			.unwrap();

		// Daily_at scheduling - task will execute at the specified time (2:30 PM daily)
		// Task is scheduled for future, so it won't execute in this test
		tokio::time::sleep(std::time::Duration::from_millis(500)).await;

		let st = state.lock().unwrap();
		// Task is scheduled for future (2:30 PM), not executed yet
		// The important thing is that daily_at compiles and integrates properly
		assert_eq!(st.len(), 0);
	}
1853
	// `.weekly_at(dow, h, m)` chains correctly and schedules for the future —
	// no execution expected inside the test window.
	#[tokio::test]
	pub async fn test_builder_weekly_at_placeholder() {
		// Verify weekly_at placeholder compiles and integrates
		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
		let state: State = Arc::new(Mutex::new(Vec::new()));
		let scheduler = Scheduler::new(task_store);
		scheduler.start(state.clone());
		scheduler.register::<TestTask>().unwrap();

		// Test that weekly_at placeholder compiles
		let task = TestTask::new(1);
		let _id = scheduler
			.task(task)
			.key("weekly-task")
			.weekly_at(1, 9, 0)  // Monday at 9 AM
			.schedule()
			.await
			.unwrap();

		// Weekly_at scheduling - task will execute on Monday at 9 AM
		// Task is scheduled for future, so it won't execute in this test
		tokio::time::sleep(std::time::Duration::from_millis(500)).await;

		let st = state.lock().unwrap();
		// Task is scheduled for future (Monday 9 AM), not executed yet
		// The important thing is that weekly_at compiles and integrates properly
		assert_eq!(st.len(), 0);
	}
1882
1883	#[tokio::test]
1884	pub async fn test_builder_cron_with_retry() {
1885		// Verify cron methods chain with retry (future combined usage)
1886		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1887		let state: State = Arc::new(Mutex::new(Vec::new()));
1888		let scheduler = Scheduler::new(task_store);
1889		scheduler.start(state.clone());
1890		scheduler.register::<TestTask>().unwrap();
1891
1892		// Test future usage pattern: cron + retry
1893		let task = TestTask::new(1);
1894		let _id = scheduler
1895			.task(task)
1896			.key("reliable-scheduled-task")
1897			.daily_at(2, 0)  // 2 AM daily
1898			.with_retry(RetryPolicy {
1899				wait_min_max: (60, 3600),
1900				times: 5,
1901			})
1902			.schedule()
1903			.await
1904			.unwrap();
1905
1906		// Verify cron+retry chain compiles properly
1907		// Task is scheduled for 2 AM, so won't execute in this test
1908		tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1909
1910		let st = state.lock().unwrap();
1911		// Task scheduled for future (2 AM), not executed yet
1912		// The important thing is that chaining cron + retry works
1913		assert_eq!(st.len(), 0);
1914	}
1915
1916	// ===== Cron Schedule Tests =====
1917
1918	#[test]
1919	fn test_cron_to_string() {
1920		// Test that to_cron_string returns the original expression
1921		let cron = CronSchedule::parse("*/5 * * * *").unwrap();
1922		assert_eq!(cron.to_cron_string(), "*/5 * * * *");
1923	}
1924
1925	#[tokio::test]
1926	pub async fn test_running_task_not_double_scheduled() {
1927		let _ = tracing_subscriber::fmt().try_init();
1928
1929		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1930		let state: State = Arc::new(Mutex::new(Vec::new()));
1931		let scheduler = Scheduler::new(task_store);
1932		scheduler.start(state.clone());
1933		scheduler.register::<TestTask>().unwrap();
1934
1935		// Create a task
1936		let task = TestTask::new(5); // Takes 1 second (5 * 200ms)
1937		let task_id = scheduler.add(task.clone()).await.unwrap();
1938
1939		// Wait a bit for task to start running
1940		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1941
1942		// Verify task is in tasks_running
1943		{
1944			let running = scheduler.tasks_running.lock().unwrap();
1945			assert!(running.contains_key(&task_id), "Task should be in running queue");
1946		}
1947
1948		// Try to add the same task again via add_queue
1949		let task_meta = TaskMeta {
1950			task: task.clone(),
1951			next_at: Some(Timestamp::now()),
1952			deps: vec![],
1953			retry_count: 0,
1954			retry: None,
1955			cron: None,
1956		};
1957		let result = scheduler.add_queue(task_id, task_meta).await;
1958
1959		// Should succeed but not actually add to scheduled queue
1960		assert!(result.is_ok(), "add_queue should succeed");
1961
1962		// Verify task is NOT in tasks_scheduled (only in running)
1963		{
1964			let scheduled = scheduler.tasks_scheduled.lock().unwrap();
1965			let in_scheduled = scheduled.iter().any(|((_, id), _)| *id == task_id);
1966			assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
1967		}
1968
1969		// Wait for original task to complete
1970		tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1971
1972		// Verify task completed
1973		let st = state.lock().unwrap();
1974		assert_eq!(st.len(), 1, "Only one task execution should have occurred");
1975		assert_eq!(st[0], 5);
1976	}
1977
1978	#[tokio::test]
1979	pub async fn test_running_task_metadata_updated() {
1980		let _ = tracing_subscriber::fmt().try_init();
1981
1982		let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1983		let state: State = Arc::new(Mutex::new(Vec::new()));
1984		let scheduler = Scheduler::new(task_store);
1985		scheduler.start(state.clone());
1986		scheduler.register::<TestTask>().unwrap();
1987
1988		// Create a task without cron
1989		let task = TestTask::new(5); // Takes 1 second (5 * 200ms)
1990		let task_id = scheduler.add(task.clone()).await.unwrap();
1991
1992		// Wait a bit for task to start running
1993		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1994
1995		// Verify task is running and has no cron
1996		{
1997			let running = scheduler.tasks_running.lock().unwrap();
1998			let meta = running.get(&task_id).expect("Task should be running");
1999			assert!(meta.cron.is_none(), "Task should have no cron initially");
2000		}
2001
2002		// Try to update the running task with a cron schedule
2003		let cron = CronSchedule::parse("*/5 * * * *").unwrap();
2004		let task_meta_with_cron = TaskMeta {
2005			task: task.clone(),
2006			next_at: Some(Timestamp::now()),
2007			deps: vec![],
2008			retry_count: 0,
2009			retry: None,
2010			cron: Some(cron.clone()),
2011		};
2012		let result = scheduler.add_queue(task_id, task_meta_with_cron).await;
2013
2014		// Should succeed
2015		assert!(result.is_ok(), "add_queue should succeed");
2016
2017		// Verify the running task now has the cron schedule
2018		{
2019			let running = scheduler.tasks_running.lock().unwrap();
2020			let meta = running.get(&task_id).expect("Task should still be running");
2021			assert!(meta.cron.is_some(), "Task should now have cron after update");
2022		}
2023
2024		// Wait for task to complete
2025		tokio::time::sleep(std::time::Duration::from_secs(2)).await;
2026	}
2027}
2028
2029// vim: ts=4