Skip to main content

fastmcp_server/
tasks.rs

1//! Background task manager (Docket/SEP-1686).
2//!
3//! Provides support for long-running background tasks that outlive individual
4//! request lifecycles. Tasks are managed in a dedicated region that survives
5//! until server shutdown.
6//!
7//! # Architecture
8//!
9//! ```text
10//! Server Region (root)
11//! ├── Session Region (per connection)
12//! │   └── Request Regions (tools/call, etc.)
13//! └── Background Task Region (managed by TaskManager)
14//!     ├── Task 1
15//!     ├── Task 2
16//!     └── ...
17//! ```
18//!
19//! # Usage
20//!
21//! ```ignore
22//! let task_manager = TaskManager::new();
23//!
24//! // Submit a background task
25//! let task_id = task_manager.submit(&cx, "long_analysis", Some(json!({"data": ...})))?;
26//!
27//! // Check status
28//! let info = task_manager.get_info(&task_id);
29//!
30//! // Cancel if needed
31//! task_manager.cancel(&task_id, Some("User requested"))?;
32//! ```
33
34use std::collections::HashMap;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::{Arc, RwLock};
37
38use asupersync::runtime::{RuntimeBuilder, RuntimeHandle};
39use asupersync::{Budget, CancelKind, Cx};
40use fastmcp_core::logging::{debug, info, targets, warn};
41use fastmcp_core::{McpError, McpResult};
42use fastmcp_protocol::{
43    JsonRpcRequest, TaskId, TaskInfo, TaskResult, TaskStatus, TaskStatusNotificationParams,
44};
45
46/// Notification sender used for task status updates.
47pub type TaskNotificationSender = Arc<dyn Fn(JsonRpcRequest) + Send + Sync>;
48
49/// Callback type for task execution.
50///
51/// Task handlers receive the context and parameters, and return a result.
52pub type TaskHandler = Box<dyn Fn(&Cx, serde_json::Value) -> TaskFuture + Send + Sync + 'static>;
53
54/// Future type for task execution.
55pub type TaskFuture = std::pin::Pin<
56    Box<dyn std::future::Future<Output = McpResult<serde_json::Value>> + Send + 'static>,
57>;
58
59/// Internal state for a running task.
60struct TaskState {
61    /// Task information.
62    info: TaskInfo,
63    /// Whether cancellation has been requested.
64    cancel_requested: bool,
65    /// Task result once completed.
66    result: Option<TaskResult>,
67    /// Task-scoped cancellation context.
68    cx: Cx,
69}
70
71fn can_transition(from: TaskStatus, to: TaskStatus) -> bool {
72    matches!(
73        (from, to),
74        (
75            TaskStatus::Pending,
76            TaskStatus::Running | TaskStatus::Failed | TaskStatus::Cancelled
77        ) | (
78            TaskStatus::Running,
79            TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled
80        )
81    )
82}
83
84fn transition_state(state: &mut TaskState, to: TaskStatus) -> bool {
85    let from = state.info.status;
86    if from == to {
87        return true;
88    }
89    if !can_transition(from, to) {
90        warn!(
91            target: targets::SERVER,
92            "task {} invalid transition {:?} -> {:?}",
93            state.info.id,
94            from,
95            to
96        );
97        return false;
98    }
99
100    state.info.status = to;
101    let now = chrono::Utc::now().to_rfc3339();
102    match to {
103        TaskStatus::Running => {
104            state.info.started_at = Some(now.clone());
105        }
106        TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled => {
107            state.info.completed_at = Some(now.clone());
108        }
109        TaskStatus::Pending => {}
110    }
111
112    info!(
113        target: targets::SERVER,
114        "task {} status {:?} -> {:?} at {}",
115        state.info.id,
116        from,
117        to,
118        now
119    );
120    true
121}
122
123fn mark_task_failed_snapshot(
124    tasks: &Arc<RwLock<HashMap<TaskId, TaskState>>>,
125    task_id: &TaskId,
126    error_msg: String,
127    lock_context: &'static str,
128) -> Option<TaskStatusSnapshot> {
129    let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
130        warn!(
131            target: targets::SERVER,
132            "tasks lock poisoned in {}, recovering",
133            lock_context
134        );
135        poisoned.into_inner()
136    });
137
138    let state = tasks_guard.get_mut(task_id)?;
139    if state.cancel_requested || !transition_state(state, TaskStatus::Failed) {
140        return None;
141    }
142
143    state.info.error = Some(error_msg.clone());
144    state.result = Some(TaskResult {
145        id: task_id.clone(),
146        success: false,
147        data: None,
148        error: Some(error_msg),
149    });
150    Some(TaskStatusSnapshot::from(state))
151}
152
153fn build_runtime_handle() -> Option<RuntimeHandle> {
154    match RuntimeBuilder::multi_thread().build() {
155        Ok(runtime) => Some(runtime.handle()),
156        Err(multi_err) => {
157            warn!(
158                target: targets::SERVER,
159                "failed to initialize multi-thread runtime for tasks: {}; attempting current-thread fallback",
160                multi_err
161            );
162            match RuntimeBuilder::current_thread().build() {
163                Ok(runtime) => Some(runtime.handle()),
164                Err(single_err) => {
165                    warn!(
166                        target: targets::SERVER,
167                        "failed to initialize current-thread runtime fallback for tasks: {}",
168                        single_err
169                    );
170                    None
171                }
172            }
173        }
174    }
175}
176
177/// Background task manager.
178///
179/// Manages the lifecycle of background tasks including submission, status
180/// tracking, and cancellation.
181pub struct TaskManager {
182    /// Active and completed tasks by ID.
183    tasks: Arc<RwLock<HashMap<TaskId, TaskState>>>,
184    /// Registered task handlers by type.
185    handlers: Arc<RwLock<HashMap<String, TaskHandler>>>,
186    /// Counter for generating unique task IDs.
187    task_counter: AtomicU64,
188    /// Whether task list changes should trigger notifications.
189    list_changed_notifications: bool,
190    /// Background runtime handle for executing tasks.
191    runtime: Option<RuntimeHandle>,
192    /// Whether submitted tasks should execute immediately.
193    auto_execute: bool,
194    /// Optional notification sender for task status updates.
195    notification_sender: Arc<RwLock<Option<TaskNotificationSender>>>,
196}
197
198impl TaskManager {
199    /// Creates a new task manager.
200    #[must_use]
201    pub fn new() -> Self {
202        let runtime = build_runtime_handle();
203        if runtime.is_none() {
204            warn!(
205                target: targets::SERVER,
206                "TaskManager runtime unavailable; auto-executed tasks will fail until runtime becomes available"
207            );
208        }
209        Self {
210            tasks: Arc::new(RwLock::new(HashMap::new())),
211            handlers: Arc::new(RwLock::new(HashMap::new())),
212            task_counter: AtomicU64::new(0),
213            list_changed_notifications: false,
214            runtime,
215            auto_execute: true,
216            notification_sender: Arc::new(RwLock::new(None)),
217        }
218    }
219
220    /// Creates a new task manager with list change notifications enabled.
221    #[must_use]
222    pub fn with_list_changed_notifications() -> Self {
223        Self {
224            list_changed_notifications: true,
225            ..Self::new()
226        }
227    }
228
229    /// Creates a task manager configured for deterministic tests.
230    ///
231    /// Tasks are not executed automatically; tests can drive state manually.
232    #[must_use]
233    pub fn new_for_testing() -> Self {
234        let mut manager = Self::new();
235        manager.auto_execute = false;
236        manager
237    }
238
239    /// Converts this manager into a shared handle.
240    #[must_use]
241    pub fn into_shared(self) -> SharedTaskManager {
242        Arc::new(self)
243    }
244
245    /// Returns whether list change notifications are enabled.
246    #[must_use]
247    pub fn has_list_changed_notifications(&self) -> bool {
248        self.list_changed_notifications
249    }
250
251    /// Sets the notification sender for task status updates.
252    pub fn set_notification_sender(&self, sender: TaskNotificationSender) {
253        let mut guard = self.notification_sender.write().unwrap_or_else(|poisoned| {
254            warn!(target: targets::SERVER, "notification sender lock poisoned, recovering");
255            poisoned.into_inner()
256        });
257        *guard = Some(sender);
258    }
259
260    /// Registers a task handler for a specific task type.
261    ///
262    /// The handler will be invoked when a task of this type is submitted.
263    pub fn register_handler<F, Fut>(&self, task_type: impl Into<String>, handler: F)
264    where
265        F: Fn(&Cx, serde_json::Value) -> Fut + Send + Sync + 'static,
266        Fut: std::future::Future<Output = McpResult<serde_json::Value>> + Send + 'static,
267    {
268        let task_type = task_type.into();
269        let boxed_handler: TaskHandler = Box::new(move |cx, params| Box::pin(handler(cx, params)));
270
271        let mut handlers = self.handlers.write().unwrap_or_else(|poisoned| {
272            warn!(target: targets::SERVER, "handlers lock poisoned, recovering");
273            poisoned.into_inner()
274        });
275        handlers.insert(task_type, boxed_handler);
276    }
277
278    /// Submits a new background task.
279    ///
280    /// Returns the task ID for tracking. The task runs asynchronously in the
281    /// background region.
282    pub fn submit(
283        &self,
284        _cx: &Cx,
285        task_type: impl Into<String>,
286        params: Option<serde_json::Value>,
287    ) -> McpResult<TaskId> {
288        let task_type = task_type.into();
289
290        // Check if handler exists
291        {
292            let handlers = self.handlers.read().unwrap_or_else(|poisoned| {
293                warn!(target: targets::SERVER, "handlers lock poisoned, recovering");
294                poisoned.into_inner()
295            });
296            if !handlers.contains_key(&task_type) {
297                return Err(McpError::invalid_params(format!(
298                    "Unknown task type: {task_type}"
299                )));
300            }
301        }
302
303        // Generate unique task ID
304        let counter = self.task_counter.fetch_add(1, Ordering::SeqCst);
305        let task_id = TaskId::from_string(format!("task-{counter:08x}"));
306
307        // Create task info
308        let now = chrono::Utc::now().to_rfc3339();
309        let task_cx = Cx::for_request_with_budget(Budget::INFINITE);
310        let info = TaskInfo {
311            id: task_id.clone(),
312            task_type: task_type.clone(),
313            status: TaskStatus::Pending,
314            progress: None,
315            message: None,
316            created_at: now,
317            started_at: None,
318            completed_at: None,
319            error: None,
320        };
321
322        let info_snapshot = info.clone();
323
324        // Store task state
325        let state = TaskState {
326            info,
327            cancel_requested: false,
328            result: None,
329            cx: task_cx.clone(),
330        };
331
332        {
333            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
334                warn!(target: targets::SERVER, "tasks lock poisoned, recovering");
335                poisoned.into_inner()
336            });
337            tasks.insert(task_id.clone(), state);
338        }
339
340        self.notify_status(info_snapshot, None);
341
342        if self.auto_execute {
343            let params = params.unwrap_or_else(|| serde_json::json!({}));
344            self.spawn_task(task_id.clone(), task_type, task_cx, params);
345        }
346
347        Ok(task_id)
348    }
349
350    #[allow(clippy::too_many_lines)]
351    fn spawn_task(
352        &self,
353        task_id: TaskId,
354        task_type: String,
355        task_cx: Cx,
356        params: serde_json::Value,
357    ) {
358        let Some(runtime) = self.runtime.clone() else {
359            let failure_snapshot = mark_task_failed_snapshot(
360                &self.tasks,
361                &task_id,
362                "Task runtime unavailable".to_string(),
363                "spawn_task runtime unavailable",
364            );
365            self.notify_snapshot(failure_snapshot);
366            return;
367        };
368
369        let tasks = Arc::clone(&self.tasks);
370        let handlers = Arc::clone(&self.handlers);
371        let notification_sender = Arc::clone(&self.notification_sender);
372        let scheduled_task_id = task_id.clone();
373        let scheduling = runtime.try_spawn(async move {
374            let running_snapshot = {
375                let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
376                    warn!(target: targets::SERVER, "tasks lock poisoned in spawn_task, recovering");
377                    poisoned.into_inner()
378                });
379                match tasks_guard.get_mut(&task_id) {
380                    Some(state) => {
381                        if state.cancel_requested || !transition_state(state, TaskStatus::Running) {
382                            None
383                        } else {
384                            Some(TaskStatusSnapshot::from(state))
385                        }
386                    }
387                    None => None,
388                }
389            };
390
391            let should_start = running_snapshot.is_some();
392            notify_snapshot(&notification_sender, running_snapshot);
393
394            if !should_start {
395                return;
396            }
397
398            let task_future = {
399                let handlers_guard = handlers.read().unwrap_or_else(|poisoned| {
400                    warn!(target: targets::SERVER, "handlers lock poisoned in spawn_task, recovering");
401                    poisoned.into_inner()
402                });
403                let Some(handler) = handlers_guard.get(&task_type) else {
404                    let failure_snapshot = mark_task_failed_snapshot(
405                        &tasks,
406                        &task_id,
407                        format!("Unknown task type: {task_type}"),
408                        "spawn_task failure",
409                    );
410                    notify_snapshot(&notification_sender, failure_snapshot);
411                    return;
412                };
413                (handler)(&task_cx, params)
414            };
415
416            let result = task_future.await;
417
418            let completion_snapshot = {
419                let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
420                    warn!(target: targets::SERVER, "tasks lock poisoned in spawn_task completion, recovering");
421                    poisoned.into_inner()
422                });
423                match tasks_guard.get_mut(&task_id) {
424                    Some(state) => {
425                        if state.cancel_requested {
426                            None
427                        } else {
428                            let mut snapshot = None;
429                            match result {
430                                Ok(data) => {
431                                    if transition_state(state, TaskStatus::Completed) {
432                                        state.info.progress = Some(1.0);
433                                        state.result = Some(TaskResult {
434                                            id: task_id.clone(),
435                                            success: true,
436                                            data: Some(data),
437                                            error: None,
438                                        });
439                                        snapshot = Some(TaskStatusSnapshot::from(state));
440                                    }
441                                }
442                                Err(err) => {
443                                    let error_msg = err.message;
444                                    if transition_state(state, TaskStatus::Failed) {
445                                        state.info.error = Some(error_msg.clone());
446                                        state.result = Some(TaskResult {
447                                            id: task_id.clone(),
448                                            success: false,
449                                            data: None,
450                                            error: Some(error_msg),
451                                        });
452                                        snapshot = Some(TaskStatusSnapshot::from(state));
453                                    }
454                                }
455                            }
456                            snapshot
457                        }
458                    }
459                    None => None,
460                }
461            };
462
463            notify_snapshot(&notification_sender, completion_snapshot);
464        });
465
466        if let Err(err) = scheduling {
467            warn!(
468                target: targets::SERVER,
469                "failed to schedule task {}: {}",
470                scheduled_task_id,
471                err
472            );
473            let failure_snapshot = mark_task_failed_snapshot(
474                &self.tasks,
475                &scheduled_task_id,
476                format!("Failed to schedule task: {err}"),
477                "spawn_task scheduling",
478            );
479            self.notify_snapshot(failure_snapshot);
480        }
481    }
482
483    /// Starts execution of a pending task.
484    ///
485    /// This is called internally to transition a task from Pending to Running.
486    pub fn start_task(&self, task_id: &TaskId) -> McpResult<()> {
487        let snapshot = {
488            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
489                warn!(target: targets::SERVER, "tasks lock poisoned in start_task, recovering");
490                poisoned.into_inner()
491            });
492            let state = tasks
493                .get_mut(task_id)
494                .ok_or_else(|| McpError::invalid_params(format!("Task not found: {task_id}")))?;
495
496            if state.info.status != TaskStatus::Pending {
497                return Err(McpError::invalid_params(format!(
498                    "Task {task_id} is not pending"
499                )));
500            }
501
502            if !transition_state(state, TaskStatus::Running) {
503                return Err(McpError::invalid_params(format!(
504                    "Task {task_id} cannot transition to running"
505                )));
506            }
507            Some(TaskStatusSnapshot::from(state))
508        };
509
510        self.notify_snapshot(snapshot);
511        Ok(())
512    }
513
514    /// Updates progress for a running task.
515    pub fn update_progress(&self, task_id: &TaskId, progress: f64, message: Option<String>) {
516        let snapshot = {
517            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
518                warn!(target: targets::SERVER, "tasks lock poisoned in update_progress, recovering");
519                poisoned.into_inner()
520            });
521            if let Some(state) = tasks.get_mut(task_id) {
522                if state.info.status != TaskStatus::Running {
523                    debug!(
524                        target: targets::SERVER,
525                        "task {} progress update ignored in state {:?}",
526                        task_id,
527                        state.info.status
528                    );
529                    return;
530                }
531                state.info.progress = Some(progress.clamp(0.0, 1.0));
532                state.info.message = message;
533                Some(TaskStatusSnapshot::from(state))
534            } else {
535                None
536            }
537        };
538
539        self.notify_snapshot(snapshot);
540    }
541
542    /// Completes a task with a successful result.
543    pub fn complete_task(&self, task_id: &TaskId, data: serde_json::Value) {
544        let snapshot = {
545            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
546                warn!(target: targets::SERVER, "tasks lock poisoned in complete_task, recovering");
547                poisoned.into_inner()
548            });
549            if let Some(state) = tasks.get_mut(task_id) {
550                if !transition_state(state, TaskStatus::Completed) {
551                    return;
552                }
553                state.info.progress = Some(1.0);
554                state.result = Some(TaskResult {
555                    id: task_id.clone(),
556                    success: true,
557                    data: Some(data),
558                    error: None,
559                });
560                Some(TaskStatusSnapshot::from(state))
561            } else {
562                None
563            }
564        };
565
566        self.notify_snapshot(snapshot);
567    }
568
569    /// Fails a task with an error.
570    pub fn fail_task(&self, task_id: &TaskId, error: impl Into<String>) {
571        let error = error.into();
572        let snapshot = {
573            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
574                warn!(target: targets::SERVER, "tasks lock poisoned in fail_task, recovering");
575                poisoned.into_inner()
576            });
577            if let Some(state) = tasks.get_mut(task_id) {
578                if !transition_state(state, TaskStatus::Failed) {
579                    return;
580                }
581                state.info.error = Some(error.clone());
582                state.result = Some(TaskResult {
583                    id: task_id.clone(),
584                    success: false,
585                    data: None,
586                    error: Some(error),
587                });
588                Some(TaskStatusSnapshot::from(state))
589            } else {
590                None
591            }
592        };
593
594        self.notify_snapshot(snapshot);
595    }
596
597    /// Gets information about a task.
598    #[must_use]
599    pub fn get_info(&self, task_id: &TaskId) -> Option<TaskInfo> {
600        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
601            warn!(target: targets::SERVER, "tasks lock poisoned in get_info, recovering");
602            poisoned.into_inner()
603        });
604        tasks.get(task_id).map(|s| s.info.clone())
605    }
606
607    /// Gets the result of a completed task.
608    #[must_use]
609    pub fn get_result(&self, task_id: &TaskId) -> Option<TaskResult> {
610        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
611            warn!(target: targets::SERVER, "tasks lock poisoned in get_result, recovering");
612            poisoned.into_inner()
613        });
614        tasks.get(task_id).and_then(|s| s.result.clone())
615    }
616
617    /// Lists all tasks, optionally filtered by status.
618    #[must_use]
619    pub fn list_tasks(&self, status_filter: Option<TaskStatus>) -> Vec<TaskInfo> {
620        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
621            warn!(target: targets::SERVER, "tasks lock poisoned in list_tasks, recovering");
622            poisoned.into_inner()
623        });
624        tasks
625            .values()
626            .filter(|s| status_filter.is_none_or(|f| s.info.status == f))
627            .map(|s| s.info.clone())
628            .collect()
629    }
630
631    /// Requests cancellation of a task.
632    ///
633    /// Returns true if the task exists and cancellation was requested.
634    /// The task may still be running until it checks for cancellation.
635    pub fn cancel(&self, task_id: &TaskId, reason: Option<String>) -> McpResult<TaskInfo> {
636        let snapshot = {
637            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
638                warn!(target: targets::SERVER, "tasks lock poisoned in cancel, recovering");
639                poisoned.into_inner()
640            });
641            let state = tasks
642                .get_mut(task_id)
643                .ok_or_else(|| McpError::invalid_params(format!("Task not found: {task_id}")))?;
644
645            // Can only cancel pending or running tasks
646            if state.info.status.is_terminal() {
647                return Err(McpError::invalid_params(format!(
648                    "Task {task_id} is already in terminal state: {:?}",
649                    state.info.status
650                )));
651            }
652
653            if !transition_state(state, TaskStatus::Cancelled) {
654                return Err(McpError::invalid_params(format!(
655                    "Task {task_id} cannot be cancelled from {:?}",
656                    state.info.status
657                )));
658            }
659
660            state.cancel_requested = true;
661
662            state.cx.cancel_with(CancelKind::User, None);
663            if !state.cx.is_cancel_requested() {
664                warn!(
665                    target: targets::SERVER,
666                    "task {} cancel signal not observed on context",
667                    task_id
668                );
669            }
670
671            let error_msg = reason.unwrap_or_else(|| "Cancelled by request".to_string());
672            state.info.error = Some(error_msg.clone());
673            state.result = Some(TaskResult {
674                id: task_id.clone(),
675                success: false,
676                data: None,
677                error: Some(error_msg),
678            });
679
680            let snapshot = TaskStatusSnapshot::from(state);
681            (snapshot, state.info.clone())
682        };
683
684        let (snapshot, info) = snapshot;
685        self.notify_snapshot(Some(snapshot));
686        Ok(info)
687    }
688
689    /// Checks if cancellation has been requested for a task.
690    #[must_use]
691    pub fn is_cancel_requested(&self, task_id: &TaskId) -> bool {
692        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
693            warn!(target: targets::SERVER, "tasks lock poisoned in is_cancel_requested, recovering");
694            poisoned.into_inner()
695        });
696        tasks.get(task_id).is_some_and(|s| s.cancel_requested)
697    }
698
699    /// Returns the number of active (non-terminal) tasks.
700    #[must_use]
701    pub fn active_count(&self) -> usize {
702        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
703            warn!(target: targets::SERVER, "tasks lock poisoned in active_count, recovering");
704            poisoned.into_inner()
705        });
706        tasks.values().filter(|s| s.info.status.is_active()).count()
707    }
708
709    /// Returns the total number of tasks.
710    #[must_use]
711    pub fn total_count(&self) -> usize {
712        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
713            warn!(target: targets::SERVER, "tasks lock poisoned in total_count, recovering");
714            poisoned.into_inner()
715        });
716        tasks.len()
717    }
718
719    /// Removes completed tasks older than the specified duration.
720    ///
721    /// This is useful for preventing unbounded memory growth from completed tasks.
722    pub fn cleanup_completed(&self, max_age: std::time::Duration) {
723        let cutoff = chrono::Utc::now() - chrono::Duration::from_std(max_age).unwrap_or_default();
724
725        let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
726            warn!(target: targets::SERVER, "tasks lock poisoned in cleanup_completed, recovering");
727            poisoned.into_inner()
728        });
729        tasks.retain(|_, state| {
730            // Keep active tasks
731            if state.info.status.is_active() {
732                return true;
733            }
734
735            // Keep recent completed tasks
736            if let Some(ref completed) = state.info.completed_at {
737                if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(completed) {
738                    return parsed.with_timezone(&chrono::Utc) > cutoff;
739                }
740                return true;
741            }
742
743            true
744        });
745    }
746
747    fn notify_snapshot(&self, snapshot: Option<TaskStatusSnapshot>) {
748        if let Some(snapshot) = snapshot {
749            self.notify_status(snapshot.info, snapshot.result);
750        }
751    }
752
753    fn notify_status(&self, info: TaskInfo, result: Option<TaskResult>) {
754        let sender = {
755            let guard = self.notification_sender.read().unwrap_or_else(|poisoned| {
756                warn!(target: targets::SERVER, "notification sender lock poisoned in notify_status, recovering");
757                poisoned.into_inner()
758            });
759            guard.clone()
760        };
761        let Some(sender) = sender else {
762            return;
763        };
764
765        let params = TaskStatusNotificationParams {
766            id: info.id.clone(),
767            status: info.status,
768            progress: info.progress,
769            message: info.message.clone(),
770            error: info.error.clone(),
771            result,
772        };
773        let payload = match serde_json::to_value(params) {
774            Ok(value) => value,
775            Err(err) => {
776                warn!(
777                    target: targets::SERVER,
778                    "failed to serialize task status notification: {}",
779                    err
780                );
781                return;
782            }
783        };
784        sender(JsonRpcRequest::notification(
785            "notifications/tasks/status",
786            Some(payload),
787        ));
788    }
789}
790
791#[derive(Debug, Clone)]
792struct TaskStatusSnapshot {
793    info: TaskInfo,
794    result: Option<TaskResult>,
795}
796
797impl TaskStatusSnapshot {
798    fn from(state: &TaskState) -> Self {
799        Self {
800            info: state.info.clone(),
801            result: state.result.clone(),
802        }
803    }
804}
805
806fn notify_snapshot(
807    sender: &Arc<RwLock<Option<TaskNotificationSender>>>,
808    snapshot: Option<TaskStatusSnapshot>,
809) {
810    let Some(snapshot) = snapshot else {
811        return;
812    };
813    let sender = {
814        let guard = sender.read().unwrap_or_else(|poisoned| {
815            warn!(target: targets::SERVER, "notification sender lock poisoned in notify_snapshot, recovering");
816            poisoned.into_inner()
817        });
818        guard.clone()
819    };
820    let Some(sender) = sender else {
821        return;
822    };
823    let params = TaskStatusNotificationParams {
824        id: snapshot.info.id.clone(),
825        status: snapshot.info.status,
826        progress: snapshot.info.progress,
827        message: snapshot.info.message.clone(),
828        error: snapshot.info.error.clone(),
829        result: snapshot.result,
830    };
831    let payload = match serde_json::to_value(params) {
832        Ok(value) => value,
833        Err(err) => {
834            warn!(
835                target: targets::SERVER,
836                "failed to serialize task status notification: {}",
837                err
838            );
839            return;
840        }
841    };
842    sender(JsonRpcRequest::notification(
843        "notifications/tasks/status",
844        Some(payload),
845    ));
846}
847
848impl Default for TaskManager {
849    fn default() -> Self {
850        Self::new()
851    }
852}
853
854impl std::fmt::Debug for TaskManager {
855    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
856        // Use poison recovery to avoid panic during Debug formatting
857        let task_count = self
858            .tasks
859            .read()
860            .map(|g| g.len())
861            .unwrap_or_else(|poisoned| poisoned.into_inner().len());
862        let handler_count = self
863            .handlers
864            .read()
865            .map(|g| g.len())
866            .unwrap_or_else(|poisoned| poisoned.into_inner().len());
867        f.debug_struct("TaskManager")
868            .field("task_count", &task_count)
869            .field("handler_count", &handler_count)
870            .field("task_counter", &self.task_counter.load(Ordering::SeqCst))
871            .field(
872                "list_changed_notifications",
873                &self.list_changed_notifications,
874            )
875            .field("auto_execute", &self.auto_execute)
876            .finish_non_exhaustive()
877    }
878}
879
880/// Thread-safe handle to a TaskManager.
881pub type SharedTaskManager = Arc<TaskManager>;
882
883#[cfg(test)]
884mod tests {
885    use super::*;
886    use std::sync::Arc;
887    use std::thread;
888    use std::time::Duration;
889
890    #[test]
891    fn test_task_manager_creation() {
892        let manager = TaskManager::new();
893        assert_eq!(manager.total_count(), 0);
894        assert_eq!(manager.active_count(), 0);
895        assert!(!manager.has_list_changed_notifications());
896    }
897
898    #[test]
899    fn test_task_manager_with_notifications() {
900        let manager = TaskManager::with_list_changed_notifications();
901        assert!(manager.has_list_changed_notifications());
902    }
903
904    #[test]
905    fn test_register_handler() {
906        let manager = TaskManager::new();
907
908        manager.register_handler("test_task", |_cx, _params| async {
909            Ok(serde_json::json!({}))
910        });
911
912        // Submit should succeed now
913        let cx = Cx::for_testing();
914        let result = manager.submit(&cx, "test_task", None);
915        assert!(result.is_ok());
916    }
917
918    #[test]
919    fn test_submit_auto_execute_fails_when_runtime_unavailable() {
920        let mut manager = TaskManager::new_for_testing();
921        manager.auto_execute = true;
922        manager.runtime = None;
923
924        manager.register_handler("test_task", |_cx, _params| async {
925            Ok(serde_json::json!({}))
926        });
927
928        let cx = Cx::for_testing();
929        let task_id = manager.submit(&cx, "test_task", None).unwrap();
930
931        let info = manager.get_info(&task_id).unwrap();
932        assert_eq!(info.status, TaskStatus::Failed);
933        assert_eq!(info.error.as_deref(), Some("Task runtime unavailable"));
934
935        let result = manager.get_result(&task_id).unwrap();
936        assert!(!result.success);
937        assert_eq!(result.error.as_deref(), Some("Task runtime unavailable"));
938    }
939
940    #[test]
941    fn test_submit_unknown_task_type() {
942        let manager = TaskManager::new();
943        let cx = Cx::for_testing();
944
945        let result = manager.submit(&cx, "unknown_task", None);
946        assert!(result.is_err());
947    }
948
949    #[test]
950    fn test_task_lifecycle() {
951        let manager = TaskManager::new_for_testing();
952        let cx = Cx::for_testing();
953
954        manager.register_handler("test", |_cx, _params| async {
955            Ok(serde_json::json!({"done": true}))
956        });
957
958        // Submit
959        let task_id = manager.submit(&cx, "test", None).unwrap();
960
961        // Check initial state
962        let info = manager.get_info(&task_id).unwrap();
963        assert_eq!(info.status, TaskStatus::Pending);
964        assert!(info.started_at.is_none());
965
966        // Start
967        manager.start_task(&task_id).unwrap();
968        let info = manager.get_info(&task_id).unwrap();
969        assert_eq!(info.status, TaskStatus::Running);
970        assert!(info.started_at.is_some());
971
972        // Update progress
973        manager.update_progress(&task_id, 0.5, Some("Halfway done".into()));
974        let info = manager.get_info(&task_id).unwrap();
975        assert_eq!(info.progress, Some(0.5));
976        assert_eq!(info.message, Some("Halfway done".into()));
977
978        // Complete
979        manager.complete_task(&task_id, serde_json::json!({"result": 42}));
980        let info = manager.get_info(&task_id).unwrap();
981        assert_eq!(info.status, TaskStatus::Completed);
982        assert!(info.completed_at.is_some());
983
984        // Check result
985        let result = manager.get_result(&task_id).unwrap();
986        assert!(result.success);
987        assert_eq!(result.data, Some(serde_json::json!({"result": 42})));
988    }
989
990    #[test]
991    fn test_task_failure() {
992        let manager = TaskManager::new_for_testing();
993        let cx = Cx::for_testing();
994
995        manager.register_handler("fail_test", |_cx, _params| async {
996            Ok(serde_json::json!({}))
997        });
998
999        let task_id = manager.submit(&cx, "fail_test", None).unwrap();
1000        manager.start_task(&task_id).unwrap();
1001        manager.fail_task(&task_id, "Something went wrong");
1002
1003        let info = manager.get_info(&task_id).unwrap();
1004        assert_eq!(info.status, TaskStatus::Failed);
1005        assert_eq!(info.error, Some("Something went wrong".into()));
1006
1007        let result = manager.get_result(&task_id).unwrap();
1008        assert!(!result.success);
1009        assert_eq!(result.error, Some("Something went wrong".into()));
1010    }
1011
1012    #[test]
1013    fn test_task_cancellation() {
1014        let manager = TaskManager::new_for_testing();
1015        let cx = Cx::for_testing();
1016
1017        manager.register_handler("cancel_test", |_cx, _params| async {
1018            Ok(serde_json::json!({}))
1019        });
1020
1021        let task_id = manager.submit(&cx, "cancel_test", None).unwrap();
1022        manager.start_task(&task_id).unwrap();
1023
1024        // Cancel
1025        let info = manager
1026            .cancel(&task_id, Some("User cancelled".into()))
1027            .unwrap();
1028        assert_eq!(info.status, TaskStatus::Cancelled);
1029
1030        // Check cancel flag
1031        assert!(manager.is_cancel_requested(&task_id));
1032
1033        // Cannot cancel again
1034        let result = manager.cancel(&task_id, None);
1035        assert!(result.is_err());
1036    }
1037
1038    #[test]
1039    fn test_list_tasks() {
1040        let manager = TaskManager::new_for_testing();
1041        let cx = Cx::for_testing();
1042
1043        manager.register_handler("list_test", |_cx, _params| async {
1044            Ok(serde_json::json!({}))
1045        });
1046
1047        let task1 = manager.submit(&cx, "list_test", None).unwrap();
1048        let task2 = manager.submit(&cx, "list_test", None).unwrap();
1049        let _task3 = manager.submit(&cx, "list_test", None).unwrap();
1050
1051        // All pending initially
1052        assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 3);
1053        assert_eq!(manager.list_tasks(Some(TaskStatus::Running)).len(), 0);
1054
1055        // Start one
1056        manager.start_task(&task1).unwrap();
1057        assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 2);
1058        assert_eq!(manager.list_tasks(Some(TaskStatus::Running)).len(), 1);
1059
1060        // Complete one
1061        manager.start_task(&task2).unwrap();
1062        manager.complete_task(&task2, serde_json::json!({}));
1063        assert_eq!(manager.list_tasks(Some(TaskStatus::Completed)).len(), 1);
1064
1065        // All tasks
1066        assert_eq!(manager.list_tasks(None).len(), 3);
1067    }
1068
1069    #[test]
1070    fn test_active_count() {
1071        let manager = TaskManager::new_for_testing();
1072        let cx = Cx::for_testing();
1073
1074        manager.register_handler("count_test", |_cx, _params| async {
1075            Ok(serde_json::json!({}))
1076        });
1077
1078        let task1 = manager.submit(&cx, "count_test", None).unwrap();
1079        let task2 = manager.submit(&cx, "count_test", None).unwrap();
1080
1081        assert_eq!(manager.active_count(), 2);
1082        assert_eq!(manager.total_count(), 2);
1083
1084        manager.start_task(&task1).unwrap();
1085        assert_eq!(manager.active_count(), 2);
1086
1087        manager.complete_task(&task1, serde_json::json!({}));
1088        assert_eq!(manager.active_count(), 1);
1089
1090        manager.cancel(&task2, None).unwrap();
1091        assert_eq!(manager.active_count(), 0);
1092        assert_eq!(manager.total_count(), 2);
1093    }
1094
1095    #[test]
1096    fn test_progress_clamping() {
1097        let manager = TaskManager::new_for_testing();
1098        let cx = Cx::for_testing();
1099
1100        manager.register_handler("clamp_test", |_cx, _params| async {
1101            Ok(serde_json::json!({}))
1102        });
1103
1104        let task_id = manager.submit(&cx, "clamp_test", None).unwrap();
1105        manager.start_task(&task_id).unwrap();
1106
1107        // Progress should be clamped to [0.0, 1.0]
1108        manager.update_progress(&task_id, -0.5, None);
1109        assert_eq!(manager.get_info(&task_id).unwrap().progress, Some(0.0));
1110
1111        manager.update_progress(&task_id, 1.5, None);
1112        assert_eq!(manager.get_info(&task_id).unwrap().progress, Some(1.0));
1113
1114        manager.update_progress(&task_id, 0.75, None);
1115        assert_eq!(manager.get_info(&task_id).unwrap().progress, Some(0.75));
1116    }
1117
1118    #[test]
1119    fn test_invalid_transition_rejected() {
1120        let manager = TaskManager::new_for_testing();
1121        let cx = Cx::for_testing();
1122
1123        manager.register_handler("transition_test", |_cx, _params| async {
1124            Ok(serde_json::json!({}))
1125        });
1126
1127        let task_id = manager.submit(&cx, "transition_test", None).unwrap();
1128
1129        // Completing before running should be ignored.
1130        manager.complete_task(&task_id, serde_json::json!({"result": "noop"}));
1131        let info = manager.get_info(&task_id).unwrap();
1132        assert_eq!(info.status, TaskStatus::Pending);
1133
1134        manager.start_task(&task_id).unwrap();
1135        manager.complete_task(&task_id, serde_json::json!({"result": "ok"}));
1136        let info = manager.get_info(&task_id).unwrap();
1137        assert_eq!(info.status, TaskStatus::Completed);
1138
1139        // Starting after completion should fail.
1140        let result = manager.start_task(&task_id);
1141        assert!(result.is_err());
1142    }
1143
1144    #[test]
1145    fn test_concurrent_submissions() {
1146        let manager = Arc::new(TaskManager::new_for_testing());
1147        manager.register_handler("concurrent_test", |_cx, _params| async {
1148            Ok(serde_json::json!({}))
1149        });
1150
1151        let mut handles = Vec::new();
1152        for _ in 0..4 {
1153            let manager = Arc::clone(&manager);
1154            handles.push(thread::spawn(move || {
1155                let cx = Cx::for_testing();
1156                for _ in 0..10 {
1157                    let _ = manager.submit(&cx, "concurrent_test", None).unwrap();
1158                }
1159            }));
1160        }
1161
1162        for handle in handles {
1163            handle.join().expect("thread join failed");
1164        }
1165
1166        assert_eq!(manager.total_count(), 40);
1167        assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 40);
1168    }
1169
1170    #[test]
1171    fn test_task_status_notifications() {
1172        let manager = TaskManager::new_for_testing();
1173        manager.register_handler("notify_test", |_cx, _params| async {
1174            Ok(serde_json::json!({"ok": true}))
1175        });
1176
1177        let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
1178            Arc::new(std::sync::Mutex::new(Vec::new()));
1179        let sender_events = Arc::clone(&events);
1180        let sender: TaskNotificationSender = Arc::new(move |request| {
1181            if request.method != "notifications/tasks/status" {
1182                return;
1183            }
1184            let params = request
1185                .params
1186                .as_ref()
1187                .and_then(|value| serde_json::from_value(value.clone()).ok())
1188                .expect("task status params");
1189            sender_events
1190                .lock()
1191                .expect("events lock poisoned")
1192                .push(params);
1193        });
1194        manager.set_notification_sender(sender);
1195
1196        let cx = Cx::for_testing();
1197        let task_id = manager.submit(&cx, "notify_test", None).unwrap();
1198        manager.start_task(&task_id).unwrap();
1199        manager.update_progress(&task_id, 0.5, Some("half".to_string()));
1200        manager.complete_task(&task_id, serde_json::json!({"result": 1}));
1201
1202        let recorded = events.lock().expect("events lock poisoned").clone();
1203        assert!(!recorded.is_empty(), "expected task status notifications");
1204        assert_eq!(recorded[0].id, task_id);
1205        assert_eq!(recorded[0].status, TaskStatus::Pending);
1206        assert_eq!(recorded[1].status, TaskStatus::Running);
1207        assert_eq!(recorded[2].progress, Some(0.5));
1208        assert_eq!(recorded.last().expect("last").status, TaskStatus::Completed);
1209    }
1210
1211    // ── can_transition ─────────────────────────────────────────────────
1212
1213    #[test]
1214    fn can_transition_valid_pairs() {
1215        assert!(can_transition(TaskStatus::Pending, TaskStatus::Running));
1216        assert!(can_transition(TaskStatus::Pending, TaskStatus::Failed));
1217        assert!(can_transition(TaskStatus::Pending, TaskStatus::Cancelled));
1218        assert!(can_transition(TaskStatus::Running, TaskStatus::Completed));
1219        assert!(can_transition(TaskStatus::Running, TaskStatus::Failed));
1220        assert!(can_transition(TaskStatus::Running, TaskStatus::Cancelled));
1221    }
1222
1223    #[test]
1224    fn can_transition_invalid_pairs() {
1225        assert!(!can_transition(TaskStatus::Pending, TaskStatus::Completed));
1226        assert!(!can_transition(TaskStatus::Completed, TaskStatus::Running));
1227        assert!(!can_transition(TaskStatus::Completed, TaskStatus::Pending));
1228        assert!(!can_transition(
1229            TaskStatus::Completed,
1230            TaskStatus::Cancelled
1231        ));
1232        assert!(!can_transition(TaskStatus::Failed, TaskStatus::Running));
1233        assert!(!can_transition(TaskStatus::Cancelled, TaskStatus::Running));
1234    }
1235
1236    // ── Default / Debug / into_shared ──────────────────────────────────
1237
1238    #[test]
1239    fn default_creates_empty_manager() {
1240        let manager = TaskManager::default();
1241        assert_eq!(manager.total_count(), 0);
1242        assert!(!manager.has_list_changed_notifications());
1243    }
1244
1245    #[test]
1246    fn new_for_testing_disables_auto_execute() {
1247        let manager = TaskManager::new_for_testing();
1248        assert!(!manager.auto_execute);
1249    }
1250
1251    #[test]
1252    fn into_shared_returns_arc() {
1253        let manager = TaskManager::new_for_testing();
1254        let shared: SharedTaskManager = manager.into_shared();
1255        assert_eq!(shared.total_count(), 0);
1256    }
1257
1258    #[test]
1259    fn debug_output_contains_fields() {
1260        let manager = TaskManager::new_for_testing();
1261        let debug = format!("{:?}", manager);
1262        assert!(debug.contains("TaskManager"));
1263        assert!(debug.contains("task_count"));
1264        assert!(debug.contains("handler_count"));
1265        assert!(debug.contains("task_counter"));
1266        assert!(debug.contains("list_changed_notifications"));
1267        assert!(debug.contains("auto_execute"));
1268    }
1269
1270    // ── get_info / get_result for nonexistent tasks ────────────────────
1271
1272    #[test]
1273    fn get_info_nonexistent_returns_none() {
1274        let manager = TaskManager::new_for_testing();
1275        let fake_id = TaskId::from_string("nonexistent".to_string());
1276        assert!(manager.get_info(&fake_id).is_none());
1277    }
1278
1279    #[test]
1280    fn get_result_nonexistent_returns_none() {
1281        let manager = TaskManager::new_for_testing();
1282        let fake_id = TaskId::from_string("nonexistent".to_string());
1283        assert!(manager.get_result(&fake_id).is_none());
1284    }
1285
1286    #[test]
1287    fn get_result_pending_task_returns_none() {
1288        let manager = TaskManager::new_for_testing();
1289        let cx = Cx::for_testing();
1290        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1291        let id = manager.submit(&cx, "t", None).unwrap();
1292        assert!(manager.get_result(&id).is_none());
1293    }
1294
1295    // ── is_cancel_requested edge cases ─────────────────────────────────
1296
1297    #[test]
1298    fn is_cancel_requested_nonexistent_returns_false() {
1299        let manager = TaskManager::new_for_testing();
1300        let fake_id = TaskId::from_string("nonexistent".to_string());
1301        assert!(!manager.is_cancel_requested(&fake_id));
1302    }
1303
1304    #[test]
1305    fn is_cancel_requested_before_cancel_returns_false() {
1306        let manager = TaskManager::new_for_testing();
1307        let cx = Cx::for_testing();
1308        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1309        let id = manager.submit(&cx, "t", None).unwrap();
1310        assert!(!manager.is_cancel_requested(&id));
1311    }
1312
1313    // ── update_progress edge cases ─────────────────────────────────────
1314
1315    #[test]
1316    fn update_progress_on_pending_task_is_ignored() {
1317        let manager = TaskManager::new_for_testing();
1318        let cx = Cx::for_testing();
1319        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1320        let id = manager.submit(&cx, "t", None).unwrap();
1321        // Task is pending, progress update should be ignored
1322        manager.update_progress(&id, 0.5, Some("test".to_string()));
1323        let info = manager.get_info(&id).unwrap();
1324        assert!(info.progress.is_none());
1325    }
1326
1327    #[test]
1328    fn update_progress_on_completed_task_is_ignored() {
1329        let manager = TaskManager::new_for_testing();
1330        let cx = Cx::for_testing();
1331        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1332        let id = manager.submit(&cx, "t", None).unwrap();
1333        manager.start_task(&id).unwrap();
1334        manager.complete_task(&id, serde_json::json!({}));
1335        // Task is completed, progress update should be ignored
1336        manager.update_progress(&id, 0.1, None);
1337        let info = manager.get_info(&id).unwrap();
1338        assert_eq!(info.progress, Some(1.0)); // unchanged from completion
1339    }
1340
1341    // ── complete_task / fail_task on nonexistent ────────────────────────
1342
1343    #[test]
1344    fn complete_task_nonexistent_does_not_panic() {
1345        let manager = TaskManager::new_for_testing();
1346        let fake_id = TaskId::from_string("nonexistent".to_string());
1347        manager.complete_task(&fake_id, serde_json::json!({})); // should not panic
1348    }
1349
1350    #[test]
1351    fn fail_task_nonexistent_does_not_panic() {
1352        let manager = TaskManager::new_for_testing();
1353        let fake_id = TaskId::from_string("nonexistent".to_string());
1354        manager.fail_task(&fake_id, "error"); // should not panic
1355    }
1356
1357    // ── cancel edge cases ──────────────────────────────────────────────
1358
1359    #[test]
1360    fn cancel_nonexistent_task_returns_error() {
1361        let manager = TaskManager::new_for_testing();
1362        let fake_id = TaskId::from_string("nonexistent".to_string());
1363        let err = manager.cancel(&fake_id, None).unwrap_err();
1364        assert!(err.message.contains("not found"));
1365    }
1366
1367    #[test]
1368    fn cancel_pending_task_directly() {
1369        let manager = TaskManager::new_for_testing();
1370        let cx = Cx::for_testing();
1371        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1372        let id = manager.submit(&cx, "t", None).unwrap();
1373        // Cancel from Pending (valid: Pending -> Cancelled)
1374        let info = manager.cancel(&id, None).unwrap();
1375        assert_eq!(info.status, TaskStatus::Cancelled);
1376        assert!(manager.is_cancel_requested(&id));
1377    }
1378
1379    #[test]
1380    fn cancel_with_default_reason() {
1381        let manager = TaskManager::new_for_testing();
1382        let cx = Cx::for_testing();
1383        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1384        let id = manager.submit(&cx, "t", None).unwrap();
1385        let info = manager.cancel(&id, None).unwrap();
1386        assert_eq!(info.error, Some("Cancelled by request".to_string()));
1387    }
1388
1389    // ── task ID sequencing ─────────────────────────────────────────────
1390
1391    #[test]
1392    fn task_ids_are_sequential() {
1393        let manager = TaskManager::new_for_testing();
1394        let cx = Cx::for_testing();
1395        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1396        let id1 = manager.submit(&cx, "t", None).unwrap();
1397        let id2 = manager.submit(&cx, "t", None).unwrap();
1398        assert_ne!(id1, id2);
1399        assert!(id1.0.starts_with("task-"));
1400        assert!(id2.0.starts_with("task-"));
1401    }
1402
1403    // ── start_task edge cases ──────────────────────────────────────────
1404
1405    #[test]
1406    fn start_task_nonexistent_returns_error() {
1407        let manager = TaskManager::new_for_testing();
1408        let fake_id = TaskId::from_string("nonexistent".to_string());
1409        let err = manager.start_task(&fake_id).unwrap_err();
1410        assert!(err.message.contains("not found"));
1411    }
1412
1413    #[test]
1414    fn start_task_already_running_returns_error() {
1415        let manager = TaskManager::new_for_testing();
1416        let cx = Cx::for_testing();
1417        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1418        let id = manager.submit(&cx, "t", None).unwrap();
1419        manager.start_task(&id).unwrap();
1420        let err = manager.start_task(&id).unwrap_err();
1421        assert!(err.message.contains("not pending"));
1422    }
1423
1424    // ── cleanup_completed ──────────────────────────────────────────────
1425
1426    #[test]
1427    fn cleanup_completed_removes_old_terminal_tasks() {
1428        let manager = TaskManager::new_for_testing();
1429        let cx = Cx::for_testing();
1430        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1431
1432        let id = manager.submit(&cx, "t", None).unwrap();
1433        manager.start_task(&id).unwrap();
1434        manager.complete_task(&id, serde_json::json!({}));
1435        assert_eq!(manager.total_count(), 1);
1436
1437        // Cleanup with 0 duration removes all completed tasks
1438        manager.cleanup_completed(std::time::Duration::from_secs(0));
1439        assert_eq!(manager.total_count(), 0);
1440    }
1441
1442    #[test]
1443    fn cleanup_completed_keeps_active_tasks() {
1444        let manager = TaskManager::new_for_testing();
1445        let cx = Cx::for_testing();
1446        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1447
1448        let id1 = manager.submit(&cx, "t", None).unwrap();
1449        let id2 = manager.submit(&cx, "t", None).unwrap();
1450        manager.start_task(&id1).unwrap();
1451        manager.complete_task(&id1, serde_json::json!({}));
1452        // id2 is still pending (active)
1453
1454        manager.cleanup_completed(std::time::Duration::from_secs(0));
1455        assert_eq!(manager.total_count(), 1); // only id2 remains
1456        assert!(manager.get_info(&id2).is_some());
1457    }
1458
1459    #[test]
1460    fn cleanup_completed_keeps_recent_tasks() {
1461        let manager = TaskManager::new_for_testing();
1462        let cx = Cx::for_testing();
1463        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1464
1465        let id = manager.submit(&cx, "t", None).unwrap();
1466        manager.start_task(&id).unwrap();
1467        manager.complete_task(&id, serde_json::json!({}));
1468
1469        // Cleanup with large duration keeps recently completed
1470        manager.cleanup_completed(std::time::Duration::from_secs(3600));
1471        assert_eq!(manager.total_count(), 1);
1472    }
1473
1474    // ── identity transition ────────────────────────────────────────────
1475
1476    #[test]
1477    fn transition_same_state_returns_true() {
1478        // Create a minimal TaskState to test transition_state
1479        let task_id = TaskId::from_string("test".to_string());
1480        let mut state = TaskState {
1481            info: TaskInfo {
1482                id: task_id,
1483                task_type: "t".to_string(),
1484                status: TaskStatus::Running,
1485                progress: None,
1486                message: None,
1487                created_at: String::new(),
1488                started_at: None,
1489                completed_at: None,
1490                error: None,
1491            },
1492            cancel_requested: false,
1493            result: None,
1494            cx: Cx::for_testing(),
1495        };
1496        // Same state transition returns true
1497        assert!(transition_state(&mut state, TaskStatus::Running));
1498    }
1499
1500    // ── submit with params ─────────────────────────────────────────────
1501
1502    #[test]
1503    fn submit_with_none_params_creates_task() {
1504        let manager = TaskManager::new_for_testing();
1505        let cx = Cx::for_testing();
1506        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1507        let id = manager.submit(&cx, "t", None).unwrap();
1508        let info = manager.get_info(&id).unwrap();
1509        assert_eq!(info.task_type, "t");
1510        assert_eq!(info.status, TaskStatus::Pending);
1511        assert!(info.started_at.is_none());
1512        assert!(info.completed_at.is_none());
1513        assert!(info.error.is_none());
1514    }
1515
1516    #[test]
1517    fn submit_with_some_params_creates_task() {
1518        let manager = TaskManager::new_for_testing();
1519        let cx = Cx::for_testing();
1520        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1521        let id = manager
1522            .submit(&cx, "t", Some(serde_json::json!({"key": "value"})))
1523            .unwrap();
1524        assert!(manager.get_info(&id).is_some());
1525    }
1526
1527    // ── fail_task sets result ──────────────────────────────────────────
1528
1529    #[test]
1530    fn fail_task_sets_error_result() {
1531        let manager = TaskManager::new_for_testing();
1532        let cx = Cx::for_testing();
1533        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1534        let id = manager.submit(&cx, "t", None).unwrap();
1535        manager.start_task(&id).unwrap();
1536        manager.fail_task(&id, "boom");
1537        let result = manager.get_result(&id).unwrap();
1538        assert!(!result.success);
1539        assert_eq!(result.error, Some("boom".to_string()));
1540        assert!(result.data.is_none());
1541    }
1542
1543    // ── update_progress on nonexistent task ──────────────────────────────
1544
1545    #[test]
1546    fn update_progress_nonexistent_does_not_panic() {
1547        let manager = TaskManager::new_for_testing();
1548        let fake_id = TaskId::from_string("nonexistent".to_string());
1549        manager.update_progress(&fake_id, 0.5, None); // should not panic
1550    }
1551
1552    // ── fail_task on already-terminal task ───────────────────────────────
1553
1554    #[test]
1555    fn fail_task_on_completed_is_ignored() {
1556        let manager = TaskManager::new_for_testing();
1557        let cx = Cx::for_testing();
1558        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1559        let id = manager.submit(&cx, "t", None).unwrap();
1560        manager.start_task(&id).unwrap();
1561        manager.complete_task(&id, serde_json::json!({"done": true}));
1562        // Attempt to fail a completed task - should be ignored
1563        manager.fail_task(&id, "too late");
1564        let info = manager.get_info(&id).unwrap();
1565        assert_eq!(info.status, TaskStatus::Completed);
1566        let result = manager.get_result(&id).unwrap();
1567        assert!(result.success);
1568    }
1569
1570    // ── complete_task on already-terminal task ───────────────────────────
1571
1572    #[test]
1573    fn complete_task_on_failed_is_ignored() {
1574        let manager = TaskManager::new_for_testing();
1575        let cx = Cx::for_testing();
1576        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1577        let id = manager.submit(&cx, "t", None).unwrap();
1578        manager.start_task(&id).unwrap();
1579        manager.fail_task(&id, "something broke");
1580        // Attempt to complete a failed task - should be ignored
1581        manager.complete_task(&id, serde_json::json!({"late": true}));
1582        let info = manager.get_info(&id).unwrap();
1583        assert_eq!(info.status, TaskStatus::Failed);
1584        let result = manager.get_result(&id).unwrap();
1585        assert!(!result.success);
1586    }
1587
1588    // ── register_handler replaces existing handler ──────────────────────
1589
1590    #[test]
1591    fn register_handler_replaces_existing() {
1592        let manager = TaskManager::new_for_testing();
1593        manager.register_handler("t", |_cx, _params| async {
1594            Ok(serde_json::json!({"v": 1}))
1595        });
1596        manager.register_handler("t", |_cx, _params| async {
1597            Ok(serde_json::json!({"v": 2}))
1598        });
1599        // Should succeed with the new handler
1600        let cx = Cx::for_testing();
1601        let id = manager.submit(&cx, "t", None).unwrap();
1602        assert!(manager.get_info(&id).is_some());
1603    }
1604
1605    // ── transition_state timestamps ─────────────────────────────────────
1606
1607    #[test]
1608    fn transition_to_running_sets_started_at() {
1609        let task_id = TaskId::from_string("ts-test".to_string());
1610        let mut state = TaskState {
1611            info: TaskInfo {
1612                id: task_id,
1613                task_type: "t".to_string(),
1614                status: TaskStatus::Pending,
1615                progress: None,
1616                message: None,
1617                created_at: String::new(),
1618                started_at: None,
1619                completed_at: None,
1620                error: None,
1621            },
1622            cancel_requested: false,
1623            result: None,
1624            cx: Cx::for_testing(),
1625        };
1626        assert!(state.info.started_at.is_none());
1627        assert!(transition_state(&mut state, TaskStatus::Running));
1628        assert!(state.info.started_at.is_some());
1629    }
1630
1631    #[test]
1632    fn transition_to_completed_sets_completed_at() {
1633        let task_id = TaskId::from_string("ts-test".to_string());
1634        let mut state = TaskState {
1635            info: TaskInfo {
1636                id: task_id,
1637                task_type: "t".to_string(),
1638                status: TaskStatus::Running,
1639                progress: None,
1640                message: None,
1641                created_at: String::new(),
1642                started_at: Some("earlier".to_string()),
1643                completed_at: None,
1644                error: None,
1645            },
1646            cancel_requested: false,
1647            result: None,
1648            cx: Cx::for_testing(),
1649        };
1650        assert!(state.info.completed_at.is_none());
1651        assert!(transition_state(&mut state, TaskStatus::Completed));
1652        assert!(state.info.completed_at.is_some());
1653    }
1654
1655    #[test]
1656    fn transition_to_failed_sets_completed_at() {
1657        let task_id = TaskId::from_string("ts-test".to_string());
1658        let mut state = TaskState {
1659            info: TaskInfo {
1660                id: task_id,
1661                task_type: "t".to_string(),
1662                status: TaskStatus::Running,
1663                progress: None,
1664                message: None,
1665                created_at: String::new(),
1666                started_at: Some("earlier".to_string()),
1667                completed_at: None,
1668                error: None,
1669            },
1670            cancel_requested: false,
1671            result: None,
1672            cx: Cx::for_testing(),
1673        };
1674        assert!(transition_state(&mut state, TaskStatus::Failed));
1675        assert!(state.info.completed_at.is_some());
1676    }
1677
1678    #[test]
1679    fn transition_to_cancelled_sets_completed_at() {
1680        let task_id = TaskId::from_string("ts-test".to_string());
1681        let mut state = TaskState {
1682            info: TaskInfo {
1683                id: task_id,
1684                task_type: "t".to_string(),
1685                status: TaskStatus::Running,
1686                progress: None,
1687                message: None,
1688                created_at: String::new(),
1689                started_at: Some("earlier".to_string()),
1690                completed_at: None,
1691                error: None,
1692            },
1693            cancel_requested: false,
1694            result: None,
1695            cx: Cx::for_testing(),
1696        };
1697        assert!(transition_state(&mut state, TaskStatus::Cancelled));
1698        assert!(state.info.completed_at.is_some());
1699    }
1700
1701    #[test]
1702    fn transition_invalid_returns_false() {
1703        let task_id = TaskId::from_string("ts-test".to_string());
1704        let mut state = TaskState {
1705            info: TaskInfo {
1706                id: task_id,
1707                task_type: "t".to_string(),
1708                status: TaskStatus::Pending,
1709                progress: None,
1710                message: None,
1711                created_at: String::new(),
1712                started_at: None,
1713                completed_at: None,
1714                error: None,
1715            },
1716            cancel_requested: false,
1717            result: None,
1718            cx: Cx::for_testing(),
1719        };
1720        // Pending -> Completed is invalid
1721        assert!(!transition_state(&mut state, TaskStatus::Completed));
1722        // State should remain Pending
1723        assert_eq!(state.info.status, TaskStatus::Pending);
1724    }
1725
1726    // ── TaskStatusSnapshot ──────────────────────────────────────────────
1727
1728    #[test]
1729    fn task_status_snapshot_debug_and_clone() {
1730        let task_id = TaskId::from_string("snap-test".to_string());
1731        let state = TaskState {
1732            info: TaskInfo {
1733                id: task_id,
1734                task_type: "t".to_string(),
1735                status: TaskStatus::Running,
1736                progress: Some(0.5),
1737                message: Some("testing".to_string()),
1738                created_at: "now".to_string(),
1739                started_at: Some("now".to_string()),
1740                completed_at: None,
1741                error: None,
1742            },
1743            cancel_requested: false,
1744            result: None,
1745            cx: Cx::for_testing(),
1746        };
1747        let snapshot = TaskStatusSnapshot::from(&state);
1748        let debug = format!("{:?}", snapshot);
1749        assert!(debug.contains("TaskStatusSnapshot"));
1750        let cloned = snapshot.clone();
1751        assert_eq!(cloned.info.status, TaskStatus::Running);
1752        assert!(cloned.result.is_none());
1753    }
1754
1755    // ── cleanup with failed/cancelled tasks ─────────────────────────────
1756
1757    #[test]
1758    fn cleanup_completed_removes_failed_and_cancelled() {
1759        let manager = TaskManager::new_for_testing();
1760        let cx = Cx::for_testing();
1761        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1762
1763        let id1 = manager.submit(&cx, "t", None).unwrap();
1764        let id2 = manager.submit(&cx, "t", None).unwrap();
1765        let id3 = manager.submit(&cx, "t", None).unwrap();
1766
1767        // Complete one
1768        manager.start_task(&id1).unwrap();
1769        manager.complete_task(&id1, serde_json::json!({}));
1770
1771        // Fail one
1772        manager.start_task(&id2).unwrap();
1773        manager.fail_task(&id2, "error");
1774
1775        // Cancel one
1776        manager.cancel(&id3, None).unwrap();
1777
1778        assert_eq!(manager.total_count(), 3);
1779
1780        // Cleanup with 0 duration should remove all terminal tasks
1781        manager.cleanup_completed(std::time::Duration::from_secs(0));
1782        assert_eq!(manager.total_count(), 0);
1783    }
1784
1785    // ── set_notification_sender replaces sender ─────────────────────────
1786
1787    #[test]
1788    fn set_notification_sender_replaces_existing() {
1789        let manager = TaskManager::new_for_testing();
1790        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1791
1792        let count1 = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1793        let count2 = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1794
1795        let c1 = Arc::clone(&count1);
1796        let sender1: TaskNotificationSender = Arc::new(move |_| {
1797            c1.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1798        });
1799        manager.set_notification_sender(sender1);
1800
1801        let cx = Cx::for_testing();
1802        let _id1 = manager.submit(&cx, "t", None).unwrap();
1803        assert!(count1.load(std::sync::atomic::Ordering::SeqCst) > 0);
1804
1805        // Replace sender
1806        let c2 = Arc::clone(&count2);
1807        let sender2: TaskNotificationSender = Arc::new(move |_| {
1808            c2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1809        });
1810        manager.set_notification_sender(sender2);
1811
1812        let _id2 = manager.submit(&cx, "t", None).unwrap();
1813        assert!(count2.load(std::sync::atomic::Ordering::SeqCst) > 0);
1814    }
1815
1816    // ── cancel with custom reason ───────────────────────────────────────
1817
1818    #[test]
1819    fn cancel_with_custom_reason() {
1820        let manager = TaskManager::new_for_testing();
1821        let cx = Cx::for_testing();
1822        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1823        let id = manager.submit(&cx, "t", None).unwrap();
1824        manager.start_task(&id).unwrap();
1825        let info = manager.cancel(&id, Some("Timeout".to_string())).unwrap();
1826        assert_eq!(info.error, Some("Timeout".to_string()));
1827        let result = manager.get_result(&id).unwrap();
1828        assert_eq!(result.error, Some("Timeout".to_string()));
1829    }
1830
1831    // ── can_transition self-transitions ──────────────────────────────────
1832
1833    #[test]
1834    fn can_transition_self_is_false() {
1835        // Self-transitions are not in the match arms, so can_transition returns false,
1836        // but transition_state handles identity specially (returns true without changing state).
1837        assert!(!can_transition(TaskStatus::Pending, TaskStatus::Pending));
1838        assert!(!can_transition(TaskStatus::Running, TaskStatus::Running));
1839        assert!(!can_transition(
1840            TaskStatus::Completed,
1841            TaskStatus::Completed
1842        ));
1843        assert!(!can_transition(TaskStatus::Failed, TaskStatus::Failed));
1844        assert!(!can_transition(
1845            TaskStatus::Cancelled,
1846            TaskStatus::Cancelled
1847        ));
1848    }
1849
1850    // ── transition_state with Pending -> Pending (identity) ─────────────
1851
1852    #[test]
1853    fn transition_state_identity_pending_returns_true() {
1854        let task_id = TaskId::from_string("identity-test".to_string());
1855        let mut state = TaskState {
1856            info: TaskInfo {
1857                id: task_id,
1858                task_type: "t".to_string(),
1859                status: TaskStatus::Pending,
1860                progress: None,
1861                message: None,
1862                created_at: String::new(),
1863                started_at: None,
1864                completed_at: None,
1865                error: None,
1866            },
1867            cancel_requested: false,
1868            result: None,
1869            cx: Cx::for_testing(),
1870        };
1871        assert!(transition_state(&mut state, TaskStatus::Pending));
1872        assert_eq!(state.info.status, TaskStatus::Pending);
1873    }
1874
1875    // ── list_tasks with no filter ───────────────────────────────────────
1876
1877    #[test]
1878    fn list_tasks_no_filter_returns_all() {
1879        let manager = TaskManager::new_for_testing();
1880        let cx = Cx::for_testing();
1881        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1882        let id1 = manager.submit(&cx, "t", None).unwrap();
1883        let _id2 = manager.submit(&cx, "t", None).unwrap();
1884        manager.start_task(&id1).unwrap();
1885        manager.complete_task(&id1, serde_json::json!({}));
1886        // id1 is Completed, id2 is Pending
1887        let all = manager.list_tasks(None);
1888        assert_eq!(all.len(), 2);
1889    }
1890
1891    // ── notification sender status content ──────────────────────────────
1892
1893    #[test]
1894    fn cancel_notification_includes_error_and_result() {
1895        let manager = TaskManager::new_for_testing();
1896        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1897
1898        let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
1899            Arc::new(std::sync::Mutex::new(Vec::new()));
1900        let sender_events = Arc::clone(&events);
1901        let sender: TaskNotificationSender = Arc::new(move |request| {
1902            if request.method == "notifications/tasks/status" {
1903                let params: TaskStatusNotificationParams = request
1904                    .params
1905                    .as_ref()
1906                    .and_then(|v| serde_json::from_value(v.clone()).ok())
1907                    .unwrap();
1908                sender_events.lock().unwrap().push(params);
1909            }
1910        });
1911        manager.set_notification_sender(sender);
1912
1913        let cx = Cx::for_testing();
1914        let id = manager.submit(&cx, "t", None).unwrap();
1915        manager.cancel(&id, Some("user abort".to_string())).unwrap();
1916
1917        let recorded = events.lock().unwrap().clone();
1918        // Last notification should be the cancellation
1919        let last = recorded.last().unwrap();
1920        assert_eq!(last.status, TaskStatus::Cancelled);
1921        assert_eq!(last.error, Some("user abort".to_string()));
1922        assert!(last.result.is_some());
1923        let result = last.result.as_ref().unwrap();
1924        assert!(!result.success);
1925    }
1926
1927    // ── complete sets progress to 1.0 ───────────────────────────────────
1928
1929    #[test]
1930    fn complete_task_sets_progress_to_one() {
1931        let manager = TaskManager::new_for_testing();
1932        let cx = Cx::for_testing();
1933        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1934        let id = manager.submit(&cx, "t", None).unwrap();
1935        manager.start_task(&id).unwrap();
1936        manager.update_progress(&id, 0.5, None);
1937        manager.complete_task(&id, serde_json::json!({}));
1938        let info = manager.get_info(&id).unwrap();
1939        assert_eq!(info.progress, Some(1.0));
1940    }
1941
1942    // ── cleanup_completed — edge cases ─────────────────────────────────
1943
1944    #[test]
1945    fn cleanup_completed_keeps_terminal_without_completed_at() {
1946        let manager = TaskManager::new_for_testing();
1947        let cx = Cx::for_testing();
1948        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1949        let id = manager.submit(&cx, "t", None).unwrap();
1950        manager.start_task(&id).unwrap();
1951        manager.complete_task(&id, serde_json::json!({}));
1952
1953        // Manually remove completed_at to simulate edge case
1954        {
1955            let mut tasks = manager.tasks.write().unwrap();
1956            tasks.get_mut(&id).unwrap().info.completed_at = None;
1957        }
1958
1959        // Cleanup should keep the task (no completed_at → can't determine age)
1960        manager.cleanup_completed(std::time::Duration::from_secs(0));
1961        assert_eq!(manager.total_count(), 1);
1962    }
1963
1964    #[test]
1965    fn cleanup_completed_keeps_terminal_with_unparseable_timestamp() {
1966        let manager = TaskManager::new_for_testing();
1967        let cx = Cx::for_testing();
1968        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1969        let id = manager.submit(&cx, "t", None).unwrap();
1970        manager.start_task(&id).unwrap();
1971        manager.complete_task(&id, serde_json::json!({}));
1972
1973        // Set completed_at to unparseable value
1974        {
1975            let mut tasks = manager.tasks.write().unwrap();
1976            tasks.get_mut(&id).unwrap().info.completed_at = Some("not-a-date".to_string());
1977        }
1978
1979        manager.cleanup_completed(std::time::Duration::from_secs(0));
1980        assert_eq!(manager.total_count(), 1);
1981    }
1982
1983    // ── Debug with populated state ──────────────────────────────────────
1984
1985    #[test]
1986    fn debug_output_with_tasks_and_handlers() {
1987        let manager = TaskManager::new_for_testing();
1988        manager.register_handler("type_a", |_cx, _params| async { Ok(serde_json::json!({})) });
1989        manager.register_handler("type_b", |_cx, _params| async { Ok(serde_json::json!({})) });
1990        let cx = Cx::for_testing();
1991        let _ = manager.submit(&cx, "type_a", None).unwrap();
1992        let _ = manager.submit(&cx, "type_b", None).unwrap();
1993
1994        let debug = format!("{:?}", manager);
1995        assert!(debug.contains("task_count: 2"));
1996        assert!(debug.contains("handler_count: 2"));
1997    }
1998
1999    // ── Multiple handler types ──────────────────────────────────────────
2000
2001    #[test]
2002    fn multiple_handler_types_independent() {
2003        let manager = TaskManager::new_for_testing();
2004        let cx = Cx::for_testing();
2005        manager.register_handler("analyze", |_cx, _params| async {
2006            Ok(serde_json::json!({"type": "analyze"}))
2007        });
2008        manager.register_handler("summarize", |_cx, _params| async {
2009            Ok(serde_json::json!({"type": "summarize"}))
2010        });
2011
2012        let id_a = manager.submit(&cx, "analyze", None).unwrap();
2013        let id_s = manager.submit(&cx, "summarize", None).unwrap();
2014
2015        let info_a = manager.get_info(&id_a).unwrap();
2016        let info_s = manager.get_info(&id_s).unwrap();
2017        assert_eq!(info_a.task_type, "analyze");
2018        assert_eq!(info_s.task_type, "summarize");
2019    }
2020
2021    // ── list_tasks filters for all terminal statuses ────────────────────
2022
2023    #[test]
2024    fn list_tasks_filter_failed() {
2025        let manager = TaskManager::new_for_testing();
2026        let cx = Cx::for_testing();
2027        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2028
2029        let id = manager.submit(&cx, "t", None).unwrap();
2030        manager.start_task(&id).unwrap();
2031        manager.fail_task(&id, "err");
2032
2033        assert_eq!(manager.list_tasks(Some(TaskStatus::Failed)).len(), 1);
2034        assert_eq!(manager.list_tasks(Some(TaskStatus::Completed)).len(), 0);
2035    }
2036
2037    #[test]
2038    fn list_tasks_filter_cancelled() {
2039        let manager = TaskManager::new_for_testing();
2040        let cx = Cx::for_testing();
2041        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2042
2043        let id = manager.submit(&cx, "t", None).unwrap();
2044        manager.cancel(&id, None).unwrap();
2045
2046        assert_eq!(manager.list_tasks(Some(TaskStatus::Cancelled)).len(), 1);
2047        assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 0);
2048    }
2049
2050    // ── notification content for progress ────────────────────────────────
2051
2052    #[test]
2053    fn progress_notification_includes_message() {
2054        let manager = TaskManager::new_for_testing();
2055        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2056
2057        let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
2058            Arc::new(std::sync::Mutex::new(Vec::new()));
2059        let sender_events = Arc::clone(&events);
2060        let sender: TaskNotificationSender = Arc::new(move |request| {
2061            if request.method == "notifications/tasks/status" {
2062                let params: TaskStatusNotificationParams = request
2063                    .params
2064                    .as_ref()
2065                    .and_then(|v| serde_json::from_value(v.clone()).ok())
2066                    .unwrap();
2067                sender_events.lock().unwrap().push(params);
2068            }
2069        });
2070        manager.set_notification_sender(sender);
2071
2072        let cx = Cx::for_testing();
2073        let id = manager.submit(&cx, "t", None).unwrap();
2074        manager.start_task(&id).unwrap();
2075        manager.update_progress(&id, 0.75, Some("three quarters".to_string()));
2076
2077        let recorded = events.lock().unwrap().clone();
2078        let progress_event = recorded
2079            .iter()
2080            .find(|e| e.progress == Some(0.75))
2081            .expect("progress notification");
2082        assert_eq!(progress_event.message, Some("three quarters".to_string()));
2083        assert_eq!(progress_event.status, TaskStatus::Running);
2084    }
2085
2086    // ── TaskStatusSnapshot with result ────────────────────────────────────
2087
2088    #[test]
2089    fn task_status_snapshot_includes_result() {
2090        let task_id = TaskId::from_string("snap-result");
2091        let state = TaskState {
2092            info: TaskInfo {
2093                id: task_id.clone(),
2094                task_type: "t".to_string(),
2095                status: TaskStatus::Completed,
2096                progress: Some(1.0),
2097                message: None,
2098                created_at: "now".to_string(),
2099                started_at: Some("now".to_string()),
2100                completed_at: Some("now".to_string()),
2101                error: None,
2102            },
2103            cancel_requested: false,
2104            result: Some(TaskResult {
2105                id: task_id,
2106                success: true,
2107                data: Some(serde_json::json!({"done": true})),
2108                error: None,
2109            }),
2110            cx: Cx::for_testing(),
2111        };
2112        let snapshot = TaskStatusSnapshot::from(&state);
2113        assert!(snapshot.result.is_some());
2114        let result = snapshot.result.unwrap();
2115        assert!(result.success);
2116        assert_eq!(result.data, Some(serde_json::json!({"done": true})));
2117    }
2118
2119    // ── submit error message ──────────────────────────────────────────────
2120
2121    #[test]
2122    fn submit_unknown_task_type_error_message() {
2123        let manager = TaskManager::new_for_testing();
2124        let cx = Cx::for_testing();
2125        let err = manager.submit(&cx, "nonexistent_type", None).unwrap_err();
2126        assert!(err.message.contains("Unknown task type"));
2127        assert!(err.message.contains("nonexistent_type"));
2128    }
2129
2130    // ── cancel result data ───────────────────────────────────────────────
2131
2132    #[test]
2133    fn cancel_result_has_no_data() {
2134        let manager = TaskManager::new_for_testing();
2135        let cx = Cx::for_testing();
2136        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2137        let id = manager.submit(&cx, "t", None).unwrap();
2138        manager.start_task(&id).unwrap();
2139        manager.cancel(&id, Some("abort".to_string())).unwrap();
2140        let result = manager.get_result(&id).unwrap();
2141        assert!(!result.success);
2142        assert!(result.data.is_none());
2143        assert_eq!(result.error, Some("abort".to_string()));
2144    }
2145
2146    // ── Additional coverage — uncovered terminal-state cancel paths ──
2147
2148    #[test]
2149    fn cancel_completed_task_returns_error() {
2150        let manager = TaskManager::new_for_testing();
2151        let cx = Cx::for_testing();
2152        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2153        let id = manager.submit(&cx, "t", None).unwrap();
2154        manager.start_task(&id).unwrap();
2155        manager.complete_task(&id, serde_json::json!({}));
2156        let err = manager.cancel(&id, None).unwrap_err();
2157        assert!(err.message.contains("terminal"));
2158    }
2159
2160    #[test]
2161    fn cancel_failed_task_returns_error() {
2162        let manager = TaskManager::new_for_testing();
2163        let cx = Cx::for_testing();
2164        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2165        let id = manager.submit(&cx, "t", None).unwrap();
2166        manager.start_task(&id).unwrap();
2167        manager.fail_task(&id, "broke");
2168        let err = manager.cancel(&id, None).unwrap_err();
2169        assert!(err.message.contains("terminal"));
2170    }
2171
2172    #[test]
2173    fn fail_task_on_pending_records_failure() {
2174        let manager = TaskManager::new_for_testing();
2175        let cx = Cx::for_testing();
2176        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2177        let id = manager.submit(&cx, "t", None).unwrap();
2178        manager.fail_task(&id, "too early");
2179        let info = manager.get_info(&id).unwrap();
2180        assert_eq!(info.status, TaskStatus::Failed);
2181        assert_eq!(info.error.as_deref(), Some("too early"));
2182        assert!(info.completed_at.is_some());
2183
2184        let result = manager
2185            .get_result(&id)
2186            .expect("failed task should record a result");
2187        assert!(!result.success);
2188        assert_eq!(result.error.as_deref(), Some("too early"));
2189    }
2190
2191    #[test]
2192    fn spawn_task_skips_handler_for_pre_failed_pending_task() {
2193        let manager = TaskManager::new();
2194        let task_runs = Arc::new(AtomicU64::new(0));
2195        let task_type = "never-run".to_string();
2196        let task_id = TaskId::from_string("task-prefailed");
2197        let task_cx = Cx::for_request_with_budget(Budget::INFINITE);
2198        let now = chrono::Utc::now().to_rfc3339();
2199
2200        manager.register_handler(task_type.clone(), {
2201            let task_runs = Arc::clone(&task_runs);
2202            move |_cx, _params| {
2203                let task_runs = Arc::clone(&task_runs);
2204                async move {
2205                    task_runs.fetch_add(1, Ordering::SeqCst);
2206                    Ok(serde_json::json!({"unexpected": true}))
2207                }
2208            }
2209        });
2210
2211        {
2212            let mut tasks = manager.tasks.write().unwrap_or_else(|poisoned| {
2213                warn!(target: targets::SERVER, "tasks lock poisoned in test, recovering");
2214                poisoned.into_inner()
2215            });
2216            tasks.insert(
2217                task_id.clone(),
2218                TaskState {
2219                    info: TaskInfo {
2220                        id: task_id.clone(),
2221                        task_type: task_type.clone(),
2222                        status: TaskStatus::Failed,
2223                        progress: None,
2224                        message: None,
2225                        created_at: now,
2226                        started_at: None,
2227                        completed_at: Some(chrono::Utc::now().to_rfc3339()),
2228                        error: Some("prefailed".to_string()),
2229                    },
2230                    cancel_requested: false,
2231                    result: Some(TaskResult {
2232                        id: task_id.clone(),
2233                        success: false,
2234                        data: None,
2235                        error: Some("prefailed".to_string()),
2236                    }),
2237                    cx: task_cx.clone(),
2238                },
2239            );
2240        }
2241
2242        manager.spawn_task(task_id.clone(), task_type, task_cx, serde_json::json!({}));
2243
2244        let deadline = std::time::Instant::now() + Duration::from_secs(1);
2245        while std::time::Instant::now() < deadline {
2246            if task_runs.load(Ordering::SeqCst) > 0 {
2247                break;
2248            }
2249            thread::sleep(Duration::from_millis(10));
2250        }
2251
2252        assert_eq!(
2253            task_runs.load(Ordering::SeqCst),
2254            0,
2255            "pre-failed pending task must not execute its handler"
2256        );
2257
2258        let info = manager
2259            .get_info(&task_id)
2260            .expect("prefailed task should remain present");
2261        assert_eq!(info.status, TaskStatus::Failed);
2262        assert_eq!(info.error.as_deref(), Some("prefailed"));
2263    }
2264
2265    #[test]
2266    fn complete_task_on_cancelled_is_ignored() {
2267        let manager = TaskManager::new_for_testing();
2268        let cx = Cx::for_testing();
2269        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2270        let id = manager.submit(&cx, "t", None).unwrap();
2271        manager.start_task(&id).unwrap();
2272        manager.cancel(&id, Some("aborted".to_string())).unwrap();
2273        // Cancelled -> Completed is not valid
2274        manager.complete_task(&id, serde_json::json!({"late": true}));
2275        let info = manager.get_info(&id).unwrap();
2276        assert_eq!(info.status, TaskStatus::Cancelled);
2277    }
2278
2279    #[test]
2280    fn update_progress_none_message_clears_previous() {
2281        let manager = TaskManager::new_for_testing();
2282        let cx = Cx::for_testing();
2283        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2284        let id = manager.submit(&cx, "t", None).unwrap();
2285        manager.start_task(&id).unwrap();
2286        manager.update_progress(&id, 0.3, Some("step 1".to_string()));
2287        assert_eq!(
2288            manager.get_info(&id).unwrap().message,
2289            Some("step 1".to_string())
2290        );
2291        manager.update_progress(&id, 0.6, None);
2292        assert!(manager.get_info(&id).unwrap().message.is_none());
2293    }
2294
2295    #[test]
2296    fn no_notification_sender_does_not_panic() {
2297        let manager = TaskManager::new_for_testing();
2298        let cx = Cx::for_testing();
2299        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2300        // No notification sender set — all operations should still work
2301        let id = manager.submit(&cx, "t", None).unwrap();
2302        manager.start_task(&id).unwrap();
2303        manager.update_progress(&id, 0.5, None);
2304        manager.complete_task(&id, serde_json::json!({}));
2305        assert_eq!(manager.get_info(&id).unwrap().status, TaskStatus::Completed);
2306    }
2307}