1use std::collections::HashMap;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::{Arc, RwLock};
37
38use asupersync::runtime::{RuntimeBuilder, RuntimeHandle};
39use asupersync::{Budget, CancelKind, Cx};
40use fastmcp_core::logging::{debug, info, targets, warn};
41use fastmcp_core::{McpError, McpResult};
42use fastmcp_protocol::{
43 JsonRpcRequest, TaskId, TaskInfo, TaskResult, TaskStatus, TaskStatusNotificationParams,
44};
45
/// Shared callback through which the task manager emits JSON-RPC notifications.
///
/// Within this file it is invoked with `notifications/tasks/status`
/// notifications each time a task status snapshot is published.
pub type TaskNotificationSender = Arc<dyn Fn(JsonRpcRequest) + Send + Sync>;
48
/// Boxed handler for one task type.
///
/// It is called with the task's [`Cx`] and the task's JSON parameters and
/// returns the [`TaskFuture`] that performs the work.
pub type TaskHandler = Box<dyn Fn(&Cx, serde_json::Value) -> TaskFuture + Send + Sync + 'static>;
53
/// Pinned, boxed, `Send + 'static` future returned by a [`TaskHandler`].
///
/// It resolves to an [`McpResult`] carrying the task's JSON output.
pub type TaskFuture = std::pin::Pin<
    Box<dyn std::future::Future<Output = McpResult<serde_json::Value>> + Send + 'static>,
>;
58
/// Bookkeeping kept by the manager for every submitted task.
struct TaskState {
    /// Public task metadata: status, progress, message, timestamps, error text.
    info: TaskInfo,
    /// Set to `true` by `TaskManager::cancel`; later automatic state changes
    /// (running, completed, failed) are skipped once it is set.
    cancel_requested: bool,
    /// Final outcome, filled in when the task completes, fails, or is cancelled.
    result: Option<TaskResult>,
    /// Clone of the context handed to the handler; `cancel` signals
    /// cancellation through it.
    cx: Cx,
}
70
71fn can_transition(from: TaskStatus, to: TaskStatus) -> bool {
72 matches!(
73 (from, to),
74 (
75 TaskStatus::Pending,
76 TaskStatus::Running | TaskStatus::Failed | TaskStatus::Cancelled
77 ) | (
78 TaskStatus::Running,
79 TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled
80 )
81 )
82}
83
84fn transition_state(state: &mut TaskState, to: TaskStatus) -> bool {
85 let from = state.info.status;
86 if from == to {
87 return true;
88 }
89 if !can_transition(from, to) {
90 warn!(
91 target: targets::SERVER,
92 "task {} invalid transition {:?} -> {:?}",
93 state.info.id,
94 from,
95 to
96 );
97 return false;
98 }
99
100 state.info.status = to;
101 let now = chrono::Utc::now().to_rfc3339();
102 match to {
103 TaskStatus::Running => {
104 state.info.started_at = Some(now.clone());
105 }
106 TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled => {
107 state.info.completed_at = Some(now.clone());
108 }
109 TaskStatus::Pending => {}
110 }
111
112 info!(
113 target: targets::SERVER,
114 "task {} status {:?} -> {:?} at {}",
115 state.info.id,
116 from,
117 to,
118 now
119 );
120 true
121}
122
123fn mark_task_failed_snapshot(
124 tasks: &Arc<RwLock<HashMap<TaskId, TaskState>>>,
125 task_id: &TaskId,
126 error_msg: String,
127 lock_context: &'static str,
128) -> Option<TaskStatusSnapshot> {
129 let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
130 warn!(
131 target: targets::SERVER,
132 "tasks lock poisoned in {}, recovering",
133 lock_context
134 );
135 poisoned.into_inner()
136 });
137
138 let state = tasks_guard.get_mut(task_id)?;
139 if state.cancel_requested || !transition_state(state, TaskStatus::Failed) {
140 return None;
141 }
142
143 state.info.error = Some(error_msg.clone());
144 state.result = Some(TaskResult {
145 id: task_id.clone(),
146 success: false,
147 data: None,
148 error: Some(error_msg),
149 });
150 Some(TaskStatusSnapshot::from(state))
151}
152
153fn build_runtime_handle() -> Option<RuntimeHandle> {
154 match RuntimeBuilder::multi_thread().build() {
155 Ok(runtime) => Some(runtime.handle()),
156 Err(multi_err) => {
157 warn!(
158 target: targets::SERVER,
159 "failed to initialize multi-thread runtime for tasks: {}; attempting current-thread fallback",
160 multi_err
161 );
162 match RuntimeBuilder::current_thread().build() {
163 Ok(runtime) => Some(runtime.handle()),
164 Err(single_err) => {
165 warn!(
166 target: targets::SERVER,
167 "failed to initialize current-thread runtime fallback for tasks: {}",
168 single_err
169 );
170 None
171 }
172 }
173 }
174 }
175}
176
/// Registry and executor for background tasks.
///
/// Tracks per-task state, the handlers registered for each task type, and an
/// optional callback that receives task status notifications.
pub struct TaskManager {
    /// All known tasks keyed by id; shared with the futures spawned for tasks.
    tasks: Arc<RwLock<HashMap<TaskId, TaskState>>>,
    /// Registered handlers keyed by task type name.
    handlers: Arc<RwLock<HashMap<String, TaskHandler>>>,
    /// Counter used to mint task ids of the form `task-{counter:08x}`.
    task_counter: AtomicU64,
    /// Value reported by `has_list_changed_notifications`.
    list_changed_notifications: bool,
    /// Handle used to spawn task futures; `None` when no runtime could be built.
    runtime: Option<RuntimeHandle>,
    /// When `true`, `submit` immediately schedules the task's handler.
    auto_execute: bool,
    /// Optional callback that receives task status notifications.
    notification_sender: Arc<RwLock<Option<TaskNotificationSender>>>,
}
197
198impl TaskManager {
199 #[must_use]
201 pub fn new() -> Self {
202 let runtime = build_runtime_handle();
203 if runtime.is_none() {
204 warn!(
205 target: targets::SERVER,
206 "TaskManager runtime unavailable; auto-executed tasks will fail until runtime becomes available"
207 );
208 }
209 Self {
210 tasks: Arc::new(RwLock::new(HashMap::new())),
211 handlers: Arc::new(RwLock::new(HashMap::new())),
212 task_counter: AtomicU64::new(0),
213 list_changed_notifications: false,
214 runtime,
215 auto_execute: true,
216 notification_sender: Arc::new(RwLock::new(None)),
217 }
218 }
219
220 #[must_use]
222 pub fn with_list_changed_notifications() -> Self {
223 Self {
224 list_changed_notifications: true,
225 ..Self::new()
226 }
227 }
228
229 #[must_use]
233 pub fn new_for_testing() -> Self {
234 let mut manager = Self::new();
235 manager.auto_execute = false;
236 manager
237 }
238
239 #[must_use]
241 pub fn into_shared(self) -> SharedTaskManager {
242 Arc::new(self)
243 }
244
245 #[must_use]
247 pub fn has_list_changed_notifications(&self) -> bool {
248 self.list_changed_notifications
249 }
250
251 pub fn set_notification_sender(&self, sender: TaskNotificationSender) {
253 let mut guard = self.notification_sender.write().unwrap_or_else(|poisoned| {
254 warn!(target: targets::SERVER, "notification sender lock poisoned, recovering");
255 poisoned.into_inner()
256 });
257 *guard = Some(sender);
258 }
259
260 pub fn register_handler<F, Fut>(&self, task_type: impl Into<String>, handler: F)
264 where
265 F: Fn(&Cx, serde_json::Value) -> Fut + Send + Sync + 'static,
266 Fut: std::future::Future<Output = McpResult<serde_json::Value>> + Send + 'static,
267 {
268 let task_type = task_type.into();
269 let boxed_handler: TaskHandler = Box::new(move |cx, params| Box::pin(handler(cx, params)));
270
271 let mut handlers = self.handlers.write().unwrap_or_else(|poisoned| {
272 warn!(target: targets::SERVER, "handlers lock poisoned, recovering");
273 poisoned.into_inner()
274 });
275 handlers.insert(task_type, boxed_handler);
276 }
277
278 pub fn submit(
283 &self,
284 _cx: &Cx,
285 task_type: impl Into<String>,
286 params: Option<serde_json::Value>,
287 ) -> McpResult<TaskId> {
288 let task_type = task_type.into();
289
290 {
292 let handlers = self.handlers.read().unwrap_or_else(|poisoned| {
293 warn!(target: targets::SERVER, "handlers lock poisoned, recovering");
294 poisoned.into_inner()
295 });
296 if !handlers.contains_key(&task_type) {
297 return Err(McpError::invalid_params(format!(
298 "Unknown task type: {task_type}"
299 )));
300 }
301 }
302
303 let counter = self.task_counter.fetch_add(1, Ordering::SeqCst);
305 let task_id = TaskId::from_string(format!("task-{counter:08x}"));
306
307 let now = chrono::Utc::now().to_rfc3339();
309 let task_cx = Cx::for_request_with_budget(Budget::INFINITE);
310 let info = TaskInfo {
311 id: task_id.clone(),
312 task_type: task_type.clone(),
313 status: TaskStatus::Pending,
314 progress: None,
315 message: None,
316 created_at: now,
317 started_at: None,
318 completed_at: None,
319 error: None,
320 };
321
322 let info_snapshot = info.clone();
323
324 let state = TaskState {
326 info,
327 cancel_requested: false,
328 result: None,
329 cx: task_cx.clone(),
330 };
331
332 {
333 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
334 warn!(target: targets::SERVER, "tasks lock poisoned, recovering");
335 poisoned.into_inner()
336 });
337 tasks.insert(task_id.clone(), state);
338 }
339
340 self.notify_status(info_snapshot, None);
341
342 if self.auto_execute {
343 let params = params.unwrap_or_else(|| serde_json::json!({}));
344 self.spawn_task(task_id.clone(), task_type, task_cx, params);
345 }
346
347 Ok(task_id)
348 }
349
350 #[allow(clippy::too_many_lines)]
351 fn spawn_task(
352 &self,
353 task_id: TaskId,
354 task_type: String,
355 task_cx: Cx,
356 params: serde_json::Value,
357 ) {
358 let Some(runtime) = self.runtime.clone() else {
359 let failure_snapshot = mark_task_failed_snapshot(
360 &self.tasks,
361 &task_id,
362 "Task runtime unavailable".to_string(),
363 "spawn_task runtime unavailable",
364 );
365 self.notify_snapshot(failure_snapshot);
366 return;
367 };
368
369 let tasks = Arc::clone(&self.tasks);
370 let handlers = Arc::clone(&self.handlers);
371 let notification_sender = Arc::clone(&self.notification_sender);
372 let scheduled_task_id = task_id.clone();
373 let scheduling = runtime.try_spawn(async move {
374 let running_snapshot = {
375 let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
376 warn!(target: targets::SERVER, "tasks lock poisoned in spawn_task, recovering");
377 poisoned.into_inner()
378 });
379 match tasks_guard.get_mut(&task_id) {
380 Some(state) => {
381 if state.cancel_requested || !transition_state(state, TaskStatus::Running) {
382 None
383 } else {
384 Some(TaskStatusSnapshot::from(state))
385 }
386 }
387 None => None,
388 }
389 };
390
391 let should_start = running_snapshot.is_some();
392 notify_snapshot(¬ification_sender, running_snapshot);
393
394 if !should_start {
395 return;
396 }
397
398 let task_future = {
399 let handlers_guard = handlers.read().unwrap_or_else(|poisoned| {
400 warn!(target: targets::SERVER, "handlers lock poisoned in spawn_task, recovering");
401 poisoned.into_inner()
402 });
403 let Some(handler) = handlers_guard.get(&task_type) else {
404 let failure_snapshot = mark_task_failed_snapshot(
405 &tasks,
406 &task_id,
407 format!("Unknown task type: {task_type}"),
408 "spawn_task failure",
409 );
410 notify_snapshot(¬ification_sender, failure_snapshot);
411 return;
412 };
413 (handler)(&task_cx, params)
414 };
415
416 let result = task_future.await;
417
418 let completion_snapshot = {
419 let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
420 warn!(target: targets::SERVER, "tasks lock poisoned in spawn_task completion, recovering");
421 poisoned.into_inner()
422 });
423 match tasks_guard.get_mut(&task_id) {
424 Some(state) => {
425 if state.cancel_requested {
426 None
427 } else {
428 let mut snapshot = None;
429 match result {
430 Ok(data) => {
431 if transition_state(state, TaskStatus::Completed) {
432 state.info.progress = Some(1.0);
433 state.result = Some(TaskResult {
434 id: task_id.clone(),
435 success: true,
436 data: Some(data),
437 error: None,
438 });
439 snapshot = Some(TaskStatusSnapshot::from(state));
440 }
441 }
442 Err(err) => {
443 let error_msg = err.message;
444 if transition_state(state, TaskStatus::Failed) {
445 state.info.error = Some(error_msg.clone());
446 state.result = Some(TaskResult {
447 id: task_id.clone(),
448 success: false,
449 data: None,
450 error: Some(error_msg),
451 });
452 snapshot = Some(TaskStatusSnapshot::from(state));
453 }
454 }
455 }
456 snapshot
457 }
458 }
459 None => None,
460 }
461 };
462
463 notify_snapshot(¬ification_sender, completion_snapshot);
464 });
465
466 if let Err(err) = scheduling {
467 warn!(
468 target: targets::SERVER,
469 "failed to schedule task {}: {}",
470 scheduled_task_id,
471 err
472 );
473 let failure_snapshot = mark_task_failed_snapshot(
474 &self.tasks,
475 &scheduled_task_id,
476 format!("Failed to schedule task: {err}"),
477 "spawn_task scheduling",
478 );
479 self.notify_snapshot(failure_snapshot);
480 }
481 }
482
483 pub fn start_task(&self, task_id: &TaskId) -> McpResult<()> {
487 let snapshot = {
488 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
489 warn!(target: targets::SERVER, "tasks lock poisoned in start_task, recovering");
490 poisoned.into_inner()
491 });
492 let state = tasks
493 .get_mut(task_id)
494 .ok_or_else(|| McpError::invalid_params(format!("Task not found: {task_id}")))?;
495
496 if state.info.status != TaskStatus::Pending {
497 return Err(McpError::invalid_params(format!(
498 "Task {task_id} is not pending"
499 )));
500 }
501
502 if !transition_state(state, TaskStatus::Running) {
503 return Err(McpError::invalid_params(format!(
504 "Task {task_id} cannot transition to running"
505 )));
506 }
507 Some(TaskStatusSnapshot::from(state))
508 };
509
510 self.notify_snapshot(snapshot);
511 Ok(())
512 }
513
514 pub fn update_progress(&self, task_id: &TaskId, progress: f64, message: Option<String>) {
516 let snapshot = {
517 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
518 warn!(target: targets::SERVER, "tasks lock poisoned in update_progress, recovering");
519 poisoned.into_inner()
520 });
521 if let Some(state) = tasks.get_mut(task_id) {
522 if state.info.status != TaskStatus::Running {
523 debug!(
524 target: targets::SERVER,
525 "task {} progress update ignored in state {:?}",
526 task_id,
527 state.info.status
528 );
529 return;
530 }
531 state.info.progress = Some(progress.clamp(0.0, 1.0));
532 state.info.message = message;
533 Some(TaskStatusSnapshot::from(state))
534 } else {
535 None
536 }
537 };
538
539 self.notify_snapshot(snapshot);
540 }
541
542 pub fn complete_task(&self, task_id: &TaskId, data: serde_json::Value) {
544 let snapshot = {
545 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
546 warn!(target: targets::SERVER, "tasks lock poisoned in complete_task, recovering");
547 poisoned.into_inner()
548 });
549 if let Some(state) = tasks.get_mut(task_id) {
550 if !transition_state(state, TaskStatus::Completed) {
551 return;
552 }
553 state.info.progress = Some(1.0);
554 state.result = Some(TaskResult {
555 id: task_id.clone(),
556 success: true,
557 data: Some(data),
558 error: None,
559 });
560 Some(TaskStatusSnapshot::from(state))
561 } else {
562 None
563 }
564 };
565
566 self.notify_snapshot(snapshot);
567 }
568
569 pub fn fail_task(&self, task_id: &TaskId, error: impl Into<String>) {
571 let error = error.into();
572 let snapshot = {
573 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
574 warn!(target: targets::SERVER, "tasks lock poisoned in fail_task, recovering");
575 poisoned.into_inner()
576 });
577 if let Some(state) = tasks.get_mut(task_id) {
578 if !transition_state(state, TaskStatus::Failed) {
579 return;
580 }
581 state.info.error = Some(error.clone());
582 state.result = Some(TaskResult {
583 id: task_id.clone(),
584 success: false,
585 data: None,
586 error: Some(error),
587 });
588 Some(TaskStatusSnapshot::from(state))
589 } else {
590 None
591 }
592 };
593
594 self.notify_snapshot(snapshot);
595 }
596
597 #[must_use]
599 pub fn get_info(&self, task_id: &TaskId) -> Option<TaskInfo> {
600 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
601 warn!(target: targets::SERVER, "tasks lock poisoned in get_info, recovering");
602 poisoned.into_inner()
603 });
604 tasks.get(task_id).map(|s| s.info.clone())
605 }
606
607 #[must_use]
609 pub fn get_result(&self, task_id: &TaskId) -> Option<TaskResult> {
610 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
611 warn!(target: targets::SERVER, "tasks lock poisoned in get_result, recovering");
612 poisoned.into_inner()
613 });
614 tasks.get(task_id).and_then(|s| s.result.clone())
615 }
616
617 #[must_use]
619 pub fn list_tasks(&self, status_filter: Option<TaskStatus>) -> Vec<TaskInfo> {
620 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
621 warn!(target: targets::SERVER, "tasks lock poisoned in list_tasks, recovering");
622 poisoned.into_inner()
623 });
624 tasks
625 .values()
626 .filter(|s| status_filter.is_none_or(|f| s.info.status == f))
627 .map(|s| s.info.clone())
628 .collect()
629 }
630
631 pub fn cancel(&self, task_id: &TaskId, reason: Option<String>) -> McpResult<TaskInfo> {
636 let snapshot = {
637 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
638 warn!(target: targets::SERVER, "tasks lock poisoned in cancel, recovering");
639 poisoned.into_inner()
640 });
641 let state = tasks
642 .get_mut(task_id)
643 .ok_or_else(|| McpError::invalid_params(format!("Task not found: {task_id}")))?;
644
645 if state.info.status.is_terminal() {
647 return Err(McpError::invalid_params(format!(
648 "Task {task_id} is already in terminal state: {:?}",
649 state.info.status
650 )));
651 }
652
653 if !transition_state(state, TaskStatus::Cancelled) {
654 return Err(McpError::invalid_params(format!(
655 "Task {task_id} cannot be cancelled from {:?}",
656 state.info.status
657 )));
658 }
659
660 state.cancel_requested = true;
661
662 state.cx.cancel_with(CancelKind::User, None);
663 if !state.cx.is_cancel_requested() {
664 warn!(
665 target: targets::SERVER,
666 "task {} cancel signal not observed on context",
667 task_id
668 );
669 }
670
671 let error_msg = reason.unwrap_or_else(|| "Cancelled by request".to_string());
672 state.info.error = Some(error_msg.clone());
673 state.result = Some(TaskResult {
674 id: task_id.clone(),
675 success: false,
676 data: None,
677 error: Some(error_msg),
678 });
679
680 let snapshot = TaskStatusSnapshot::from(state);
681 (snapshot, state.info.clone())
682 };
683
684 let (snapshot, info) = snapshot;
685 self.notify_snapshot(Some(snapshot));
686 Ok(info)
687 }
688
689 #[must_use]
691 pub fn is_cancel_requested(&self, task_id: &TaskId) -> bool {
692 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
693 warn!(target: targets::SERVER, "tasks lock poisoned in is_cancel_requested, recovering");
694 poisoned.into_inner()
695 });
696 tasks.get(task_id).is_some_and(|s| s.cancel_requested)
697 }
698
699 #[must_use]
701 pub fn active_count(&self) -> usize {
702 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
703 warn!(target: targets::SERVER, "tasks lock poisoned in active_count, recovering");
704 poisoned.into_inner()
705 });
706 tasks.values().filter(|s| s.info.status.is_active()).count()
707 }
708
709 #[must_use]
711 pub fn total_count(&self) -> usize {
712 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
713 warn!(target: targets::SERVER, "tasks lock poisoned in total_count, recovering");
714 poisoned.into_inner()
715 });
716 tasks.len()
717 }
718
719 pub fn cleanup_completed(&self, max_age: std::time::Duration) {
723 let cutoff = chrono::Utc::now() - chrono::Duration::from_std(max_age).unwrap_or_default();
724
725 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
726 warn!(target: targets::SERVER, "tasks lock poisoned in cleanup_completed, recovering");
727 poisoned.into_inner()
728 });
729 tasks.retain(|_, state| {
730 if state.info.status.is_active() {
732 return true;
733 }
734
735 if let Some(ref completed) = state.info.completed_at {
737 if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(completed) {
738 return parsed.with_timezone(&chrono::Utc) > cutoff;
739 }
740 return true;
741 }
742
743 true
744 });
745 }
746
747 fn notify_snapshot(&self, snapshot: Option<TaskStatusSnapshot>) {
748 if let Some(snapshot) = snapshot {
749 self.notify_status(snapshot.info, snapshot.result);
750 }
751 }
752
753 fn notify_status(&self, info: TaskInfo, result: Option<TaskResult>) {
754 let sender = {
755 let guard = self.notification_sender.read().unwrap_or_else(|poisoned| {
756 warn!(target: targets::SERVER, "notification sender lock poisoned in notify_status, recovering");
757 poisoned.into_inner()
758 });
759 guard.clone()
760 };
761 let Some(sender) = sender else {
762 return;
763 };
764
765 let params = TaskStatusNotificationParams {
766 id: info.id.clone(),
767 status: info.status,
768 progress: info.progress,
769 message: info.message.clone(),
770 error: info.error.clone(),
771 result,
772 };
773 let payload = match serde_json::to_value(params) {
774 Ok(value) => value,
775 Err(err) => {
776 warn!(
777 target: targets::SERVER,
778 "failed to serialize task status notification: {}",
779 err
780 );
781 return;
782 }
783 };
784 sender(JsonRpcRequest::notification(
785 "notifications/tasks/status",
786 Some(payload),
787 ));
788 }
789}
790
/// Owned copy of a task's info and result, taken while the tasks lock is held
/// so the notification can be sent after the lock is released.
#[derive(Debug, Clone)]
struct TaskStatusSnapshot {
    /// Copy of the task metadata at snapshot time.
    info: TaskInfo,
    /// Copy of the task result at snapshot time, if any.
    result: Option<TaskResult>,
}
796
797impl TaskStatusSnapshot {
798 fn from(state: &TaskState) -> Self {
799 Self {
800 info: state.info.clone(),
801 result: state.result.clone(),
802 }
803 }
804}
805
806fn notify_snapshot(
807 sender: &Arc<RwLock<Option<TaskNotificationSender>>>,
808 snapshot: Option<TaskStatusSnapshot>,
809) {
810 let Some(snapshot) = snapshot else {
811 return;
812 };
813 let sender = {
814 let guard = sender.read().unwrap_or_else(|poisoned| {
815 warn!(target: targets::SERVER, "notification sender lock poisoned in notify_snapshot, recovering");
816 poisoned.into_inner()
817 });
818 guard.clone()
819 };
820 let Some(sender) = sender else {
821 return;
822 };
823 let params = TaskStatusNotificationParams {
824 id: snapshot.info.id.clone(),
825 status: snapshot.info.status,
826 progress: snapshot.info.progress,
827 message: snapshot.info.message.clone(),
828 error: snapshot.info.error.clone(),
829 result: snapshot.result,
830 };
831 let payload = match serde_json::to_value(params) {
832 Ok(value) => value,
833 Err(err) => {
834 warn!(
835 target: targets::SERVER,
836 "failed to serialize task status notification: {}",
837 err
838 );
839 return;
840 }
841 };
842 sender(JsonRpcRequest::notification(
843 "notifications/tasks/status",
844 Some(payload),
845 ));
846}
847
848impl Default for TaskManager {
849 fn default() -> Self {
850 Self::new()
851 }
852}
853
854impl std::fmt::Debug for TaskManager {
855 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
856 let task_count = self
858 .tasks
859 .read()
860 .map(|g| g.len())
861 .unwrap_or_else(|poisoned| poisoned.into_inner().len());
862 let handler_count = self
863 .handlers
864 .read()
865 .map(|g| g.len())
866 .unwrap_or_else(|poisoned| poisoned.into_inner().len());
867 f.debug_struct("TaskManager")
868 .field("task_count", &task_count)
869 .field("handler_count", &handler_count)
870 .field("task_counter", &self.task_counter.load(Ordering::SeqCst))
871 .field(
872 "list_changed_notifications",
873 &self.list_changed_notifications,
874 )
875 .field("auto_execute", &self.auto_execute)
876 .finish_non_exhaustive()
877 }
878}
879
/// Reference-counted [`TaskManager`], as produced by `TaskManager::into_shared`.
pub type SharedTaskManager = Arc<TaskManager>;
882
883#[cfg(test)]
884mod tests {
885 use super::*;
886 use std::sync::Arc;
887 use std::thread;
888 use std::time::Duration;
889
890 #[test]
891 fn test_task_manager_creation() {
892 let manager = TaskManager::new();
893 assert_eq!(manager.total_count(), 0);
894 assert_eq!(manager.active_count(), 0);
895 assert!(!manager.has_list_changed_notifications());
896 }
897
898 #[test]
899 fn test_task_manager_with_notifications() {
900 let manager = TaskManager::with_list_changed_notifications();
901 assert!(manager.has_list_changed_notifications());
902 }
903
904 #[test]
905 fn test_register_handler() {
906 let manager = TaskManager::new();
907
908 manager.register_handler("test_task", |_cx, _params| async {
909 Ok(serde_json::json!({}))
910 });
911
912 let cx = Cx::for_testing();
914 let result = manager.submit(&cx, "test_task", None);
915 assert!(result.is_ok());
916 }
917
918 #[test]
919 fn test_submit_auto_execute_fails_when_runtime_unavailable() {
920 let mut manager = TaskManager::new_for_testing();
921 manager.auto_execute = true;
922 manager.runtime = None;
923
924 manager.register_handler("test_task", |_cx, _params| async {
925 Ok(serde_json::json!({}))
926 });
927
928 let cx = Cx::for_testing();
929 let task_id = manager.submit(&cx, "test_task", None).unwrap();
930
931 let info = manager.get_info(&task_id).unwrap();
932 assert_eq!(info.status, TaskStatus::Failed);
933 assert_eq!(info.error.as_deref(), Some("Task runtime unavailable"));
934
935 let result = manager.get_result(&task_id).unwrap();
936 assert!(!result.success);
937 assert_eq!(result.error.as_deref(), Some("Task runtime unavailable"));
938 }
939
940 #[test]
941 fn test_submit_unknown_task_type() {
942 let manager = TaskManager::new();
943 let cx = Cx::for_testing();
944
945 let result = manager.submit(&cx, "unknown_task", None);
946 assert!(result.is_err());
947 }
948
949 #[test]
950 fn test_task_lifecycle() {
951 let manager = TaskManager::new_for_testing();
952 let cx = Cx::for_testing();
953
954 manager.register_handler("test", |_cx, _params| async {
955 Ok(serde_json::json!({"done": true}))
956 });
957
958 let task_id = manager.submit(&cx, "test", None).unwrap();
960
961 let info = manager.get_info(&task_id).unwrap();
963 assert_eq!(info.status, TaskStatus::Pending);
964 assert!(info.started_at.is_none());
965
966 manager.start_task(&task_id).unwrap();
968 let info = manager.get_info(&task_id).unwrap();
969 assert_eq!(info.status, TaskStatus::Running);
970 assert!(info.started_at.is_some());
971
972 manager.update_progress(&task_id, 0.5, Some("Halfway done".into()));
974 let info = manager.get_info(&task_id).unwrap();
975 assert_eq!(info.progress, Some(0.5));
976 assert_eq!(info.message, Some("Halfway done".into()));
977
978 manager.complete_task(&task_id, serde_json::json!({"result": 42}));
980 let info = manager.get_info(&task_id).unwrap();
981 assert_eq!(info.status, TaskStatus::Completed);
982 assert!(info.completed_at.is_some());
983
984 let result = manager.get_result(&task_id).unwrap();
986 assert!(result.success);
987 assert_eq!(result.data, Some(serde_json::json!({"result": 42})));
988 }
989
990 #[test]
991 fn test_task_failure() {
992 let manager = TaskManager::new_for_testing();
993 let cx = Cx::for_testing();
994
995 manager.register_handler("fail_test", |_cx, _params| async {
996 Ok(serde_json::json!({}))
997 });
998
999 let task_id = manager.submit(&cx, "fail_test", None).unwrap();
1000 manager.start_task(&task_id).unwrap();
1001 manager.fail_task(&task_id, "Something went wrong");
1002
1003 let info = manager.get_info(&task_id).unwrap();
1004 assert_eq!(info.status, TaskStatus::Failed);
1005 assert_eq!(info.error, Some("Something went wrong".into()));
1006
1007 let result = manager.get_result(&task_id).unwrap();
1008 assert!(!result.success);
1009 assert_eq!(result.error, Some("Something went wrong".into()));
1010 }
1011
1012 #[test]
1013 fn test_task_cancellation() {
1014 let manager = TaskManager::new_for_testing();
1015 let cx = Cx::for_testing();
1016
1017 manager.register_handler("cancel_test", |_cx, _params| async {
1018 Ok(serde_json::json!({}))
1019 });
1020
1021 let task_id = manager.submit(&cx, "cancel_test", None).unwrap();
1022 manager.start_task(&task_id).unwrap();
1023
1024 let info = manager
1026 .cancel(&task_id, Some("User cancelled".into()))
1027 .unwrap();
1028 assert_eq!(info.status, TaskStatus::Cancelled);
1029
1030 assert!(manager.is_cancel_requested(&task_id));
1032
1033 let result = manager.cancel(&task_id, None);
1035 assert!(result.is_err());
1036 }
1037
1038 #[test]
1039 fn test_list_tasks() {
1040 let manager = TaskManager::new_for_testing();
1041 let cx = Cx::for_testing();
1042
1043 manager.register_handler("list_test", |_cx, _params| async {
1044 Ok(serde_json::json!({}))
1045 });
1046
1047 let task1 = manager.submit(&cx, "list_test", None).unwrap();
1048 let task2 = manager.submit(&cx, "list_test", None).unwrap();
1049 let _task3 = manager.submit(&cx, "list_test", None).unwrap();
1050
1051 assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 3);
1053 assert_eq!(manager.list_tasks(Some(TaskStatus::Running)).len(), 0);
1054
1055 manager.start_task(&task1).unwrap();
1057 assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 2);
1058 assert_eq!(manager.list_tasks(Some(TaskStatus::Running)).len(), 1);
1059
1060 manager.start_task(&task2).unwrap();
1062 manager.complete_task(&task2, serde_json::json!({}));
1063 assert_eq!(manager.list_tasks(Some(TaskStatus::Completed)).len(), 1);
1064
1065 assert_eq!(manager.list_tasks(None).len(), 3);
1067 }
1068
1069 #[test]
1070 fn test_active_count() {
1071 let manager = TaskManager::new_for_testing();
1072 let cx = Cx::for_testing();
1073
1074 manager.register_handler("count_test", |_cx, _params| async {
1075 Ok(serde_json::json!({}))
1076 });
1077
1078 let task1 = manager.submit(&cx, "count_test", None).unwrap();
1079 let task2 = manager.submit(&cx, "count_test", None).unwrap();
1080
1081 assert_eq!(manager.active_count(), 2);
1082 assert_eq!(manager.total_count(), 2);
1083
1084 manager.start_task(&task1).unwrap();
1085 assert_eq!(manager.active_count(), 2);
1086
1087 manager.complete_task(&task1, serde_json::json!({}));
1088 assert_eq!(manager.active_count(), 1);
1089
1090 manager.cancel(&task2, None).unwrap();
1091 assert_eq!(manager.active_count(), 0);
1092 assert_eq!(manager.total_count(), 2);
1093 }
1094
1095 #[test]
1096 fn test_progress_clamping() {
1097 let manager = TaskManager::new_for_testing();
1098 let cx = Cx::for_testing();
1099
1100 manager.register_handler("clamp_test", |_cx, _params| async {
1101 Ok(serde_json::json!({}))
1102 });
1103
1104 let task_id = manager.submit(&cx, "clamp_test", None).unwrap();
1105 manager.start_task(&task_id).unwrap();
1106
1107 manager.update_progress(&task_id, -0.5, None);
1109 assert_eq!(manager.get_info(&task_id).unwrap().progress, Some(0.0));
1110
1111 manager.update_progress(&task_id, 1.5, None);
1112 assert_eq!(manager.get_info(&task_id).unwrap().progress, Some(1.0));
1113
1114 manager.update_progress(&task_id, 0.75, None);
1115 assert_eq!(manager.get_info(&task_id).unwrap().progress, Some(0.75));
1116 }
1117
1118 #[test]
1119 fn test_invalid_transition_rejected() {
1120 let manager = TaskManager::new_for_testing();
1121 let cx = Cx::for_testing();
1122
1123 manager.register_handler("transition_test", |_cx, _params| async {
1124 Ok(serde_json::json!({}))
1125 });
1126
1127 let task_id = manager.submit(&cx, "transition_test", None).unwrap();
1128
1129 manager.complete_task(&task_id, serde_json::json!({"result": "noop"}));
1131 let info = manager.get_info(&task_id).unwrap();
1132 assert_eq!(info.status, TaskStatus::Pending);
1133
1134 manager.start_task(&task_id).unwrap();
1135 manager.complete_task(&task_id, serde_json::json!({"result": "ok"}));
1136 let info = manager.get_info(&task_id).unwrap();
1137 assert_eq!(info.status, TaskStatus::Completed);
1138
1139 let result = manager.start_task(&task_id);
1141 assert!(result.is_err());
1142 }
1143
1144 #[test]
1145 fn test_concurrent_submissions() {
1146 let manager = Arc::new(TaskManager::new_for_testing());
1147 manager.register_handler("concurrent_test", |_cx, _params| async {
1148 Ok(serde_json::json!({}))
1149 });
1150
1151 let mut handles = Vec::new();
1152 for _ in 0..4 {
1153 let manager = Arc::clone(&manager);
1154 handles.push(thread::spawn(move || {
1155 let cx = Cx::for_testing();
1156 for _ in 0..10 {
1157 let _ = manager.submit(&cx, "concurrent_test", None).unwrap();
1158 }
1159 }));
1160 }
1161
1162 for handle in handles {
1163 handle.join().expect("thread join failed");
1164 }
1165
1166 assert_eq!(manager.total_count(), 40);
1167 assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 40);
1168 }
1169
/// Drives a single task through submit -> start -> progress -> complete and
/// checks the `notifications/tasks/status` events captured by the sender.
#[test]
fn test_task_status_notifications() {
    let manager = TaskManager::new_for_testing();
    manager.register_handler("notify_test", |_cx, _params| async {
        Ok(serde_json::json!({"ok": true}))
    });

    // Buffer shared with the sender closure; it collects the decoded params.
    let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
        Arc::new(std::sync::Mutex::new(Vec::new()));
    let sender_events = Arc::clone(&events);
    let sender: TaskNotificationSender = Arc::new(move |request| {
        // Anything that is not a task status notification is ignored.
        if request.method != "notifications/tasks/status" {
            return;
        }
        let params = request
            .params
            .as_ref()
            .and_then(|value| serde_json::from_value(value.clone()).ok())
            .expect("task status params");
        sender_events
            .lock()
            .expect("events lock poisoned")
            .push(params);
    });
    manager.set_notification_sender(sender);

    let cx = Cx::for_testing();
    let task_id = manager.submit(&cx, "notify_test", None).unwrap();
    manager.start_task(&task_id).unwrap();
    manager.update_progress(&task_id, 0.5, Some("half".to_string()));
    manager.complete_task(&task_id, serde_json::json!({"result": 1}));

    let recorded = events.lock().expect("events lock poisoned").clone();
    // The positional checks below read indices 0..=2, so require that many
    // events up front; otherwise a short list fails with an opaque
    // index-out-of-bounds panic instead of a readable assertion.
    assert!(
        recorded.len() >= 3,
        "expected at least 3 task status notifications, got {}",
        recorded.len()
    );
    assert_eq!(recorded[0].id, task_id);
    assert_eq!(recorded[0].status, TaskStatus::Pending);
    assert_eq!(recorded[1].status, TaskStatus::Running);
    assert_eq!(recorded[2].progress, Some(0.5));
    assert_eq!(recorded.last().expect("last").status, TaskStatus::Completed);
}
1210
/// Every edge asserted here must be accepted by `can_transition`.
#[test]
fn can_transition_valid_pairs() {
    // Edges leaving `Pending`.
    for next in [
        TaskStatus::Running,
        TaskStatus::Failed,
        TaskStatus::Cancelled,
    ] {
        assert!(
            can_transition(TaskStatus::Pending, next),
            "Pending -> {:?} should be allowed",
            next
        );
    }

    // Edges leaving `Running`.
    for next in [
        TaskStatus::Completed,
        TaskStatus::Failed,
        TaskStatus::Cancelled,
    ] {
        assert!(
            can_transition(TaskStatus::Running, next),
            "Running -> {:?} should be allowed",
            next
        );
    }
}
1222
/// Exhaustively checks that `can_transition` rejects every `(from, to)` pair
/// that is not one of the six permitted edges.
///
/// The previous version spot-checked six rejected pairs; edges such as
/// `Running -> Pending` or `Failed -> Completed` were never exercised.
#[test]
fn can_transition_invalid_pairs() {
    // All task statuses, in declaration order of the state machine.
    const ALL: [TaskStatus; 5] = [
        TaskStatus::Pending,
        TaskStatus::Running,
        TaskStatus::Completed,
        TaskStatus::Failed,
        TaskStatus::Cancelled,
    ];
    // The only edges `can_transition` is expected to accept.
    const ALLOWED: [(TaskStatus, TaskStatus); 6] = [
        (TaskStatus::Pending, TaskStatus::Running),
        (TaskStatus::Pending, TaskStatus::Failed),
        (TaskStatus::Pending, TaskStatus::Cancelled),
        (TaskStatus::Running, TaskStatus::Completed),
        (TaskStatus::Running, TaskStatus::Failed),
        (TaskStatus::Running, TaskStatus::Cancelled),
    ];

    for from in ALL {
        for to in ALL {
            if ALLOWED.contains(&(from, to)) {
                continue;
            }
            assert!(
                !can_transition(from, to),
                "{:?} -> {:?} must be rejected",
                from,
                to
            );
        }
    }
}
1235
/// A `Default`-constructed manager holds no tasks and reports list-changed
/// notifications as disabled.
#[test]
fn default_creates_empty_manager() {
    let fresh = TaskManager::default();

    let task_count = fresh.total_count();
    assert_eq!(task_count, 0);

    let list_changed = fresh.has_list_changed_notifications();
    assert!(!list_changed);
}
1244
/// The testing constructor leaves the `auto_execute` flag switched off.
#[test]
fn new_for_testing_disables_auto_execute() {
    let mgr = TaskManager::new_for_testing();
    let auto_execute_enabled = mgr.auto_execute;
    assert!(!auto_execute_enabled);
}
1250
/// `into_shared` yields a `SharedTaskManager` that still exposes the manager API.
#[test]
fn into_shared_returns_arc() {
    let shared: SharedTaskManager = TaskManager::new_for_testing().into_shared();
    let task_count = shared.total_count();
    assert_eq!(task_count, 0);
}
1257
/// The `Debug` rendering names the type and each summarised field.
#[test]
fn debug_output_contains_fields() {
    let manager = TaskManager::new_for_testing();
    let rendered = format!("{:?}", manager);

    let expected_fragments = [
        "TaskManager",
        "task_count",
        "handler_count",
        "task_counter",
        "list_changed_notifications",
        "auto_execute",
    ];
    for fragment in expected_fragments {
        assert!(
            rendered.contains(fragment),
            "Debug output is missing `{}`: {}",
            fragment,
            rendered
        );
    }
}
1269
/// Looking up info for an id that was never submitted yields `None`.
#[test]
fn get_info_nonexistent_returns_none() {
    let mgr = TaskManager::new_for_testing();
    let unknown_id = TaskId::from_string(String::from("nonexistent"));
    let lookup = mgr.get_info(&unknown_id);
    assert!(lookup.is_none());
}
1278
/// Looking up a result for an id that was never submitted yields `None`.
#[test]
fn get_result_nonexistent_returns_none() {
    let mgr = TaskManager::new_for_testing();
    let unknown_id = TaskId::from_string(String::from("nonexistent"));
    let lookup = mgr.get_result(&unknown_id);
    assert!(lookup.is_none());
}
1285
/// A task that has only been submitted has no result to return.
#[test]
fn get_result_pending_task_returns_none() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let pending_id = mgr.submit(&ctx, "t", None).unwrap();
    let outcome = mgr.get_result(&pending_id);
    assert!(outcome.is_none());
}
1294
/// An unknown task id is reported as not having a cancellation requested.
#[test]
fn is_cancel_requested_nonexistent_returns_false() {
    let mgr = TaskManager::new_for_testing();
    let unknown_id = TaskId::from_string(String::from("nonexistent"));
    let cancel_flag = mgr.is_cancel_requested(&unknown_id);
    assert!(!cancel_flag);
}
1303
/// A freshly submitted task has no cancellation requested.
#[test]
fn is_cancel_requested_before_cancel_returns_false() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let task = mgr.submit(&ctx, "t", None).unwrap();
    let cancel_flag = mgr.is_cancel_requested(&task);
    assert!(!cancel_flag);
}
1312
/// Reporting progress on a task that has not been started leaves its
/// `progress` field unset.
#[test]
fn update_progress_on_pending_task_is_ignored() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();

    mgr.update_progress(&task, 0.5, Some("test".to_string()));

    let snapshot = mgr.get_info(&task).unwrap();
    assert!(snapshot.progress.is_none());
}
1326
/// Once a task is completed, a later progress update does not overwrite the
/// final progress value of `1.0`.
#[test]
fn update_progress_on_completed_task_is_ignored() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.complete_task(&task, serde_json::json!({}));

    // Late update after the terminal state has been reached.
    mgr.update_progress(&task, 0.1, None);

    let snapshot = mgr.get_info(&task).unwrap();
    assert_eq!(snapshot.progress, Some(1.0));
}
1340
/// Completing an unknown task id must return normally; the test passes as
/// long as the call does not panic.
#[test]
fn complete_task_nonexistent_does_not_panic() {
    let mgr = TaskManager::new_for_testing();
    let unknown_id = TaskId::from_string(String::from("nonexistent"));
    let payload = serde_json::json!({});
    mgr.complete_task(&unknown_id, payload);
}
1349
/// Failing an unknown task id must return normally; the test passes as long
/// as the call does not panic.
#[test]
fn fail_task_nonexistent_does_not_panic() {
    let mgr = TaskManager::new_for_testing();
    let unknown_id = TaskId::from_string(String::from("nonexistent"));
    mgr.fail_task(&unknown_id, "error");
}
1356
/// Cancelling an unknown task id fails with a "not found" error message.
#[test]
fn cancel_nonexistent_task_returns_error() {
    let mgr = TaskManager::new_for_testing();
    let unknown_id = TaskId::from_string(String::from("nonexistent"));

    let failure = mgr.cancel(&unknown_id, None).unwrap_err();
    let mentions_not_found = failure.message.contains("not found");
    assert!(mentions_not_found);
}
1366
/// Cancelling a task that is still pending moves it straight to `Cancelled`
/// and records that cancellation was requested.
#[test]
fn cancel_pending_task_directly() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();

    let cancelled = mgr.cancel(&task, None).unwrap();

    assert_eq!(cancelled.status, TaskStatus::Cancelled);
    let cancel_flag = mgr.is_cancel_requested(&task);
    assert!(cancel_flag);
}
1378
/// Cancelling without a reason stores the default "Cancelled by request" text
/// in the returned info's `error` field.
#[test]
fn cancel_with_default_reason() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();

    let cancelled = mgr.cancel(&task, None).unwrap();

    let expected_reason = Some("Cancelled by request".to_string());
    assert_eq!(cancelled.error, expected_reason);
}
1388
/// Two consecutive submissions receive distinct ids carrying the `task-` prefix.
///
/// NOTE(review): despite the name, only distinctness and the prefix are
/// asserted; the numeric ordering of the ids is not checked here.
#[test]
fn task_ids_are_sequential() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let first = mgr.submit(&ctx, "t", None).unwrap();
    let second = mgr.submit(&ctx, "t", None).unwrap();

    assert_ne!(first, second);
    for issued in [&first, &second] {
        assert!(issued.0.starts_with("task-"));
    }
}
1402
/// Starting an unknown task id fails with a "not found" error message.
#[test]
fn start_task_nonexistent_returns_error() {
    let mgr = TaskManager::new_for_testing();
    let unknown_id = TaskId::from_string(String::from("nonexistent"));

    let failure = mgr.start_task(&unknown_id).unwrap_err();
    let mentions_not_found = failure.message.contains("not found");
    assert!(mentions_not_found);
}
1412
/// Starting a task a second time fails with a "not pending" error message.
#[test]
fn start_task_already_running_returns_error() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();

    // First start succeeds; the second one is the call under test.
    mgr.start_task(&task).unwrap();
    let failure = mgr.start_task(&task).unwrap_err();

    let mentions_not_pending = failure.message.contains("not pending");
    assert!(mentions_not_pending);
}
1423
/// With a zero max age, `cleanup_completed` drops a completed task.
#[test]
fn cleanup_completed_removes_old_terminal_tasks() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.complete_task(&task, serde_json::json!({}));
    let before = mgr.total_count();
    assert_eq!(before, 1);

    let max_age = std::time::Duration::from_secs(0);
    mgr.cleanup_completed(max_age);

    let after = mgr.total_count();
    assert_eq!(after, 0);
}
1441
/// Cleanup removes the completed task but leaves the still-pending one in place.
#[test]
fn cleanup_completed_keeps_active_tasks() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let finished = mgr.submit(&ctx, "t", None).unwrap();
    let still_pending = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&finished).unwrap();
    mgr.complete_task(&finished, serde_json::json!({}));

    let max_age = std::time::Duration::from_secs(0);
    mgr.cleanup_completed(max_age);

    let remaining = mgr.total_count();
    assert_eq!(remaining, 1);
    let survivor = mgr.get_info(&still_pending);
    assert!(survivor.is_some());
}
1458
/// With a one-hour max age, a task completed just now survives cleanup.
#[test]
fn cleanup_completed_keeps_recent_tasks() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.complete_task(&task, serde_json::json!({}));

    let one_hour = std::time::Duration::from_secs(3600);
    mgr.cleanup_completed(one_hour);

    let remaining = mgr.total_count();
    assert_eq!(remaining, 1);
}
1473
/// `transition_state` reports success when asked to move `Running -> Running`.
#[test]
fn transition_same_state_returns_true() {
    let info = TaskInfo {
        id: TaskId::from_string("test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Running,
        progress: None,
        message: None,
        created_at: String::new(),
        started_at: None,
        completed_at: None,
        error: None,
    };
    let mut running = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };

    let accepted = transition_state(&mut running, TaskStatus::Running);
    assert!(accepted);
}
1499
/// Submitting without params creates a pending task with no timestamps for
/// start/completion and no error.
#[test]
fn submit_with_none_params_creates_task() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let task = mgr.submit(&ctx, "t", None).unwrap();
    let snapshot = mgr.get_info(&task).unwrap();

    assert_eq!(snapshot.task_type, "t");
    assert_eq!(snapshot.status, TaskStatus::Pending);
    // Nothing has run yet, so all lifecycle fields are still empty.
    assert!(snapshot.started_at.is_none());
    assert!(snapshot.completed_at.is_none());
    assert!(snapshot.error.is_none());
}
1515
/// Submitting with a JSON params object creates a task that can be looked up.
#[test]
fn submit_with_some_params_creates_task() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let params = Some(serde_json::json!({"key": "value"}));
    let task = mgr.submit(&ctx, "t", params).unwrap();

    let lookup = mgr.get_info(&task);
    assert!(lookup.is_some());
}
1526
/// Failing a running task stores an unsuccessful result carrying the error
/// text and no data.
#[test]
fn fail_task_sets_error_result() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();

    mgr.fail_task(&task, "boom");

    let outcome = mgr.get_result(&task).unwrap();
    assert!(!outcome.success);
    let expected_error = Some("boom".to_string());
    assert_eq!(outcome.error, expected_error);
    assert!(outcome.data.is_none());
}
1542
/// Reporting progress for an unknown task id must return normally; the test
/// passes as long as the call does not panic.
#[test]
fn update_progress_nonexistent_does_not_panic() {
    let mgr = TaskManager::new_for_testing();
    let unknown_id = TaskId::from_string(String::from("nonexistent"));
    mgr.update_progress(&unknown_id, 0.5, None);
}
1551
/// A `fail_task` call arriving after completion leaves the task `Completed`
/// with its successful result intact.
#[test]
fn fail_task_on_completed_is_ignored() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.complete_task(&task, serde_json::json!({"done": true}));

    // Late failure report after the terminal state was reached.
    mgr.fail_task(&task, "too late");

    let snapshot = mgr.get_info(&task).unwrap();
    assert_eq!(snapshot.status, TaskStatus::Completed);
    let outcome = mgr.get_result(&task).unwrap();
    assert!(outcome.success);
}
1569
/// A `complete_task` call arriving after failure leaves the task `Failed`
/// with its unsuccessful result intact.
#[test]
fn complete_task_on_failed_is_ignored() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.fail_task(&task, "something broke");

    // Late completion report after the terminal state was reached.
    mgr.complete_task(&task, serde_json::json!({"late": true}));

    let snapshot = mgr.get_info(&task).unwrap();
    assert_eq!(snapshot.status, TaskStatus::Failed);
    let outcome = mgr.get_result(&task).unwrap();
    assert!(!outcome.success);
}
1587
/// Registering a second handler under the same task type is accepted, and a
/// task of that type can still be submitted afterwards.
///
/// NOTE(review): the handler is never executed here, so this does not observe
/// which of the two handlers is actually kept.
#[test]
fn register_handler_replaces_existing() {
    let mgr = TaskManager::new_for_testing();

    // First registration.
    mgr.register_handler("t", |_cx, _params| async {
        Ok(serde_json::json!({"v": 1}))
    });
    // Second registration under the same task type.
    mgr.register_handler("t", |_cx, _params| async {
        Ok(serde_json::json!({"v": 2}))
    });

    let ctx = Cx::for_testing();
    let task = mgr.submit(&ctx, "t", None).unwrap();
    let lookup = mgr.get_info(&task);
    assert!(lookup.is_some());
}
1604
/// Moving `Pending -> Running` succeeds and populates `started_at`.
#[test]
fn transition_to_running_sets_started_at() {
    let info = TaskInfo {
        id: TaskId::from_string("ts-test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Pending,
        progress: None,
        message: None,
        created_at: String::new(),
        started_at: None,
        completed_at: None,
        error: None,
    };
    let mut pending = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };
    assert!(pending.info.started_at.is_none());

    let accepted = transition_state(&mut pending, TaskStatus::Running);

    assert!(accepted);
    assert!(pending.info.started_at.is_some());
}
1630
/// Moving `Running -> Completed` succeeds and populates `completed_at`.
#[test]
fn transition_to_completed_sets_completed_at() {
    let info = TaskInfo {
        id: TaskId::from_string("ts-test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Running,
        progress: None,
        message: None,
        created_at: String::new(),
        started_at: Some("earlier".to_string()),
        completed_at: None,
        error: None,
    };
    let mut running = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };
    assert!(running.info.completed_at.is_none());

    let accepted = transition_state(&mut running, TaskStatus::Completed);

    assert!(accepted);
    assert!(running.info.completed_at.is_some());
}
1654
/// Moving `Running -> Failed` succeeds and populates `completed_at`.
#[test]
fn transition_to_failed_sets_completed_at() {
    let info = TaskInfo {
        id: TaskId::from_string("ts-test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Running,
        progress: None,
        message: None,
        created_at: String::new(),
        started_at: Some("earlier".to_string()),
        completed_at: None,
        error: None,
    };
    let mut running = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };

    let accepted = transition_state(&mut running, TaskStatus::Failed);

    assert!(accepted);
    assert!(running.info.completed_at.is_some());
}
1677
/// Moving `Running -> Cancelled` succeeds and populates `completed_at`.
#[test]
fn transition_to_cancelled_sets_completed_at() {
    let info = TaskInfo {
        id: TaskId::from_string("ts-test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Running,
        progress: None,
        message: None,
        created_at: String::new(),
        started_at: Some("earlier".to_string()),
        completed_at: None,
        error: None,
    };
    let mut running = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };

    let accepted = transition_state(&mut running, TaskStatus::Cancelled);

    assert!(accepted);
    assert!(running.info.completed_at.is_some());
}
1700
/// The disallowed edge `Pending -> Completed` is rejected and the stored
/// status stays `Pending`.
#[test]
fn transition_invalid_returns_false() {
    let info = TaskInfo {
        id: TaskId::from_string("ts-test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Pending,
        progress: None,
        message: None,
        created_at: String::new(),
        started_at: None,
        completed_at: None,
        error: None,
    };
    let mut pending = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };

    let accepted = transition_state(&mut pending, TaskStatus::Completed);

    assert!(!accepted);
    assert_eq!(pending.info.status, TaskStatus::Pending);
}
1725
/// A `TaskStatusSnapshot` built from a `TaskState` formats with `Debug` and
/// clones with its fields preserved.
#[test]
fn task_status_snapshot_debug_and_clone() {
    let info = TaskInfo {
        id: TaskId::from_string("snap-test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Running,
        progress: Some(0.5),
        message: Some("testing".to_string()),
        created_at: "now".to_string(),
        started_at: Some("now".to_string()),
        completed_at: None,
        error: None,
    };
    let source = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };

    let snapshot = TaskStatusSnapshot::from(&source);

    let rendered = format!("{:?}", snapshot);
    assert!(rendered.contains("TaskStatusSnapshot"));

    let copy = snapshot.clone();
    assert_eq!(copy.info.status, TaskStatus::Running);
    assert!(copy.result.is_none());
}
1754
/// With a zero max age, cleanup removes completed, failed and cancelled tasks alike.
#[test]
fn cleanup_completed_removes_failed_and_cancelled() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let to_complete = mgr.submit(&ctx, "t", None).unwrap();
    let to_fail = mgr.submit(&ctx, "t", None).unwrap();
    let to_cancel = mgr.submit(&ctx, "t", None).unwrap();

    // One task per terminal state.
    mgr.start_task(&to_complete).unwrap();
    mgr.complete_task(&to_complete, serde_json::json!({}));

    mgr.start_task(&to_fail).unwrap();
    mgr.fail_task(&to_fail, "error");

    mgr.cancel(&to_cancel, None).unwrap();

    let before = mgr.total_count();
    assert_eq!(before, 3);

    let max_age = std::time::Duration::from_secs(0);
    mgr.cleanup_completed(max_age);

    let after = mgr.total_count();
    assert_eq!(after, 0);
}
1784
/// Each installed notification sender is invoked by a submission made while
/// it is the current sender.
///
/// NOTE(review): the test does not assert that the first sender stops being
/// called after the second one is installed.
#[test]
fn set_notification_sender_replaces_existing() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let mgr = TaskManager::new_for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let first_hits = Arc::new(AtomicUsize::new(0));
    let second_hits = Arc::new(AtomicUsize::new(0));

    // Install the first sender and submit once.
    let first_counter = Arc::clone(&first_hits);
    let first_sender: TaskNotificationSender = Arc::new(move |_| {
        first_counter.fetch_add(1, Ordering::SeqCst);
    });
    mgr.set_notification_sender(first_sender);

    let ctx = Cx::for_testing();
    let _first_task = mgr.submit(&ctx, "t", None).unwrap();
    assert!(first_hits.load(Ordering::SeqCst) > 0);

    // Install the second sender and submit again.
    let second_counter = Arc::clone(&second_hits);
    let second_sender: TaskNotificationSender = Arc::new(move |_| {
        second_counter.fetch_add(1, Ordering::SeqCst);
    });
    mgr.set_notification_sender(second_sender);

    let _second_task = mgr.submit(&ctx, "t", None).unwrap();
    assert!(second_hits.load(Ordering::SeqCst) > 0);
}
1815
/// A caller-supplied cancellation reason appears both in the returned info
/// and in the stored result.
#[test]
fn cancel_with_custom_reason() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();

    let cancelled = mgr.cancel(&task, Some("Timeout".to_string())).unwrap();
    assert_eq!(cancelled.error, Some("Timeout".to_string()));

    let outcome = mgr.get_result(&task).unwrap();
    assert_eq!(outcome.error, Some("Timeout".to_string()));
}
1830
/// `can_transition` rejects every status-to-itself pair.
#[test]
fn can_transition_self_is_false() {
    let every_status = [
        TaskStatus::Pending,
        TaskStatus::Running,
        TaskStatus::Completed,
        TaskStatus::Failed,
        TaskStatus::Cancelled,
    ];
    for status in every_status {
        assert!(
            !can_transition(status, status),
            "{:?} -> {:?} should not count as a transition",
            status,
            status
        );
    }
}
1849
/// `transition_state` reports success for `Pending -> Pending` and leaves the
/// status unchanged.
#[test]
fn transition_state_identity_pending_returns_true() {
    let info = TaskInfo {
        id: TaskId::from_string("identity-test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Pending,
        progress: None,
        message: None,
        created_at: String::new(),
        started_at: None,
        completed_at: None,
        error: None,
    };
    let mut pending = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };

    let accepted = transition_state(&mut pending, TaskStatus::Pending);

    assert!(accepted);
    assert_eq!(pending.info.status, TaskStatus::Pending);
}
1874
/// Listing without a status filter returns tasks in every state.
#[test]
fn list_tasks_no_filter_returns_all() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    // One task ends up completed, the other stays as submitted.
    let completed = mgr.submit(&ctx, "t", None).unwrap();
    let _untouched = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&completed).unwrap();
    mgr.complete_task(&completed, serde_json::json!({}));

    let listed = mgr.list_tasks(None);
    assert_eq!(listed.len(), 2);
}
1890
/// The last status notification after a cancel carries the `Cancelled`
/// status, the cancellation reason and an unsuccessful result.
#[test]
fn cancel_notification_includes_error_and_result() {
    let mgr = TaskManager::new_for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    // Capture every task status notification the manager emits.
    let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
        Arc::new(std::sync::Mutex::new(Vec::new()));
    let sink = Arc::clone(&events);
    let sender: TaskNotificationSender = Arc::new(move |request| {
        if request.method != "notifications/tasks/status" {
            return;
        }
        let decoded: TaskStatusNotificationParams = request
            .params
            .as_ref()
            .and_then(|raw| serde_json::from_value(raw.clone()).ok())
            .unwrap();
        sink.lock().unwrap().push(decoded);
    });
    mgr.set_notification_sender(sender);

    let ctx = Cx::for_testing();
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.cancel(&task, Some("user abort".to_string())).unwrap();

    let captured = events.lock().unwrap().clone();
    let final_event = captured.last().unwrap();

    assert_eq!(final_event.status, TaskStatus::Cancelled);
    assert_eq!(final_event.error, Some("user abort".to_string()));
    assert!(final_event.result.is_some());
    let attached = final_event.result.as_ref().unwrap();
    assert!(!attached.success);
}
1926
/// Completing a task overrides an earlier partial progress value with `1.0`.
#[test]
fn complete_task_sets_progress_to_one() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();

    mgr.update_progress(&task, 0.5, None);
    mgr.complete_task(&task, serde_json::json!({}));

    let snapshot = mgr.get_info(&task).unwrap();
    assert_eq!(snapshot.progress, Some(1.0));
}
1941
/// A completed task whose `completed_at` has been cleared is not removed by cleanup.
#[test]
fn cleanup_completed_keeps_terminal_without_completed_at() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.complete_task(&task, serde_json::json!({}));

    // Wipe the completion timestamp directly in the task table. The write
    // guard is a temporary, so it is released at the end of this statement.
    mgr.tasks
        .write()
        .unwrap()
        .get_mut(&task)
        .unwrap()
        .info
        .completed_at = None;

    let max_age = std::time::Duration::from_secs(0);
    mgr.cleanup_completed(max_age);

    let remaining = mgr.total_count();
    assert_eq!(remaining, 1);
}
1963
/// A completed task whose `completed_at` is not a valid timestamp is not
/// removed by cleanup.
#[test]
fn cleanup_completed_keeps_terminal_with_unparseable_timestamp() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.complete_task(&task, serde_json::json!({}));

    // Corrupt the completion timestamp directly in the task table. The write
    // guard is a temporary, so it is released at the end of this statement.
    mgr.tasks
        .write()
        .unwrap()
        .get_mut(&task)
        .unwrap()
        .info
        .completed_at = Some("not-a-date".to_string());

    let max_age = std::time::Duration::from_secs(0);
    mgr.cleanup_completed(max_age);

    let remaining = mgr.total_count();
    assert_eq!(remaining, 1);
}
1982
/// The `Debug` rendering reports the number of stored tasks and registered handlers.
#[test]
fn debug_output_with_tasks_and_handlers() {
    let mgr = TaskManager::new_for_testing();
    mgr.register_handler("type_a", |_cx, _params| async { Ok(serde_json::json!({})) });
    mgr.register_handler("type_b", |_cx, _params| async { Ok(serde_json::json!({})) });

    let ctx = Cx::for_testing();
    let _ = mgr.submit(&ctx, "type_a", None).unwrap();
    let _ = mgr.submit(&ctx, "type_b", None).unwrap();

    let rendered = format!("{:?}", mgr);
    let shows_tasks = rendered.contains("task_count: 2");
    assert!(shows_tasks);
    let shows_handlers = rendered.contains("handler_count: 2");
    assert!(shows_handlers);
}
1998
/// Tasks submitted under different registered types keep their own `task_type`.
#[test]
fn multiple_handler_types_independent() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("analyze", |_cx, _params| async {
        Ok(serde_json::json!({"type": "analyze"}))
    });
    mgr.register_handler("summarize", |_cx, _params| async {
        Ok(serde_json::json!({"type": "summarize"}))
    });

    let analyze_id = mgr.submit(&ctx, "analyze", None).unwrap();
    let summarize_id = mgr.submit(&ctx, "summarize", None).unwrap();

    let analyze_info = mgr.get_info(&analyze_id).unwrap();
    let summarize_info = mgr.get_info(&summarize_id).unwrap();
    assert_eq!(analyze_info.task_type, "analyze");
    assert_eq!(summarize_info.task_type, "summarize");
}
2020
/// Filtering by `Failed` returns the failed task; filtering by `Completed` returns nothing.
#[test]
fn list_tasks_filter_failed() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.fail_task(&task, "err");

    let failed = mgr.list_tasks(Some(TaskStatus::Failed));
    assert_eq!(failed.len(), 1);
    let completed = mgr.list_tasks(Some(TaskStatus::Completed));
    assert_eq!(completed.len(), 0);
}
2036
/// Filtering by `Cancelled` returns the cancelled task; filtering by `Pending` returns nothing.
#[test]
fn list_tasks_filter_cancelled() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.cancel(&task, None).unwrap();

    let cancelled = mgr.list_tasks(Some(TaskStatus::Cancelled));
    assert_eq!(cancelled.len(), 1);
    let pending = mgr.list_tasks(Some(TaskStatus::Pending));
    assert_eq!(pending.len(), 0);
}
2049
/// The status notification emitted for a progress update carries the
/// progress message and the `Running` status.
#[test]
fn progress_notification_includes_message() {
    let mgr = TaskManager::new_for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });

    // Capture every task status notification the manager emits.
    let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
        Arc::new(std::sync::Mutex::new(Vec::new()));
    let sink = Arc::clone(&events);
    let sender: TaskNotificationSender = Arc::new(move |request| {
        if request.method != "notifications/tasks/status" {
            return;
        }
        let decoded: TaskStatusNotificationParams = request
            .params
            .as_ref()
            .and_then(|raw| serde_json::from_value(raw.clone()).ok())
            .unwrap();
        sink.lock().unwrap().push(decoded);
    });
    mgr.set_notification_sender(sender);

    let ctx = Cx::for_testing();
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.update_progress(&task, 0.75, Some("three quarters".to_string()));

    let captured = events.lock().unwrap().clone();
    // Locate the event by its progress value rather than by position.
    let progress_event = captured
        .iter()
        .find(|event| event.progress == Some(0.75))
        .expect("progress notification");
    assert_eq!(progress_event.message, Some("three quarters".to_string()));
    assert_eq!(progress_event.status, TaskStatus::Running);
}
2085
/// A snapshot taken from a state that holds a result exposes that result.
#[test]
fn task_status_snapshot_includes_result() {
    let task_id = TaskId::from_string("snap-result");
    let info = TaskInfo {
        id: task_id.clone(),
        task_type: "t".to_string(),
        status: TaskStatus::Completed,
        progress: Some(1.0),
        message: None,
        created_at: "now".to_string(),
        started_at: Some("now".to_string()),
        completed_at: Some("now".to_string()),
        error: None,
    };
    let stored_result = TaskResult {
        id: task_id,
        success: true,
        data: Some(serde_json::json!({"done": true})),
        error: None,
    };
    let source = TaskState {
        info,
        cancel_requested: false,
        result: Some(stored_result),
        cx: Cx::for_testing(),
    };

    let snapshot = TaskStatusSnapshot::from(&source);

    assert!(snapshot.result.is_some());
    let exposed = snapshot.result.unwrap();
    assert!(exposed.success);
    assert_eq!(exposed.data, Some(serde_json::json!({"done": true})));
}
2118
/// Submitting an unregistered task type fails with a message that names the
/// problem and the offending type.
#[test]
fn submit_unknown_task_type_error_message() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();

    let failure = mgr.submit(&ctx, "nonexistent_type", None).unwrap_err();

    for fragment in ["Unknown task type", "nonexistent_type"] {
        assert!(
            failure.message.contains(fragment),
            "error message is missing `{}`: {}",
            fragment,
            failure.message
        );
    }
}
2129
/// The result stored by a cancel is unsuccessful, has no data and carries the reason.
#[test]
fn cancel_result_has_no_data() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();

    mgr.cancel(&task, Some("abort".to_string())).unwrap();

    let outcome = mgr.get_result(&task).unwrap();
    assert!(!outcome.success);
    assert!(outcome.data.is_none());
    let expected_reason = Some("abort".to_string());
    assert_eq!(outcome.error, expected_reason);
}
2145
/// Cancelling an already completed task fails with a message mentioning "terminal".
#[test]
fn cancel_completed_task_returns_error() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.complete_task(&task, serde_json::json!({}));

    let failure = mgr.cancel(&task, None).unwrap_err();

    let mentions_terminal = failure.message.contains("terminal");
    assert!(mentions_terminal);
}
2159
/// Cancelling an already failed task fails with a message mentioning "terminal".
#[test]
fn cancel_failed_task_returns_error() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&task).unwrap();
    mgr.fail_task(&task, "broke");

    let failure = mgr.cancel(&task, None).unwrap_err();

    let mentions_terminal = failure.message.contains("terminal");
    assert!(mentions_terminal);
}
2171
/// Failing a task that was never started marks it `Failed`, stamps
/// `completed_at`, and stores an unsuccessful result with the error text.
#[test]
fn fail_task_on_pending_records_failure() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
    let task = mgr.submit(&ctx, "t", None).unwrap();

    mgr.fail_task(&task, "too early");

    let snapshot = mgr.get_info(&task).unwrap();
    assert_eq!(snapshot.status, TaskStatus::Failed);
    assert_eq!(snapshot.error.as_deref(), Some("too early"));
    assert!(snapshot.completed_at.is_some());

    let outcome = mgr
        .get_result(&task)
        .expect("failed task should record a result");
    assert!(!outcome.success);
    assert_eq!(outcome.error.as_deref(), Some("too early"));
}
2190
/// `spawn_task` must not invoke the handler for a task whose stored state is
/// already `Failed`, and must leave that stored state untouched.
///
/// NOTE(review): the test name says "pending", but the state seeded below has
/// status `Failed` — confirm the intended wording.
#[test]
fn spawn_task_skips_handler_for_pre_failed_pending_task() {
    let manager = TaskManager::new();
    let handler_runs = Arc::new(AtomicU64::new(0));
    let kind = "never-run".to_string();
    let id = TaskId::from_string("task-prefailed");
    let spawn_cx = Cx::for_request_with_budget(Budget::INFINITE);
    let created = chrono::Utc::now().to_rfc3339();

    // The handler bumps the counter, so any execution is observable.
    let counter = Arc::clone(&handler_runs);
    manager.register_handler(kind.clone(), move |_cx, _params| {
        let counter = Arc::clone(&counter);
        async move {
            counter.fetch_add(1, Ordering::SeqCst);
            Ok(serde_json::json!({"unexpected": true}))
        }
    });

    // Seed the task table directly with an already-failed entry.
    let prefailed = TaskState {
        info: TaskInfo {
            id: id.clone(),
            task_type: kind.clone(),
            status: TaskStatus::Failed,
            progress: None,
            message: None,
            created_at: created,
            started_at: None,
            completed_at: Some(chrono::Utc::now().to_rfc3339()),
            error: Some("prefailed".to_string()),
        },
        cancel_requested: false,
        result: Some(TaskResult {
            id: id.clone(),
            success: false,
            data: None,
            error: Some("prefailed".to_string()),
        }),
        cx: spawn_cx.clone(),
    };
    {
        let mut table = manager.tasks.write().unwrap_or_else(|poisoned| {
            warn!(target: targets::SERVER, "tasks lock poisoned in test, recovering");
            poisoned.into_inner()
        });
        table.insert(id.clone(), prefailed);
    }

    manager.spawn_task(id.clone(), kind, spawn_cx, serde_json::json!({}));

    // Poll for up to one second, stopping early only if the handler ran.
    // On the passing path the loop therefore runs until the deadline.
    let give_up_at = std::time::Instant::now() + Duration::from_secs(1);
    while std::time::Instant::now() < give_up_at && handler_runs.load(Ordering::SeqCst) == 0 {
        thread::sleep(Duration::from_millis(10));
    }

    assert_eq!(
        handler_runs.load(Ordering::SeqCst),
        0,
        "pre-failed pending task must not execute its handler"
    );

    let snapshot = manager
        .get_info(&id)
        .expect("prefailed task should remain present");
    assert_eq!(snapshot.status, TaskStatus::Failed);
    assert_eq!(snapshot.error.as_deref(), Some("prefailed"));
}
2264
2265 #[test]
2266 fn complete_task_on_cancelled_is_ignored() {
2267 let manager = TaskManager::new_for_testing();
2268 let cx = Cx::for_testing();
2269 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2270 let id = manager.submit(&cx, "t", None).unwrap();
2271 manager.start_task(&id).unwrap();
2272 manager.cancel(&id, Some("aborted".to_string())).unwrap();
2273 manager.complete_task(&id, serde_json::json!({"late": true}));
2275 let info = manager.get_info(&id).unwrap();
2276 assert_eq!(info.status, TaskStatus::Cancelled);
2277 }
2278
2279 #[test]
2280 fn update_progress_none_message_clears_previous() {
2281 let manager = TaskManager::new_for_testing();
2282 let cx = Cx::for_testing();
2283 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2284 let id = manager.submit(&cx, "t", None).unwrap();
2285 manager.start_task(&id).unwrap();
2286 manager.update_progress(&id, 0.3, Some("step 1".to_string()));
2287 assert_eq!(
2288 manager.get_info(&id).unwrap().message,
2289 Some("step 1".to_string())
2290 );
2291 manager.update_progress(&id, 0.6, None);
2292 assert!(manager.get_info(&id).unwrap().message.is_none());
2293 }
2294
2295 #[test]
2296 fn no_notification_sender_does_not_panic() {
2297 let manager = TaskManager::new_for_testing();
2298 let cx = Cx::for_testing();
2299 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2300 let id = manager.submit(&cx, "t", None).unwrap();
2302 manager.start_task(&id).unwrap();
2303 manager.update_progress(&id, 0.5, None);
2304 manager.complete_task(&id, serde_json::json!({}));
2305 assert_eq!(manager.get_info(&id).unwrap().status, TaskStatus::Completed);
2306 }
2307}