1use std::collections::HashMap;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::{Arc, RwLock};
37
38use asupersync::runtime::{RuntimeBuilder, RuntimeHandle};
39use asupersync::{Budget, CancelKind, Cx};
40use fastmcp_core::logging::{debug, info, targets, warn};
41use fastmcp_core::{McpError, McpResult};
42use fastmcp_protocol::{
43 JsonRpcRequest, TaskId, TaskInfo, TaskResult, TaskStatus, TaskStatusNotificationParams,
44};
45
46pub type TaskNotificationSender = Arc<dyn Fn(JsonRpcRequest) + Send + Sync>;
48
49pub type TaskHandler = Box<dyn Fn(&Cx, serde_json::Value) -> TaskFuture + Send + Sync + 'static>;
53
54pub type TaskFuture = std::pin::Pin<
56 Box<dyn std::future::Future<Output = McpResult<serde_json::Value>> + Send + 'static>,
57>;
58
59struct TaskState {
61 info: TaskInfo,
63 cancel_requested: bool,
65 result: Option<TaskResult>,
67 cx: Cx,
69}
70
71fn can_transition(from: TaskStatus, to: TaskStatus) -> bool {
72 matches!(
73 (from, to),
74 (
75 TaskStatus::Pending,
76 TaskStatus::Running | TaskStatus::Failed | TaskStatus::Cancelled
77 ) | (
78 TaskStatus::Running,
79 TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled
80 )
81 )
82}
83
84fn transition_state(state: &mut TaskState, to: TaskStatus) -> bool {
85 let from = state.info.status;
86 if from == to {
87 return true;
88 }
89 if !can_transition(from, to) {
90 warn!(
91 target: targets::SERVER,
92 "task {} invalid transition {:?} -> {:?}",
93 state.info.id,
94 from,
95 to
96 );
97 return false;
98 }
99
100 state.info.status = to;
101 let now = chrono::Utc::now().to_rfc3339();
102 match to {
103 TaskStatus::Running => {
104 state.info.started_at = Some(now.clone());
105 }
106 TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled => {
107 state.info.completed_at = Some(now.clone());
108 }
109 TaskStatus::Pending => {}
110 }
111
112 info!(
113 target: targets::SERVER,
114 "task {} status {:?} -> {:?} at {}",
115 state.info.id,
116 from,
117 to,
118 now
119 );
120 true
121}
122
123fn mark_task_failed_snapshot(
124 tasks: &Arc<RwLock<HashMap<TaskId, TaskState>>>,
125 task_id: &TaskId,
126 error_msg: String,
127 lock_context: &'static str,
128) -> Option<TaskStatusSnapshot> {
129 let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
130 warn!(
131 target: targets::SERVER,
132 "tasks lock poisoned in {}, recovering",
133 lock_context
134 );
135 poisoned.into_inner()
136 });
137
138 let state = tasks_guard.get_mut(task_id)?;
139 if state.cancel_requested || !transition_state(state, TaskStatus::Failed) {
140 return None;
141 }
142
143 state.info.error = Some(error_msg.clone());
144 state.result = Some(TaskResult {
145 id: task_id.clone(),
146 success: false,
147 data: None,
148 error: Some(error_msg),
149 });
150 Some(TaskStatusSnapshot::from(state))
151}
152
153fn build_runtime_handle() -> Option<RuntimeHandle> {
154 match RuntimeBuilder::multi_thread().build() {
155 Ok(runtime) => Some(runtime.handle()),
156 Err(multi_err) => {
157 warn!(
158 target: targets::SERVER,
159 "failed to initialize multi-thread runtime for tasks: {}; attempting current-thread fallback",
160 multi_err
161 );
162 match RuntimeBuilder::current_thread().build() {
163 Ok(runtime) => Some(runtime.handle()),
164 Err(single_err) => {
165 warn!(
166 target: targets::SERVER,
167 "failed to initialize current-thread runtime fallback for tasks: {}",
168 single_err
169 );
170 None
171 }
172 }
173 }
174 }
175}
176
177pub struct TaskManager {
182 tasks: Arc<RwLock<HashMap<TaskId, TaskState>>>,
184 handlers: Arc<RwLock<HashMap<String, TaskHandler>>>,
186 task_counter: AtomicU64,
188 list_changed_notifications: bool,
190 runtime: Option<RuntimeHandle>,
192 auto_execute: bool,
194 notification_sender: Arc<RwLock<Option<TaskNotificationSender>>>,
196}
197
198impl TaskManager {
199 #[must_use]
201 pub fn new() -> Self {
202 let runtime = build_runtime_handle();
203 if runtime.is_none() {
204 warn!(
205 target: targets::SERVER,
206 "TaskManager runtime unavailable; auto-executed tasks will fail until runtime becomes available"
207 );
208 }
209 Self {
210 tasks: Arc::new(RwLock::new(HashMap::new())),
211 handlers: Arc::new(RwLock::new(HashMap::new())),
212 task_counter: AtomicU64::new(0),
213 list_changed_notifications: false,
214 runtime,
215 auto_execute: true,
216 notification_sender: Arc::new(RwLock::new(None)),
217 }
218 }
219
220 #[must_use]
222 pub fn with_list_changed_notifications() -> Self {
223 Self {
224 list_changed_notifications: true,
225 ..Self::new()
226 }
227 }
228
229 #[must_use]
233 pub fn new_for_testing() -> Self {
234 let mut manager = Self::new();
235 manager.auto_execute = false;
236 manager
237 }
238
239 #[must_use]
241 pub fn into_shared(self) -> SharedTaskManager {
242 Arc::new(self)
243 }
244
245 #[must_use]
247 pub fn has_list_changed_notifications(&self) -> bool {
248 self.list_changed_notifications
249 }
250
251 pub fn set_notification_sender(&self, sender: TaskNotificationSender) {
253 let mut guard = self.notification_sender.write().unwrap_or_else(|poisoned| {
254 warn!(target: targets::SERVER, "notification sender lock poisoned, recovering");
255 poisoned.into_inner()
256 });
257 *guard = Some(sender);
258 }
259
260 pub fn register_handler<F, Fut>(&self, task_type: impl Into<String>, handler: F)
264 where
265 F: Fn(&Cx, serde_json::Value) -> Fut + Send + Sync + 'static,
266 Fut: std::future::Future<Output = McpResult<serde_json::Value>> + Send + 'static,
267 {
268 let task_type = task_type.into();
269 let boxed_handler: TaskHandler = Box::new(move |cx, params| Box::pin(handler(cx, params)));
270
271 let mut handlers = self.handlers.write().unwrap_or_else(|poisoned| {
272 warn!(target: targets::SERVER, "handlers lock poisoned, recovering");
273 poisoned.into_inner()
274 });
275 handlers.insert(task_type, boxed_handler);
276 }
277
278 pub fn submit(
283 &self,
284 _cx: &Cx,
285 task_type: impl Into<String>,
286 params: Option<serde_json::Value>,
287 ) -> McpResult<TaskId> {
288 let task_type = task_type.into();
289
290 {
292 let handlers = self.handlers.read().unwrap_or_else(|poisoned| {
293 warn!(target: targets::SERVER, "handlers lock poisoned, recovering");
294 poisoned.into_inner()
295 });
296 if !handlers.contains_key(&task_type) {
297 return Err(McpError::invalid_params(format!(
298 "Unknown task type: {task_type}"
299 )));
300 }
301 }
302
303 let counter = self.task_counter.fetch_add(1, Ordering::SeqCst);
305 let task_id = TaskId::from_string(format!("task-{counter:08x}"));
306
307 let now = chrono::Utc::now().to_rfc3339();
309 let task_cx = Cx::for_request_with_budget(Budget::INFINITE);
310 let info = TaskInfo {
311 id: task_id.clone(),
312 task_type: task_type.clone(),
313 status: TaskStatus::Pending,
314 progress: None,
315 message: None,
316 created_at: now,
317 started_at: None,
318 completed_at: None,
319 error: None,
320 };
321
322 let info_snapshot = info.clone();
323
324 let state = TaskState {
326 info,
327 cancel_requested: false,
328 result: None,
329 cx: task_cx.clone(),
330 };
331
332 {
333 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
334 warn!(target: targets::SERVER, "tasks lock poisoned, recovering");
335 poisoned.into_inner()
336 });
337 tasks.insert(task_id.clone(), state);
338 }
339
340 self.notify_status(info_snapshot, None);
341
342 if self.auto_execute {
343 let params = params.unwrap_or_else(|| serde_json::json!({}));
344 self.spawn_task(task_id.clone(), task_type, task_cx, params);
345 }
346
347 Ok(task_id)
348 }
349
350 #[allow(clippy::too_many_lines)]
351 fn spawn_task(
352 &self,
353 task_id: TaskId,
354 task_type: String,
355 task_cx: Cx,
356 params: serde_json::Value,
357 ) {
358 let Some(runtime) = self.runtime.clone() else {
359 let failure_snapshot = mark_task_failed_snapshot(
360 &self.tasks,
361 &task_id,
362 "Task runtime unavailable".to_string(),
363 "spawn_task runtime unavailable",
364 );
365 self.notify_snapshot(failure_snapshot);
366 return;
367 };
368
369 let tasks = Arc::clone(&self.tasks);
370 let handlers = Arc::clone(&self.handlers);
371 let notification_sender = Arc::clone(&self.notification_sender);
372 let scheduled_task_id = task_id.clone();
373 let scheduling = runtime.try_spawn(async move {
374 let running_snapshot = {
375 let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
376 warn!(target: targets::SERVER, "tasks lock poisoned in spawn_task, recovering");
377 poisoned.into_inner()
378 });
379 match tasks_guard.get_mut(&task_id) {
380 Some(state) => {
381 if state.cancel_requested || !transition_state(state, TaskStatus::Running) {
382 None
383 } else {
384 Some(TaskStatusSnapshot::from(state))
385 }
386 }
387 None => None,
388 }
389 };
390
391 notify_snapshot(¬ification_sender, running_snapshot);
392
393 let task_future = {
394 let handlers_guard = handlers.read().unwrap_or_else(|poisoned| {
395 warn!(target: targets::SERVER, "handlers lock poisoned in spawn_task, recovering");
396 poisoned.into_inner()
397 });
398 let Some(handler) = handlers_guard.get(&task_type) else {
399 let failure_snapshot = mark_task_failed_snapshot(
400 &tasks,
401 &task_id,
402 format!("Unknown task type: {task_type}"),
403 "spawn_task failure",
404 );
405 notify_snapshot(¬ification_sender, failure_snapshot);
406 return;
407 };
408 (handler)(&task_cx, params)
409 };
410
411 let result = task_future.await;
412
413 let completion_snapshot = {
414 let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
415 warn!(target: targets::SERVER, "tasks lock poisoned in spawn_task completion, recovering");
416 poisoned.into_inner()
417 });
418 match tasks_guard.get_mut(&task_id) {
419 Some(state) => {
420 if state.cancel_requested {
421 None
422 } else {
423 let mut snapshot = None;
424 match result {
425 Ok(data) => {
426 if transition_state(state, TaskStatus::Completed) {
427 state.info.progress = Some(1.0);
428 state.result = Some(TaskResult {
429 id: task_id.clone(),
430 success: true,
431 data: Some(data),
432 error: None,
433 });
434 snapshot = Some(TaskStatusSnapshot::from(state));
435 }
436 }
437 Err(err) => {
438 let error_msg = err.message;
439 if transition_state(state, TaskStatus::Failed) {
440 state.info.error = Some(error_msg.clone());
441 state.result = Some(TaskResult {
442 id: task_id.clone(),
443 success: false,
444 data: None,
445 error: Some(error_msg),
446 });
447 snapshot = Some(TaskStatusSnapshot::from(state));
448 }
449 }
450 }
451 snapshot
452 }
453 }
454 None => None,
455 }
456 };
457
458 notify_snapshot(¬ification_sender, completion_snapshot);
459 });
460
461 if let Err(err) = scheduling {
462 warn!(
463 target: targets::SERVER,
464 "failed to schedule task {}: {}",
465 scheduled_task_id,
466 err
467 );
468 let failure_snapshot = mark_task_failed_snapshot(
469 &self.tasks,
470 &scheduled_task_id,
471 format!("Failed to schedule task: {err}"),
472 "spawn_task scheduling",
473 );
474 self.notify_snapshot(failure_snapshot);
475 }
476 }
477
478 pub fn start_task(&self, task_id: &TaskId) -> McpResult<()> {
482 let snapshot = {
483 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
484 warn!(target: targets::SERVER, "tasks lock poisoned in start_task, recovering");
485 poisoned.into_inner()
486 });
487 let state = tasks
488 .get_mut(task_id)
489 .ok_or_else(|| McpError::invalid_params(format!("Task not found: {task_id}")))?;
490
491 if state.info.status != TaskStatus::Pending {
492 return Err(McpError::invalid_params(format!(
493 "Task {task_id} is not pending"
494 )));
495 }
496
497 if !transition_state(state, TaskStatus::Running) {
498 return Err(McpError::invalid_params(format!(
499 "Task {task_id} cannot transition to running"
500 )));
501 }
502 Some(TaskStatusSnapshot::from(state))
503 };
504
505 self.notify_snapshot(snapshot);
506 Ok(())
507 }
508
509 pub fn update_progress(&self, task_id: &TaskId, progress: f64, message: Option<String>) {
511 let snapshot = {
512 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
513 warn!(target: targets::SERVER, "tasks lock poisoned in update_progress, recovering");
514 poisoned.into_inner()
515 });
516 if let Some(state) = tasks.get_mut(task_id) {
517 if state.info.status != TaskStatus::Running {
518 debug!(
519 target: targets::SERVER,
520 "task {} progress update ignored in state {:?}",
521 task_id,
522 state.info.status
523 );
524 return;
525 }
526 state.info.progress = Some(progress.clamp(0.0, 1.0));
527 state.info.message = message;
528 Some(TaskStatusSnapshot::from(state))
529 } else {
530 None
531 }
532 };
533
534 self.notify_snapshot(snapshot);
535 }
536
537 pub fn complete_task(&self, task_id: &TaskId, data: serde_json::Value) {
539 let snapshot = {
540 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
541 warn!(target: targets::SERVER, "tasks lock poisoned in complete_task, recovering");
542 poisoned.into_inner()
543 });
544 if let Some(state) = tasks.get_mut(task_id) {
545 if !transition_state(state, TaskStatus::Completed) {
546 return;
547 }
548 state.info.progress = Some(1.0);
549 state.result = Some(TaskResult {
550 id: task_id.clone(),
551 success: true,
552 data: Some(data),
553 error: None,
554 });
555 Some(TaskStatusSnapshot::from(state))
556 } else {
557 None
558 }
559 };
560
561 self.notify_snapshot(snapshot);
562 }
563
564 pub fn fail_task(&self, task_id: &TaskId, error: impl Into<String>) {
566 let error = error.into();
567 let snapshot = {
568 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
569 warn!(target: targets::SERVER, "tasks lock poisoned in fail_task, recovering");
570 poisoned.into_inner()
571 });
572 if let Some(state) = tasks.get_mut(task_id) {
573 if !transition_state(state, TaskStatus::Failed) {
574 return;
575 }
576 state.info.error = Some(error.clone());
577 state.result = Some(TaskResult {
578 id: task_id.clone(),
579 success: false,
580 data: None,
581 error: Some(error),
582 });
583 Some(TaskStatusSnapshot::from(state))
584 } else {
585 None
586 }
587 };
588
589 self.notify_snapshot(snapshot);
590 }
591
592 #[must_use]
594 pub fn get_info(&self, task_id: &TaskId) -> Option<TaskInfo> {
595 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
596 warn!(target: targets::SERVER, "tasks lock poisoned in get_info, recovering");
597 poisoned.into_inner()
598 });
599 tasks.get(task_id).map(|s| s.info.clone())
600 }
601
602 #[must_use]
604 pub fn get_result(&self, task_id: &TaskId) -> Option<TaskResult> {
605 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
606 warn!(target: targets::SERVER, "tasks lock poisoned in get_result, recovering");
607 poisoned.into_inner()
608 });
609 tasks.get(task_id).and_then(|s| s.result.clone())
610 }
611
612 #[must_use]
614 pub fn list_tasks(&self, status_filter: Option<TaskStatus>) -> Vec<TaskInfo> {
615 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
616 warn!(target: targets::SERVER, "tasks lock poisoned in list_tasks, recovering");
617 poisoned.into_inner()
618 });
619 tasks
620 .values()
621 .filter(|s| status_filter.is_none_or(|f| s.info.status == f))
622 .map(|s| s.info.clone())
623 .collect()
624 }
625
626 pub fn cancel(&self, task_id: &TaskId, reason: Option<String>) -> McpResult<TaskInfo> {
631 let snapshot = {
632 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
633 warn!(target: targets::SERVER, "tasks lock poisoned in cancel, recovering");
634 poisoned.into_inner()
635 });
636 let state = tasks
637 .get_mut(task_id)
638 .ok_or_else(|| McpError::invalid_params(format!("Task not found: {task_id}")))?;
639
640 if state.info.status.is_terminal() {
642 return Err(McpError::invalid_params(format!(
643 "Task {task_id} is already in terminal state: {:?}",
644 state.info.status
645 )));
646 }
647
648 if !transition_state(state, TaskStatus::Cancelled) {
649 return Err(McpError::invalid_params(format!(
650 "Task {task_id} cannot be cancelled from {:?}",
651 state.info.status
652 )));
653 }
654
655 state.cancel_requested = true;
656
657 state.cx.cancel_with(CancelKind::User, None);
658 if !state.cx.is_cancel_requested() {
659 warn!(
660 target: targets::SERVER,
661 "task {} cancel signal not observed on context",
662 task_id
663 );
664 }
665
666 let error_msg = reason.unwrap_or_else(|| "Cancelled by request".to_string());
667 state.info.error = Some(error_msg.clone());
668 state.result = Some(TaskResult {
669 id: task_id.clone(),
670 success: false,
671 data: None,
672 error: Some(error_msg),
673 });
674
675 let snapshot = TaskStatusSnapshot::from(state);
676 (snapshot, state.info.clone())
677 };
678
679 let (snapshot, info) = snapshot;
680 self.notify_snapshot(Some(snapshot));
681 Ok(info)
682 }
683
684 #[must_use]
686 pub fn is_cancel_requested(&self, task_id: &TaskId) -> bool {
687 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
688 warn!(target: targets::SERVER, "tasks lock poisoned in is_cancel_requested, recovering");
689 poisoned.into_inner()
690 });
691 tasks.get(task_id).is_some_and(|s| s.cancel_requested)
692 }
693
694 #[must_use]
696 pub fn active_count(&self) -> usize {
697 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
698 warn!(target: targets::SERVER, "tasks lock poisoned in active_count, recovering");
699 poisoned.into_inner()
700 });
701 tasks.values().filter(|s| s.info.status.is_active()).count()
702 }
703
704 #[must_use]
706 pub fn total_count(&self) -> usize {
707 let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
708 warn!(target: targets::SERVER, "tasks lock poisoned in total_count, recovering");
709 poisoned.into_inner()
710 });
711 tasks.len()
712 }
713
714 pub fn cleanup_completed(&self, max_age: std::time::Duration) {
718 let cutoff = chrono::Utc::now() - chrono::Duration::from_std(max_age).unwrap_or_default();
719
720 let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
721 warn!(target: targets::SERVER, "tasks lock poisoned in cleanup_completed, recovering");
722 poisoned.into_inner()
723 });
724 tasks.retain(|_, state| {
725 if state.info.status.is_active() {
727 return true;
728 }
729
730 if let Some(ref completed) = state.info.completed_at {
732 if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(completed) {
733 return parsed.with_timezone(&chrono::Utc) > cutoff;
734 }
735 return true;
736 }
737
738 true
739 });
740 }
741
742 fn notify_snapshot(&self, snapshot: Option<TaskStatusSnapshot>) {
743 if let Some(snapshot) = snapshot {
744 self.notify_status(snapshot.info, snapshot.result);
745 }
746 }
747
748 fn notify_status(&self, info: TaskInfo, result: Option<TaskResult>) {
749 let sender = {
750 let guard = self.notification_sender.read().unwrap_or_else(|poisoned| {
751 warn!(target: targets::SERVER, "notification sender lock poisoned in notify_status, recovering");
752 poisoned.into_inner()
753 });
754 guard.clone()
755 };
756 let Some(sender) = sender else {
757 return;
758 };
759
760 let params = TaskStatusNotificationParams {
761 id: info.id.clone(),
762 status: info.status,
763 progress: info.progress,
764 message: info.message.clone(),
765 error: info.error.clone(),
766 result,
767 };
768 let payload = match serde_json::to_value(params) {
769 Ok(value) => value,
770 Err(err) => {
771 warn!(
772 target: targets::SERVER,
773 "failed to serialize task status notification: {}",
774 err
775 );
776 return;
777 }
778 };
779 sender(JsonRpcRequest::notification(
780 "notifications/tasks/status",
781 Some(payload),
782 ));
783 }
784}
785
786#[derive(Debug, Clone)]
787struct TaskStatusSnapshot {
788 info: TaskInfo,
789 result: Option<TaskResult>,
790}
791
792impl TaskStatusSnapshot {
793 fn from(state: &TaskState) -> Self {
794 Self {
795 info: state.info.clone(),
796 result: state.result.clone(),
797 }
798 }
799}
800
801fn notify_snapshot(
802 sender: &Arc<RwLock<Option<TaskNotificationSender>>>,
803 snapshot: Option<TaskStatusSnapshot>,
804) {
805 let Some(snapshot) = snapshot else {
806 return;
807 };
808 let sender = {
809 let guard = sender.read().unwrap_or_else(|poisoned| {
810 warn!(target: targets::SERVER, "notification sender lock poisoned in notify_snapshot, recovering");
811 poisoned.into_inner()
812 });
813 guard.clone()
814 };
815 let Some(sender) = sender else {
816 return;
817 };
818 let params = TaskStatusNotificationParams {
819 id: snapshot.info.id.clone(),
820 status: snapshot.info.status,
821 progress: snapshot.info.progress,
822 message: snapshot.info.message.clone(),
823 error: snapshot.info.error.clone(),
824 result: snapshot.result,
825 };
826 let payload = match serde_json::to_value(params) {
827 Ok(value) => value,
828 Err(err) => {
829 warn!(
830 target: targets::SERVER,
831 "failed to serialize task status notification: {}",
832 err
833 );
834 return;
835 }
836 };
837 sender(JsonRpcRequest::notification(
838 "notifications/tasks/status",
839 Some(payload),
840 ));
841}
842
843impl Default for TaskManager {
844 fn default() -> Self {
845 Self::new()
846 }
847}
848
849impl std::fmt::Debug for TaskManager {
850 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
851 let task_count = self
853 .tasks
854 .read()
855 .map(|g| g.len())
856 .unwrap_or_else(|poisoned| poisoned.into_inner().len());
857 let handler_count = self
858 .handlers
859 .read()
860 .map(|g| g.len())
861 .unwrap_or_else(|poisoned| poisoned.into_inner().len());
862 f.debug_struct("TaskManager")
863 .field("task_count", &task_count)
864 .field("handler_count", &handler_count)
865 .field("task_counter", &self.task_counter.load(Ordering::SeqCst))
866 .field(
867 "list_changed_notifications",
868 &self.list_changed_notifications,
869 )
870 .field("auto_execute", &self.auto_execute)
871 .finish_non_exhaustive()
872 }
873}
874
875pub type SharedTaskManager = Arc<TaskManager>;
877
878#[cfg(test)]
879mod tests {
880 use super::*;
881 use std::sync::Arc;
882 use std::thread;
883
884 #[test]
885 fn test_task_manager_creation() {
886 let manager = TaskManager::new();
887 assert_eq!(manager.total_count(), 0);
888 assert_eq!(manager.active_count(), 0);
889 assert!(!manager.has_list_changed_notifications());
890 }
891
892 #[test]
893 fn test_task_manager_with_notifications() {
894 let manager = TaskManager::with_list_changed_notifications();
895 assert!(manager.has_list_changed_notifications());
896 }
897
898 #[test]
899 fn test_register_handler() {
900 let manager = TaskManager::new();
901
902 manager.register_handler("test_task", |_cx, _params| async {
903 Ok(serde_json::json!({}))
904 });
905
906 let cx = Cx::for_testing();
908 let result = manager.submit(&cx, "test_task", None);
909 assert!(result.is_ok());
910 }
911
912 #[test]
913 fn test_submit_auto_execute_fails_when_runtime_unavailable() {
914 let mut manager = TaskManager::new_for_testing();
915 manager.auto_execute = true;
916 manager.runtime = None;
917
918 manager.register_handler("test_task", |_cx, _params| async {
919 Ok(serde_json::json!({}))
920 });
921
922 let cx = Cx::for_testing();
923 let task_id = manager.submit(&cx, "test_task", None).unwrap();
924
925 let info = manager.get_info(&task_id).unwrap();
926 assert_eq!(info.status, TaskStatus::Failed);
927 assert_eq!(info.error.as_deref(), Some("Task runtime unavailable"));
928
929 let result = manager.get_result(&task_id).unwrap();
930 assert!(!result.success);
931 assert_eq!(result.error.as_deref(), Some("Task runtime unavailable"));
932 }
933
934 #[test]
935 fn test_submit_unknown_task_type() {
936 let manager = TaskManager::new();
937 let cx = Cx::for_testing();
938
939 let result = manager.submit(&cx, "unknown_task", None);
940 assert!(result.is_err());
941 }
942
943 #[test]
944 fn test_task_lifecycle() {
945 let manager = TaskManager::new_for_testing();
946 let cx = Cx::for_testing();
947
948 manager.register_handler("test", |_cx, _params| async {
949 Ok(serde_json::json!({"done": true}))
950 });
951
952 let task_id = manager.submit(&cx, "test", None).unwrap();
954
955 let info = manager.get_info(&task_id).unwrap();
957 assert_eq!(info.status, TaskStatus::Pending);
958 assert!(info.started_at.is_none());
959
960 manager.start_task(&task_id).unwrap();
962 let info = manager.get_info(&task_id).unwrap();
963 assert_eq!(info.status, TaskStatus::Running);
964 assert!(info.started_at.is_some());
965
966 manager.update_progress(&task_id, 0.5, Some("Halfway done".into()));
968 let info = manager.get_info(&task_id).unwrap();
969 assert_eq!(info.progress, Some(0.5));
970 assert_eq!(info.message, Some("Halfway done".into()));
971
972 manager.complete_task(&task_id, serde_json::json!({"result": 42}));
974 let info = manager.get_info(&task_id).unwrap();
975 assert_eq!(info.status, TaskStatus::Completed);
976 assert!(info.completed_at.is_some());
977
978 let result = manager.get_result(&task_id).unwrap();
980 assert!(result.success);
981 assert_eq!(result.data, Some(serde_json::json!({"result": 42})));
982 }
983
984 #[test]
985 fn test_task_failure() {
986 let manager = TaskManager::new_for_testing();
987 let cx = Cx::for_testing();
988
989 manager.register_handler("fail_test", |_cx, _params| async {
990 Ok(serde_json::json!({}))
991 });
992
993 let task_id = manager.submit(&cx, "fail_test", None).unwrap();
994 manager.start_task(&task_id).unwrap();
995 manager.fail_task(&task_id, "Something went wrong");
996
997 let info = manager.get_info(&task_id).unwrap();
998 assert_eq!(info.status, TaskStatus::Failed);
999 assert_eq!(info.error, Some("Something went wrong".into()));
1000
1001 let result = manager.get_result(&task_id).unwrap();
1002 assert!(!result.success);
1003 assert_eq!(result.error, Some("Something went wrong".into()));
1004 }
1005
1006 #[test]
1007 fn test_task_cancellation() {
1008 let manager = TaskManager::new_for_testing();
1009 let cx = Cx::for_testing();
1010
1011 manager.register_handler("cancel_test", |_cx, _params| async {
1012 Ok(serde_json::json!({}))
1013 });
1014
1015 let task_id = manager.submit(&cx, "cancel_test", None).unwrap();
1016 manager.start_task(&task_id).unwrap();
1017
1018 let info = manager
1020 .cancel(&task_id, Some("User cancelled".into()))
1021 .unwrap();
1022 assert_eq!(info.status, TaskStatus::Cancelled);
1023
1024 assert!(manager.is_cancel_requested(&task_id));
1026
1027 let result = manager.cancel(&task_id, None);
1029 assert!(result.is_err());
1030 }
1031
1032 #[test]
1033 fn test_list_tasks() {
1034 let manager = TaskManager::new_for_testing();
1035 let cx = Cx::for_testing();
1036
1037 manager.register_handler("list_test", |_cx, _params| async {
1038 Ok(serde_json::json!({}))
1039 });
1040
1041 let task1 = manager.submit(&cx, "list_test", None).unwrap();
1042 let task2 = manager.submit(&cx, "list_test", None).unwrap();
1043 let _task3 = manager.submit(&cx, "list_test", None).unwrap();
1044
1045 assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 3);
1047 assert_eq!(manager.list_tasks(Some(TaskStatus::Running)).len(), 0);
1048
1049 manager.start_task(&task1).unwrap();
1051 assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 2);
1052 assert_eq!(manager.list_tasks(Some(TaskStatus::Running)).len(), 1);
1053
1054 manager.start_task(&task2).unwrap();
1056 manager.complete_task(&task2, serde_json::json!({}));
1057 assert_eq!(manager.list_tasks(Some(TaskStatus::Completed)).len(), 1);
1058
1059 assert_eq!(manager.list_tasks(None).len(), 3);
1061 }
1062
1063 #[test]
1064 fn test_active_count() {
1065 let manager = TaskManager::new_for_testing();
1066 let cx = Cx::for_testing();
1067
1068 manager.register_handler("count_test", |_cx, _params| async {
1069 Ok(serde_json::json!({}))
1070 });
1071
1072 let task1 = manager.submit(&cx, "count_test", None).unwrap();
1073 let task2 = manager.submit(&cx, "count_test", None).unwrap();
1074
1075 assert_eq!(manager.active_count(), 2);
1076 assert_eq!(manager.total_count(), 2);
1077
1078 manager.start_task(&task1).unwrap();
1079 assert_eq!(manager.active_count(), 2);
1080
1081 manager.complete_task(&task1, serde_json::json!({}));
1082 assert_eq!(manager.active_count(), 1);
1083
1084 manager.cancel(&task2, None).unwrap();
1085 assert_eq!(manager.active_count(), 0);
1086 assert_eq!(manager.total_count(), 2);
1087 }
1088
1089 #[test]
1090 fn test_progress_clamping() {
1091 let manager = TaskManager::new_for_testing();
1092 let cx = Cx::for_testing();
1093
1094 manager.register_handler("clamp_test", |_cx, _params| async {
1095 Ok(serde_json::json!({}))
1096 });
1097
1098 let task_id = manager.submit(&cx, "clamp_test", None).unwrap();
1099 manager.start_task(&task_id).unwrap();
1100
1101 manager.update_progress(&task_id, -0.5, None);
1103 assert_eq!(manager.get_info(&task_id).unwrap().progress, Some(0.0));
1104
1105 manager.update_progress(&task_id, 1.5, None);
1106 assert_eq!(manager.get_info(&task_id).unwrap().progress, Some(1.0));
1107
1108 manager.update_progress(&task_id, 0.75, None);
1109 assert_eq!(manager.get_info(&task_id).unwrap().progress, Some(0.75));
1110 }
1111
1112 #[test]
1113 fn test_invalid_transition_rejected() {
1114 let manager = TaskManager::new_for_testing();
1115 let cx = Cx::for_testing();
1116
1117 manager.register_handler("transition_test", |_cx, _params| async {
1118 Ok(serde_json::json!({}))
1119 });
1120
1121 let task_id = manager.submit(&cx, "transition_test", None).unwrap();
1122
1123 manager.complete_task(&task_id, serde_json::json!({"result": "noop"}));
1125 let info = manager.get_info(&task_id).unwrap();
1126 assert_eq!(info.status, TaskStatus::Pending);
1127
1128 manager.start_task(&task_id).unwrap();
1129 manager.complete_task(&task_id, serde_json::json!({"result": "ok"}));
1130 let info = manager.get_info(&task_id).unwrap();
1131 assert_eq!(info.status, TaskStatus::Completed);
1132
1133 let result = manager.start_task(&task_id);
1135 assert!(result.is_err());
1136 }
1137
1138 #[test]
1139 fn test_concurrent_submissions() {
1140 let manager = Arc::new(TaskManager::new_for_testing());
1141 manager.register_handler("concurrent_test", |_cx, _params| async {
1142 Ok(serde_json::json!({}))
1143 });
1144
1145 let mut handles = Vec::new();
1146 for _ in 0..4 {
1147 let manager = Arc::clone(&manager);
1148 handles.push(thread::spawn(move || {
1149 let cx = Cx::for_testing();
1150 for _ in 0..10 {
1151 let _ = manager.submit(&cx, "concurrent_test", None).unwrap();
1152 }
1153 }));
1154 }
1155
1156 for handle in handles {
1157 handle.join().expect("thread join failed");
1158 }
1159
1160 assert_eq!(manager.total_count(), 40);
1161 assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 40);
1162 }
1163
1164 #[test]
1165 fn test_task_status_notifications() {
1166 let manager = TaskManager::new_for_testing();
1167 manager.register_handler("notify_test", |_cx, _params| async {
1168 Ok(serde_json::json!({"ok": true}))
1169 });
1170
1171 let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
1172 Arc::new(std::sync::Mutex::new(Vec::new()));
1173 let sender_events = Arc::clone(&events);
1174 let sender: TaskNotificationSender = Arc::new(move |request| {
1175 if request.method != "notifications/tasks/status" {
1176 return;
1177 }
1178 let params = request
1179 .params
1180 .as_ref()
1181 .and_then(|value| serde_json::from_value(value.clone()).ok())
1182 .expect("task status params");
1183 sender_events
1184 .lock()
1185 .expect("events lock poisoned")
1186 .push(params);
1187 });
1188 manager.set_notification_sender(sender);
1189
1190 let cx = Cx::for_testing();
1191 let task_id = manager.submit(&cx, "notify_test", None).unwrap();
1192 manager.start_task(&task_id).unwrap();
1193 manager.update_progress(&task_id, 0.5, Some("half".to_string()));
1194 manager.complete_task(&task_id, serde_json::json!({"result": 1}));
1195
1196 let recorded = events.lock().expect("events lock poisoned").clone();
1197 assert!(!recorded.is_empty(), "expected task status notifications");
1198 assert_eq!(recorded[0].id, task_id);
1199 assert_eq!(recorded[0].status, TaskStatus::Pending);
1200 assert_eq!(recorded[1].status, TaskStatus::Running);
1201 assert_eq!(recorded[2].progress, Some(0.5));
1202 assert_eq!(recorded.last().expect("last").status, TaskStatus::Completed);
1203 }
1204
1205 #[test]
1208 fn can_transition_valid_pairs() {
1209 assert!(can_transition(TaskStatus::Pending, TaskStatus::Running));
1210 assert!(can_transition(TaskStatus::Pending, TaskStatus::Cancelled));
1211 assert!(can_transition(TaskStatus::Running, TaskStatus::Completed));
1212 assert!(can_transition(TaskStatus::Running, TaskStatus::Failed));
1213 assert!(can_transition(TaskStatus::Running, TaskStatus::Cancelled));
1214 }
1215
1216 #[test]
1217 fn can_transition_invalid_pairs() {
1218 assert!(!can_transition(TaskStatus::Pending, TaskStatus::Completed));
1219 assert!(!can_transition(TaskStatus::Pending, TaskStatus::Failed));
1220 assert!(!can_transition(TaskStatus::Completed, TaskStatus::Running));
1221 assert!(!can_transition(TaskStatus::Completed, TaskStatus::Pending));
1222 assert!(!can_transition(
1223 TaskStatus::Completed,
1224 TaskStatus::Cancelled
1225 ));
1226 assert!(!can_transition(TaskStatus::Failed, TaskStatus::Running));
1227 assert!(!can_transition(TaskStatus::Cancelled, TaskStatus::Running));
1228 }
1229
1230 #[test]
1233 fn default_creates_empty_manager() {
1234 let manager = TaskManager::default();
1235 assert_eq!(manager.total_count(), 0);
1236 assert!(!manager.has_list_changed_notifications());
1237 }
1238
1239 #[test]
1240 fn new_for_testing_disables_auto_execute() {
1241 let manager = TaskManager::new_for_testing();
1242 assert!(!manager.auto_execute);
1243 }
1244
1245 #[test]
1246 fn into_shared_returns_arc() {
1247 let manager = TaskManager::new_for_testing();
1248 let shared: SharedTaskManager = manager.into_shared();
1249 assert_eq!(shared.total_count(), 0);
1250 }
1251
1252 #[test]
1253 fn debug_output_contains_fields() {
1254 let manager = TaskManager::new_for_testing();
1255 let debug = format!("{:?}", manager);
1256 assert!(debug.contains("TaskManager"));
1257 assert!(debug.contains("task_count"));
1258 assert!(debug.contains("handler_count"));
1259 assert!(debug.contains("task_counter"));
1260 assert!(debug.contains("list_changed_notifications"));
1261 assert!(debug.contains("auto_execute"));
1262 }
1263
1264 #[test]
1267 fn get_info_nonexistent_returns_none() {
1268 let manager = TaskManager::new_for_testing();
1269 let fake_id = TaskId::from_string("nonexistent".to_string());
1270 assert!(manager.get_info(&fake_id).is_none());
1271 }
1272
1273 #[test]
1274 fn get_result_nonexistent_returns_none() {
1275 let manager = TaskManager::new_for_testing();
1276 let fake_id = TaskId::from_string("nonexistent".to_string());
1277 assert!(manager.get_result(&fake_id).is_none());
1278 }
1279
1280 #[test]
1281 fn get_result_pending_task_returns_none() {
1282 let manager = TaskManager::new_for_testing();
1283 let cx = Cx::for_testing();
1284 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1285 let id = manager.submit(&cx, "t", None).unwrap();
1286 assert!(manager.get_result(&id).is_none());
1287 }
1288
1289 #[test]
1292 fn is_cancel_requested_nonexistent_returns_false() {
1293 let manager = TaskManager::new_for_testing();
1294 let fake_id = TaskId::from_string("nonexistent".to_string());
1295 assert!(!manager.is_cancel_requested(&fake_id));
1296 }
1297
1298 #[test]
1299 fn is_cancel_requested_before_cancel_returns_false() {
1300 let manager = TaskManager::new_for_testing();
1301 let cx = Cx::for_testing();
1302 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1303 let id = manager.submit(&cx, "t", None).unwrap();
1304 assert!(!manager.is_cancel_requested(&id));
1305 }
1306
1307 #[test]
1310 fn update_progress_on_pending_task_is_ignored() {
1311 let manager = TaskManager::new_for_testing();
1312 let cx = Cx::for_testing();
1313 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1314 let id = manager.submit(&cx, "t", None).unwrap();
1315 manager.update_progress(&id, 0.5, Some("test".to_string()));
1317 let info = manager.get_info(&id).unwrap();
1318 assert!(info.progress.is_none());
1319 }
1320
1321 #[test]
1322 fn update_progress_on_completed_task_is_ignored() {
1323 let manager = TaskManager::new_for_testing();
1324 let cx = Cx::for_testing();
1325 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1326 let id = manager.submit(&cx, "t", None).unwrap();
1327 manager.start_task(&id).unwrap();
1328 manager.complete_task(&id, serde_json::json!({}));
1329 manager.update_progress(&id, 0.1, None);
1331 let info = manager.get_info(&id).unwrap();
1332 assert_eq!(info.progress, Some(1.0)); }
1334
1335 #[test]
1338 fn complete_task_nonexistent_does_not_panic() {
1339 let manager = TaskManager::new_for_testing();
1340 let fake_id = TaskId::from_string("nonexistent".to_string());
1341 manager.complete_task(&fake_id, serde_json::json!({})); }
1343
1344 #[test]
1345 fn fail_task_nonexistent_does_not_panic() {
1346 let manager = TaskManager::new_for_testing();
1347 let fake_id = TaskId::from_string("nonexistent".to_string());
1348 manager.fail_task(&fake_id, "error"); }
1350
1351 #[test]
1354 fn cancel_nonexistent_task_returns_error() {
1355 let manager = TaskManager::new_for_testing();
1356 let fake_id = TaskId::from_string("nonexistent".to_string());
1357 let err = manager.cancel(&fake_id, None).unwrap_err();
1358 assert!(err.message.contains("not found"));
1359 }
1360
1361 #[test]
1362 fn cancel_pending_task_directly() {
1363 let manager = TaskManager::new_for_testing();
1364 let cx = Cx::for_testing();
1365 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1366 let id = manager.submit(&cx, "t", None).unwrap();
1367 let info = manager.cancel(&id, None).unwrap();
1369 assert_eq!(info.status, TaskStatus::Cancelled);
1370 assert!(manager.is_cancel_requested(&id));
1371 }
1372
1373 #[test]
1374 fn cancel_with_default_reason() {
1375 let manager = TaskManager::new_for_testing();
1376 let cx = Cx::for_testing();
1377 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1378 let id = manager.submit(&cx, "t", None).unwrap();
1379 let info = manager.cancel(&id, None).unwrap();
1380 assert_eq!(info.error, Some("Cancelled by request".to_string()));
1381 }
1382
1383 #[test]
1386 fn task_ids_are_sequential() {
1387 let manager = TaskManager::new_for_testing();
1388 let cx = Cx::for_testing();
1389 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1390 let id1 = manager.submit(&cx, "t", None).unwrap();
1391 let id2 = manager.submit(&cx, "t", None).unwrap();
1392 assert_ne!(id1, id2);
1393 assert!(id1.0.starts_with("task-"));
1394 assert!(id2.0.starts_with("task-"));
1395 }
1396
1397 #[test]
1400 fn start_task_nonexistent_returns_error() {
1401 let manager = TaskManager::new_for_testing();
1402 let fake_id = TaskId::from_string("nonexistent".to_string());
1403 let err = manager.start_task(&fake_id).unwrap_err();
1404 assert!(err.message.contains("not found"));
1405 }
1406
1407 #[test]
1408 fn start_task_already_running_returns_error() {
1409 let manager = TaskManager::new_for_testing();
1410 let cx = Cx::for_testing();
1411 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1412 let id = manager.submit(&cx, "t", None).unwrap();
1413 manager.start_task(&id).unwrap();
1414 let err = manager.start_task(&id).unwrap_err();
1415 assert!(err.message.contains("not pending"));
1416 }
1417
1418 #[test]
1421 fn cleanup_completed_removes_old_terminal_tasks() {
1422 let manager = TaskManager::new_for_testing();
1423 let cx = Cx::for_testing();
1424 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1425
1426 let id = manager.submit(&cx, "t", None).unwrap();
1427 manager.start_task(&id).unwrap();
1428 manager.complete_task(&id, serde_json::json!({}));
1429 assert_eq!(manager.total_count(), 1);
1430
1431 manager.cleanup_completed(std::time::Duration::from_secs(0));
1433 assert_eq!(manager.total_count(), 0);
1434 }
1435
1436 #[test]
1437 fn cleanup_completed_keeps_active_tasks() {
1438 let manager = TaskManager::new_for_testing();
1439 let cx = Cx::for_testing();
1440 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1441
1442 let id1 = manager.submit(&cx, "t", None).unwrap();
1443 let id2 = manager.submit(&cx, "t", None).unwrap();
1444 manager.start_task(&id1).unwrap();
1445 manager.complete_task(&id1, serde_json::json!({}));
1446 manager.cleanup_completed(std::time::Duration::from_secs(0));
1449 assert_eq!(manager.total_count(), 1); assert!(manager.get_info(&id2).is_some());
1451 }
1452
1453 #[test]
1454 fn cleanup_completed_keeps_recent_tasks() {
1455 let manager = TaskManager::new_for_testing();
1456 let cx = Cx::for_testing();
1457 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1458
1459 let id = manager.submit(&cx, "t", None).unwrap();
1460 manager.start_task(&id).unwrap();
1461 manager.complete_task(&id, serde_json::json!({}));
1462
1463 manager.cleanup_completed(std::time::Duration::from_secs(3600));
1465 assert_eq!(manager.total_count(), 1);
1466 }
1467
1468 #[test]
1471 fn transition_same_state_returns_true() {
1472 let task_id = TaskId::from_string("test".to_string());
1474 let mut state = TaskState {
1475 info: TaskInfo {
1476 id: task_id,
1477 task_type: "t".to_string(),
1478 status: TaskStatus::Running,
1479 progress: None,
1480 message: None,
1481 created_at: String::new(),
1482 started_at: None,
1483 completed_at: None,
1484 error: None,
1485 },
1486 cancel_requested: false,
1487 result: None,
1488 cx: Cx::for_testing(),
1489 };
1490 assert!(transition_state(&mut state, TaskStatus::Running));
1492 }
1493
1494 #[test]
1497 fn submit_with_none_params_creates_task() {
1498 let manager = TaskManager::new_for_testing();
1499 let cx = Cx::for_testing();
1500 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1501 let id = manager.submit(&cx, "t", None).unwrap();
1502 let info = manager.get_info(&id).unwrap();
1503 assert_eq!(info.task_type, "t");
1504 assert_eq!(info.status, TaskStatus::Pending);
1505 assert!(info.started_at.is_none());
1506 assert!(info.completed_at.is_none());
1507 assert!(info.error.is_none());
1508 }
1509
1510 #[test]
1511 fn submit_with_some_params_creates_task() {
1512 let manager = TaskManager::new_for_testing();
1513 let cx = Cx::for_testing();
1514 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1515 let id = manager
1516 .submit(&cx, "t", Some(serde_json::json!({"key": "value"})))
1517 .unwrap();
1518 assert!(manager.get_info(&id).is_some());
1519 }
1520
1521 #[test]
1524 fn fail_task_sets_error_result() {
1525 let manager = TaskManager::new_for_testing();
1526 let cx = Cx::for_testing();
1527 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1528 let id = manager.submit(&cx, "t", None).unwrap();
1529 manager.start_task(&id).unwrap();
1530 manager.fail_task(&id, "boom");
1531 let result = manager.get_result(&id).unwrap();
1532 assert!(!result.success);
1533 assert_eq!(result.error, Some("boom".to_string()));
1534 assert!(result.data.is_none());
1535 }
1536
1537 #[test]
1540 fn update_progress_nonexistent_does_not_panic() {
1541 let manager = TaskManager::new_for_testing();
1542 let fake_id = TaskId::from_string("nonexistent".to_string());
1543 manager.update_progress(&fake_id, 0.5, None); }
1545
1546 #[test]
1549 fn fail_task_on_completed_is_ignored() {
1550 let manager = TaskManager::new_for_testing();
1551 let cx = Cx::for_testing();
1552 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1553 let id = manager.submit(&cx, "t", None).unwrap();
1554 manager.start_task(&id).unwrap();
1555 manager.complete_task(&id, serde_json::json!({"done": true}));
1556 manager.fail_task(&id, "too late");
1558 let info = manager.get_info(&id).unwrap();
1559 assert_eq!(info.status, TaskStatus::Completed);
1560 let result = manager.get_result(&id).unwrap();
1561 assert!(result.success);
1562 }
1563
1564 #[test]
1567 fn complete_task_on_failed_is_ignored() {
1568 let manager = TaskManager::new_for_testing();
1569 let cx = Cx::for_testing();
1570 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1571 let id = manager.submit(&cx, "t", None).unwrap();
1572 manager.start_task(&id).unwrap();
1573 manager.fail_task(&id, "something broke");
1574 manager.complete_task(&id, serde_json::json!({"late": true}));
1576 let info = manager.get_info(&id).unwrap();
1577 assert_eq!(info.status, TaskStatus::Failed);
1578 let result = manager.get_result(&id).unwrap();
1579 assert!(!result.success);
1580 }
1581
1582 #[test]
1585 fn register_handler_replaces_existing() {
1586 let manager = TaskManager::new_for_testing();
1587 manager.register_handler("t", |_cx, _params| async {
1588 Ok(serde_json::json!({"v": 1}))
1589 });
1590 manager.register_handler("t", |_cx, _params| async {
1591 Ok(serde_json::json!({"v": 2}))
1592 });
1593 let cx = Cx::for_testing();
1595 let id = manager.submit(&cx, "t", None).unwrap();
1596 assert!(manager.get_info(&id).is_some());
1597 }
1598
1599 #[test]
1602 fn transition_to_running_sets_started_at() {
1603 let task_id = TaskId::from_string("ts-test".to_string());
1604 let mut state = TaskState {
1605 info: TaskInfo {
1606 id: task_id,
1607 task_type: "t".to_string(),
1608 status: TaskStatus::Pending,
1609 progress: None,
1610 message: None,
1611 created_at: String::new(),
1612 started_at: None,
1613 completed_at: None,
1614 error: None,
1615 },
1616 cancel_requested: false,
1617 result: None,
1618 cx: Cx::for_testing(),
1619 };
1620 assert!(state.info.started_at.is_none());
1621 assert!(transition_state(&mut state, TaskStatus::Running));
1622 assert!(state.info.started_at.is_some());
1623 }
1624
1625 #[test]
1626 fn transition_to_completed_sets_completed_at() {
1627 let task_id = TaskId::from_string("ts-test".to_string());
1628 let mut state = TaskState {
1629 info: TaskInfo {
1630 id: task_id,
1631 task_type: "t".to_string(),
1632 status: TaskStatus::Running,
1633 progress: None,
1634 message: None,
1635 created_at: String::new(),
1636 started_at: Some("earlier".to_string()),
1637 completed_at: None,
1638 error: None,
1639 },
1640 cancel_requested: false,
1641 result: None,
1642 cx: Cx::for_testing(),
1643 };
1644 assert!(state.info.completed_at.is_none());
1645 assert!(transition_state(&mut state, TaskStatus::Completed));
1646 assert!(state.info.completed_at.is_some());
1647 }
1648
1649 #[test]
1650 fn transition_to_failed_sets_completed_at() {
1651 let task_id = TaskId::from_string("ts-test".to_string());
1652 let mut state = TaskState {
1653 info: TaskInfo {
1654 id: task_id,
1655 task_type: "t".to_string(),
1656 status: TaskStatus::Running,
1657 progress: None,
1658 message: None,
1659 created_at: String::new(),
1660 started_at: Some("earlier".to_string()),
1661 completed_at: None,
1662 error: None,
1663 },
1664 cancel_requested: false,
1665 result: None,
1666 cx: Cx::for_testing(),
1667 };
1668 assert!(transition_state(&mut state, TaskStatus::Failed));
1669 assert!(state.info.completed_at.is_some());
1670 }
1671
1672 #[test]
1673 fn transition_to_cancelled_sets_completed_at() {
1674 let task_id = TaskId::from_string("ts-test".to_string());
1675 let mut state = TaskState {
1676 info: TaskInfo {
1677 id: task_id,
1678 task_type: "t".to_string(),
1679 status: TaskStatus::Running,
1680 progress: None,
1681 message: None,
1682 created_at: String::new(),
1683 started_at: Some("earlier".to_string()),
1684 completed_at: None,
1685 error: None,
1686 },
1687 cancel_requested: false,
1688 result: None,
1689 cx: Cx::for_testing(),
1690 };
1691 assert!(transition_state(&mut state, TaskStatus::Cancelled));
1692 assert!(state.info.completed_at.is_some());
1693 }
1694
1695 #[test]
1696 fn transition_invalid_returns_false() {
1697 let task_id = TaskId::from_string("ts-test".to_string());
1698 let mut state = TaskState {
1699 info: TaskInfo {
1700 id: task_id,
1701 task_type: "t".to_string(),
1702 status: TaskStatus::Pending,
1703 progress: None,
1704 message: None,
1705 created_at: String::new(),
1706 started_at: None,
1707 completed_at: None,
1708 error: None,
1709 },
1710 cancel_requested: false,
1711 result: None,
1712 cx: Cx::for_testing(),
1713 };
1714 assert!(!transition_state(&mut state, TaskStatus::Completed));
1716 assert_eq!(state.info.status, TaskStatus::Pending);
1718 }
1719
1720 #[test]
1723 fn task_status_snapshot_debug_and_clone() {
1724 let task_id = TaskId::from_string("snap-test".to_string());
1725 let state = TaskState {
1726 info: TaskInfo {
1727 id: task_id,
1728 task_type: "t".to_string(),
1729 status: TaskStatus::Running,
1730 progress: Some(0.5),
1731 message: Some("testing".to_string()),
1732 created_at: "now".to_string(),
1733 started_at: Some("now".to_string()),
1734 completed_at: None,
1735 error: None,
1736 },
1737 cancel_requested: false,
1738 result: None,
1739 cx: Cx::for_testing(),
1740 };
1741 let snapshot = TaskStatusSnapshot::from(&state);
1742 let debug = format!("{:?}", snapshot);
1743 assert!(debug.contains("TaskStatusSnapshot"));
1744 let cloned = snapshot.clone();
1745 assert_eq!(cloned.info.status, TaskStatus::Running);
1746 assert!(cloned.result.is_none());
1747 }
1748
1749 #[test]
1752 fn cleanup_completed_removes_failed_and_cancelled() {
1753 let manager = TaskManager::new_for_testing();
1754 let cx = Cx::for_testing();
1755 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1756
1757 let id1 = manager.submit(&cx, "t", None).unwrap();
1758 let id2 = manager.submit(&cx, "t", None).unwrap();
1759 let id3 = manager.submit(&cx, "t", None).unwrap();
1760
1761 manager.start_task(&id1).unwrap();
1763 manager.complete_task(&id1, serde_json::json!({}));
1764
1765 manager.start_task(&id2).unwrap();
1767 manager.fail_task(&id2, "error");
1768
1769 manager.cancel(&id3, None).unwrap();
1771
1772 assert_eq!(manager.total_count(), 3);
1773
1774 manager.cleanup_completed(std::time::Duration::from_secs(0));
1776 assert_eq!(manager.total_count(), 0);
1777 }
1778
1779 #[test]
1782 fn set_notification_sender_replaces_existing() {
1783 let manager = TaskManager::new_for_testing();
1784 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1785
1786 let count1 = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1787 let count2 = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1788
1789 let c1 = Arc::clone(&count1);
1790 let sender1: TaskNotificationSender = Arc::new(move |_| {
1791 c1.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1792 });
1793 manager.set_notification_sender(sender1);
1794
1795 let cx = Cx::for_testing();
1796 let _id1 = manager.submit(&cx, "t", None).unwrap();
1797 assert!(count1.load(std::sync::atomic::Ordering::SeqCst) > 0);
1798
1799 let c2 = Arc::clone(&count2);
1801 let sender2: TaskNotificationSender = Arc::new(move |_| {
1802 c2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1803 });
1804 manager.set_notification_sender(sender2);
1805
1806 let _id2 = manager.submit(&cx, "t", None).unwrap();
1807 assert!(count2.load(std::sync::atomic::Ordering::SeqCst) > 0);
1808 }
1809
1810 #[test]
1813 fn cancel_with_custom_reason() {
1814 let manager = TaskManager::new_for_testing();
1815 let cx = Cx::for_testing();
1816 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1817 let id = manager.submit(&cx, "t", None).unwrap();
1818 manager.start_task(&id).unwrap();
1819 let info = manager.cancel(&id, Some("Timeout".to_string())).unwrap();
1820 assert_eq!(info.error, Some("Timeout".to_string()));
1821 let result = manager.get_result(&id).unwrap();
1822 assert_eq!(result.error, Some("Timeout".to_string()));
1823 }
1824
1825 #[test]
1828 fn can_transition_self_is_false() {
1829 assert!(!can_transition(TaskStatus::Pending, TaskStatus::Pending));
1832 assert!(!can_transition(TaskStatus::Running, TaskStatus::Running));
1833 assert!(!can_transition(
1834 TaskStatus::Completed,
1835 TaskStatus::Completed
1836 ));
1837 assert!(!can_transition(TaskStatus::Failed, TaskStatus::Failed));
1838 assert!(!can_transition(
1839 TaskStatus::Cancelled,
1840 TaskStatus::Cancelled
1841 ));
1842 }
1843
1844 #[test]
1847 fn transition_state_identity_pending_returns_true() {
1848 let task_id = TaskId::from_string("identity-test".to_string());
1849 let mut state = TaskState {
1850 info: TaskInfo {
1851 id: task_id,
1852 task_type: "t".to_string(),
1853 status: TaskStatus::Pending,
1854 progress: None,
1855 message: None,
1856 created_at: String::new(),
1857 started_at: None,
1858 completed_at: None,
1859 error: None,
1860 },
1861 cancel_requested: false,
1862 result: None,
1863 cx: Cx::for_testing(),
1864 };
1865 assert!(transition_state(&mut state, TaskStatus::Pending));
1866 assert_eq!(state.info.status, TaskStatus::Pending);
1867 }
1868
1869 #[test]
1872 fn list_tasks_no_filter_returns_all() {
1873 let manager = TaskManager::new_for_testing();
1874 let cx = Cx::for_testing();
1875 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1876 let id1 = manager.submit(&cx, "t", None).unwrap();
1877 let _id2 = manager.submit(&cx, "t", None).unwrap();
1878 manager.start_task(&id1).unwrap();
1879 manager.complete_task(&id1, serde_json::json!({}));
1880 let all = manager.list_tasks(None);
1882 assert_eq!(all.len(), 2);
1883 }
1884
1885 #[test]
1888 fn cancel_notification_includes_error_and_result() {
1889 let manager = TaskManager::new_for_testing();
1890 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1891
1892 let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
1893 Arc::new(std::sync::Mutex::new(Vec::new()));
1894 let sender_events = Arc::clone(&events);
1895 let sender: TaskNotificationSender = Arc::new(move |request| {
1896 if request.method == "notifications/tasks/status" {
1897 let params: TaskStatusNotificationParams = request
1898 .params
1899 .as_ref()
1900 .and_then(|v| serde_json::from_value(v.clone()).ok())
1901 .unwrap();
1902 sender_events.lock().unwrap().push(params);
1903 }
1904 });
1905 manager.set_notification_sender(sender);
1906
1907 let cx = Cx::for_testing();
1908 let id = manager.submit(&cx, "t", None).unwrap();
1909 manager.cancel(&id, Some("user abort".to_string())).unwrap();
1910
1911 let recorded = events.lock().unwrap().clone();
1912 let last = recorded.last().unwrap();
1914 assert_eq!(last.status, TaskStatus::Cancelled);
1915 assert_eq!(last.error, Some("user abort".to_string()));
1916 assert!(last.result.is_some());
1917 let result = last.result.as_ref().unwrap();
1918 assert!(!result.success);
1919 }
1920
1921 #[test]
1924 fn complete_task_sets_progress_to_one() {
1925 let manager = TaskManager::new_for_testing();
1926 let cx = Cx::for_testing();
1927 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1928 let id = manager.submit(&cx, "t", None).unwrap();
1929 manager.start_task(&id).unwrap();
1930 manager.update_progress(&id, 0.5, None);
1931 manager.complete_task(&id, serde_json::json!({}));
1932 let info = manager.get_info(&id).unwrap();
1933 assert_eq!(info.progress, Some(1.0));
1934 }
1935
1936 #[test]
1939 fn cleanup_completed_keeps_terminal_without_completed_at() {
1940 let manager = TaskManager::new_for_testing();
1941 let cx = Cx::for_testing();
1942 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1943 let id = manager.submit(&cx, "t", None).unwrap();
1944 manager.start_task(&id).unwrap();
1945 manager.complete_task(&id, serde_json::json!({}));
1946
1947 {
1949 let mut tasks = manager.tasks.write().unwrap();
1950 tasks.get_mut(&id).unwrap().info.completed_at = None;
1951 }
1952
1953 manager.cleanup_completed(std::time::Duration::from_secs(0));
1955 assert_eq!(manager.total_count(), 1);
1956 }
1957
1958 #[test]
1959 fn cleanup_completed_keeps_terminal_with_unparseable_timestamp() {
1960 let manager = TaskManager::new_for_testing();
1961 let cx = Cx::for_testing();
1962 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1963 let id = manager.submit(&cx, "t", None).unwrap();
1964 manager.start_task(&id).unwrap();
1965 manager.complete_task(&id, serde_json::json!({}));
1966
1967 {
1969 let mut tasks = manager.tasks.write().unwrap();
1970 tasks.get_mut(&id).unwrap().info.completed_at = Some("not-a-date".to_string());
1971 }
1972
1973 manager.cleanup_completed(std::time::Duration::from_secs(0));
1974 assert_eq!(manager.total_count(), 1);
1975 }
1976
1977 #[test]
1980 fn debug_output_with_tasks_and_handlers() {
1981 let manager = TaskManager::new_for_testing();
1982 manager.register_handler("type_a", |_cx, _params| async { Ok(serde_json::json!({})) });
1983 manager.register_handler("type_b", |_cx, _params| async { Ok(serde_json::json!({})) });
1984 let cx = Cx::for_testing();
1985 let _ = manager.submit(&cx, "type_a", None).unwrap();
1986 let _ = manager.submit(&cx, "type_b", None).unwrap();
1987
1988 let debug = format!("{:?}", manager);
1989 assert!(debug.contains("task_count: 2"));
1990 assert!(debug.contains("handler_count: 2"));
1991 }
1992
1993 #[test]
1996 fn multiple_handler_types_independent() {
1997 let manager = TaskManager::new_for_testing();
1998 let cx = Cx::for_testing();
1999 manager.register_handler("analyze", |_cx, _params| async {
2000 Ok(serde_json::json!({"type": "analyze"}))
2001 });
2002 manager.register_handler("summarize", |_cx, _params| async {
2003 Ok(serde_json::json!({"type": "summarize"}))
2004 });
2005
2006 let id_a = manager.submit(&cx, "analyze", None).unwrap();
2007 let id_s = manager.submit(&cx, "summarize", None).unwrap();
2008
2009 let info_a = manager.get_info(&id_a).unwrap();
2010 let info_s = manager.get_info(&id_s).unwrap();
2011 assert_eq!(info_a.task_type, "analyze");
2012 assert_eq!(info_s.task_type, "summarize");
2013 }
2014
2015 #[test]
2018 fn list_tasks_filter_failed() {
2019 let manager = TaskManager::new_for_testing();
2020 let cx = Cx::for_testing();
2021 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2022
2023 let id = manager.submit(&cx, "t", None).unwrap();
2024 manager.start_task(&id).unwrap();
2025 manager.fail_task(&id, "err");
2026
2027 assert_eq!(manager.list_tasks(Some(TaskStatus::Failed)).len(), 1);
2028 assert_eq!(manager.list_tasks(Some(TaskStatus::Completed)).len(), 0);
2029 }
2030
2031 #[test]
2032 fn list_tasks_filter_cancelled() {
2033 let manager = TaskManager::new_for_testing();
2034 let cx = Cx::for_testing();
2035 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2036
2037 let id = manager.submit(&cx, "t", None).unwrap();
2038 manager.cancel(&id, None).unwrap();
2039
2040 assert_eq!(manager.list_tasks(Some(TaskStatus::Cancelled)).len(), 1);
2041 assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 0);
2042 }
2043
2044 #[test]
2047 fn progress_notification_includes_message() {
2048 let manager = TaskManager::new_for_testing();
2049 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2050
2051 let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
2052 Arc::new(std::sync::Mutex::new(Vec::new()));
2053 let sender_events = Arc::clone(&events);
2054 let sender: TaskNotificationSender = Arc::new(move |request| {
2055 if request.method == "notifications/tasks/status" {
2056 let params: TaskStatusNotificationParams = request
2057 .params
2058 .as_ref()
2059 .and_then(|v| serde_json::from_value(v.clone()).ok())
2060 .unwrap();
2061 sender_events.lock().unwrap().push(params);
2062 }
2063 });
2064 manager.set_notification_sender(sender);
2065
2066 let cx = Cx::for_testing();
2067 let id = manager.submit(&cx, "t", None).unwrap();
2068 manager.start_task(&id).unwrap();
2069 manager.update_progress(&id, 0.75, Some("three quarters".to_string()));
2070
2071 let recorded = events.lock().unwrap().clone();
2072 let progress_event = recorded
2073 .iter()
2074 .find(|e| e.progress == Some(0.75))
2075 .expect("progress notification");
2076 assert_eq!(progress_event.message, Some("three quarters".to_string()));
2077 assert_eq!(progress_event.status, TaskStatus::Running);
2078 }
2079
2080 #[test]
2083 fn task_status_snapshot_includes_result() {
2084 let task_id = TaskId::from_string("snap-result");
2085 let state = TaskState {
2086 info: TaskInfo {
2087 id: task_id.clone(),
2088 task_type: "t".to_string(),
2089 status: TaskStatus::Completed,
2090 progress: Some(1.0),
2091 message: None,
2092 created_at: "now".to_string(),
2093 started_at: Some("now".to_string()),
2094 completed_at: Some("now".to_string()),
2095 error: None,
2096 },
2097 cancel_requested: false,
2098 result: Some(TaskResult {
2099 id: task_id,
2100 success: true,
2101 data: Some(serde_json::json!({"done": true})),
2102 error: None,
2103 }),
2104 cx: Cx::for_testing(),
2105 };
2106 let snapshot = TaskStatusSnapshot::from(&state);
2107 assert!(snapshot.result.is_some());
2108 let result = snapshot.result.unwrap();
2109 assert!(result.success);
2110 assert_eq!(result.data, Some(serde_json::json!({"done": true})));
2111 }
2112
2113 #[test]
2116 fn submit_unknown_task_type_error_message() {
2117 let manager = TaskManager::new_for_testing();
2118 let cx = Cx::for_testing();
2119 let err = manager.submit(&cx, "nonexistent_type", None).unwrap_err();
2120 assert!(err.message.contains("Unknown task type"));
2121 assert!(err.message.contains("nonexistent_type"));
2122 }
2123
2124 #[test]
2127 fn cancel_result_has_no_data() {
2128 let manager = TaskManager::new_for_testing();
2129 let cx = Cx::for_testing();
2130 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2131 let id = manager.submit(&cx, "t", None).unwrap();
2132 manager.start_task(&id).unwrap();
2133 manager.cancel(&id, Some("abort".to_string())).unwrap();
2134 let result = manager.get_result(&id).unwrap();
2135 assert!(!result.success);
2136 assert!(result.data.is_none());
2137 assert_eq!(result.error, Some("abort".to_string()));
2138 }
2139
2140 #[test]
2143 fn cancel_completed_task_returns_error() {
2144 let manager = TaskManager::new_for_testing();
2145 let cx = Cx::for_testing();
2146 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2147 let id = manager.submit(&cx, "t", None).unwrap();
2148 manager.start_task(&id).unwrap();
2149 manager.complete_task(&id, serde_json::json!({}));
2150 let err = manager.cancel(&id, None).unwrap_err();
2151 assert!(err.message.contains("terminal"));
2152 }
2153
2154 #[test]
2155 fn cancel_failed_task_returns_error() {
2156 let manager = TaskManager::new_for_testing();
2157 let cx = Cx::for_testing();
2158 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2159 let id = manager.submit(&cx, "t", None).unwrap();
2160 manager.start_task(&id).unwrap();
2161 manager.fail_task(&id, "broke");
2162 let err = manager.cancel(&id, None).unwrap_err();
2163 assert!(err.message.contains("terminal"));
2164 }
2165
2166 #[test]
2167 fn fail_task_on_pending_is_ignored() {
2168 let manager = TaskManager::new_for_testing();
2169 let cx = Cx::for_testing();
2170 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2171 let id = manager.submit(&cx, "t", None).unwrap();
2172 manager.fail_task(&id, "too early");
2174 let info = manager.get_info(&id).unwrap();
2175 assert_eq!(info.status, TaskStatus::Pending);
2176 assert!(manager.get_result(&id).is_none());
2177 }
2178
2179 #[test]
2180 fn complete_task_on_cancelled_is_ignored() {
2181 let manager = TaskManager::new_for_testing();
2182 let cx = Cx::for_testing();
2183 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2184 let id = manager.submit(&cx, "t", None).unwrap();
2185 manager.start_task(&id).unwrap();
2186 manager.cancel(&id, Some("aborted".to_string())).unwrap();
2187 manager.complete_task(&id, serde_json::json!({"late": true}));
2189 let info = manager.get_info(&id).unwrap();
2190 assert_eq!(info.status, TaskStatus::Cancelled);
2191 }
2192
2193 #[test]
2194 fn update_progress_none_message_clears_previous() {
2195 let manager = TaskManager::new_for_testing();
2196 let cx = Cx::for_testing();
2197 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2198 let id = manager.submit(&cx, "t", None).unwrap();
2199 manager.start_task(&id).unwrap();
2200 manager.update_progress(&id, 0.3, Some("step 1".to_string()));
2201 assert_eq!(
2202 manager.get_info(&id).unwrap().message,
2203 Some("step 1".to_string())
2204 );
2205 manager.update_progress(&id, 0.6, None);
2206 assert!(manager.get_info(&id).unwrap().message.is_none());
2207 }
2208
2209 #[test]
2210 fn no_notification_sender_does_not_panic() {
2211 let manager = TaskManager::new_for_testing();
2212 let cx = Cx::for_testing();
2213 manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2214 let id = manager.submit(&cx, "t", None).unwrap();
2216 manager.start_task(&id).unwrap();
2217 manager.update_progress(&id, 0.5, None);
2218 manager.complete_task(&id, serde_json::json!({}));
2219 assert_eq!(manager.get_info(&id).unwrap().status, TaskStatus::Completed);
2220 }
2221}