Skip to main content

fastmcp_server/
tasks.rs

1//! Background task manager (Docket/SEP-1686).
2//!
3//! Provides support for long-running background tasks that outlive individual
4//! request lifecycles. Tasks are managed in a dedicated region that survives
5//! until server shutdown.
6//!
7//! # Architecture
8//!
9//! ```text
10//! Server Region (root)
11//! ├── Session Region (per connection)
12//! │   └── Request Regions (tools/call, etc.)
13//! └── Background Task Region (managed by TaskManager)
14//!     ├── Task 1
15//!     ├── Task 2
16//!     └── ...
17//! ```
18//!
19//! # Usage
20//!
21//! ```ignore
22//! let task_manager = TaskManager::new();
23//!
24//! // Submit a background task
25//! let task_id = task_manager.submit(&cx, "long_analysis", Some(json!({"data": ...})))?;
26//!
27//! // Check status
28//! let info = task_manager.get_info(&task_id);
29//!
30//! // Cancel if needed
//! task_manager.cancel(&task_id, Some("User requested".to_string()))?;
32//! ```
33
34use std::collections::HashMap;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::{Arc, RwLock};
37
38use asupersync::runtime::{RuntimeBuilder, RuntimeHandle};
39use asupersync::{Budget, CancelKind, Cx};
40use fastmcp_core::logging::{debug, info, targets, warn};
41use fastmcp_core::{McpError, McpResult};
42use fastmcp_protocol::{
43    JsonRpcRequest, TaskId, TaskInfo, TaskResult, TaskStatus, TaskStatusNotificationParams,
44};
45
/// Notification sender used for task status updates.
///
/// A shared, thread-safe callback. The task manager invokes it with a
/// `notifications/tasks/status` [`JsonRpcRequest`] each time a task's status
/// snapshot is published.
pub type TaskNotificationSender = Arc<dyn Fn(JsonRpcRequest) + Send + Sync>;
48
/// Callback type for task execution.
///
/// Task handlers receive the task-scoped context and the JSON parameters, and
/// return a [`TaskFuture`] that resolves to the task's result.
pub type TaskHandler = Box<dyn Fn(&Cx, serde_json::Value) -> TaskFuture + Send + Sync + 'static>;
53
/// Future type for task execution.
///
/// A pinned, boxed, `Send + 'static` future whose output is the task's JSON
/// result wrapped in [`McpResult`].
pub type TaskFuture = std::pin::Pin<
    Box<dyn std::future::Future<Output = McpResult<serde_json::Value>> + Send + 'static>,
>;
58
/// Internal state for a running task.
///
/// One entry per submitted task is stored in the manager's task map, keyed by
/// task ID. Entries stay in the map after the task finishes until
/// `cleanup_completed` removes them.
struct TaskState {
    /// Task information (status, timestamps, progress, error text).
    info: TaskInfo,
    /// Whether cancellation has been requested.
    ///
    /// Set by `cancel`; once set, failure/completion reports coming from the
    /// spawned task are ignored.
    cancel_requested: bool,
    /// Task result once completed (also populated on failure and cancellation).
    result: Option<TaskResult>,
    /// Task-scoped cancellation context.
    ///
    /// A clone of this context is handed to the task handler; `cancel` signals
    /// it via `cancel_with`.
    cx: Cx,
}
70
71fn can_transition(from: TaskStatus, to: TaskStatus) -> bool {
72    matches!(
73        (from, to),
74        (
75            TaskStatus::Pending,
76            TaskStatus::Running | TaskStatus::Failed | TaskStatus::Cancelled
77        ) | (
78            TaskStatus::Running,
79            TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled
80        )
81    )
82}
83
84fn transition_state(state: &mut TaskState, to: TaskStatus) -> bool {
85    let from = state.info.status;
86    if from == to {
87        return true;
88    }
89    if !can_transition(from, to) {
90        warn!(
91            target: targets::SERVER,
92            "task {} invalid transition {:?} -> {:?}",
93            state.info.id,
94            from,
95            to
96        );
97        return false;
98    }
99
100    state.info.status = to;
101    let now = chrono::Utc::now().to_rfc3339();
102    match to {
103        TaskStatus::Running => {
104            state.info.started_at = Some(now.clone());
105        }
106        TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled => {
107            state.info.completed_at = Some(now.clone());
108        }
109        TaskStatus::Pending => {}
110    }
111
112    info!(
113        target: targets::SERVER,
114        "task {} status {:?} -> {:?} at {}",
115        state.info.id,
116        from,
117        to,
118        now
119    );
120    true
121}
122
123fn mark_task_failed_snapshot(
124    tasks: &Arc<RwLock<HashMap<TaskId, TaskState>>>,
125    task_id: &TaskId,
126    error_msg: String,
127    lock_context: &'static str,
128) -> Option<TaskStatusSnapshot> {
129    let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
130        warn!(
131            target: targets::SERVER,
132            "tasks lock poisoned in {}, recovering",
133            lock_context
134        );
135        poisoned.into_inner()
136    });
137
138    let state = tasks_guard.get_mut(task_id)?;
139    if state.cancel_requested || !transition_state(state, TaskStatus::Failed) {
140        return None;
141    }
142
143    state.info.error = Some(error_msg.clone());
144    state.result = Some(TaskResult {
145        id: task_id.clone(),
146        success: false,
147        data: None,
148        error: Some(error_msg),
149    });
150    Some(TaskStatusSnapshot::from(state))
151}
152
153fn build_runtime_handle() -> Option<RuntimeHandle> {
154    match RuntimeBuilder::multi_thread().build() {
155        Ok(runtime) => Some(runtime.handle()),
156        Err(multi_err) => {
157            warn!(
158                target: targets::SERVER,
159                "failed to initialize multi-thread runtime for tasks: {}; attempting current-thread fallback",
160                multi_err
161            );
162            match RuntimeBuilder::current_thread().build() {
163                Ok(runtime) => Some(runtime.handle()),
164                Err(single_err) => {
165                    warn!(
166                        target: targets::SERVER,
167                        "failed to initialize current-thread runtime fallback for tasks: {}",
168                        single_err
169                    );
170                    None
171                }
172            }
173        }
174    }
175}
176
/// Background task manager.
///
/// Manages the lifecycle of background tasks including submission, status
/// tracking, and cancellation.
///
/// All mutable state sits behind `RwLock`s or an atomic, so every operation
/// takes `&self`; see [`SharedTaskManager`] for the `Arc`-wrapped form.
pub struct TaskManager {
    /// Active and completed tasks by ID.
    tasks: Arc<RwLock<HashMap<TaskId, TaskState>>>,
    /// Registered task handlers by type.
    handlers: Arc<RwLock<HashMap<String, TaskHandler>>>,
    /// Counter for generating unique task IDs.
    task_counter: AtomicU64,
    /// Whether task list changes should trigger notifications.
    list_changed_notifications: bool,
    /// Background runtime handle for executing tasks.
    ///
    /// `None` when no runtime could be built; auto-executed tasks are then
    /// marked failed instead of being run.
    runtime: Option<RuntimeHandle>,
    /// Whether submitted tasks should execute immediately.
    auto_execute: bool,
    /// Optional notification sender for task status updates.
    notification_sender: Arc<RwLock<Option<TaskNotificationSender>>>,
}
197
198impl TaskManager {
199    /// Creates a new task manager.
200    #[must_use]
201    pub fn new() -> Self {
202        let runtime = build_runtime_handle();
203        if runtime.is_none() {
204            warn!(
205                target: targets::SERVER,
206                "TaskManager runtime unavailable; auto-executed tasks will fail until runtime becomes available"
207            );
208        }
209        Self {
210            tasks: Arc::new(RwLock::new(HashMap::new())),
211            handlers: Arc::new(RwLock::new(HashMap::new())),
212            task_counter: AtomicU64::new(0),
213            list_changed_notifications: false,
214            runtime,
215            auto_execute: true,
216            notification_sender: Arc::new(RwLock::new(None)),
217        }
218    }
219
220    /// Creates a new task manager with list change notifications enabled.
221    #[must_use]
222    pub fn with_list_changed_notifications() -> Self {
223        Self {
224            list_changed_notifications: true,
225            ..Self::new()
226        }
227    }
228
229    /// Creates a task manager configured for deterministic tests.
230    ///
231    /// Tasks are not executed automatically; tests can drive state manually.
232    #[must_use]
233    pub fn new_for_testing() -> Self {
234        let mut manager = Self::new();
235        manager.auto_execute = false;
236        manager
237    }
238
239    /// Converts this manager into a shared handle.
240    #[must_use]
241    pub fn into_shared(self) -> SharedTaskManager {
242        Arc::new(self)
243    }
244
    /// Returns whether list change notifications are enabled.
    ///
    /// This reports the flag chosen at construction time; it is `true` only
    /// for managers built with [`TaskManager::with_list_changed_notifications`].
    #[must_use]
    pub fn has_list_changed_notifications(&self) -> bool {
        self.list_changed_notifications
    }
250
251    /// Sets the notification sender for task status updates.
252    pub fn set_notification_sender(&self, sender: TaskNotificationSender) {
253        let mut guard = self.notification_sender.write().unwrap_or_else(|poisoned| {
254            warn!(target: targets::SERVER, "notification sender lock poisoned, recovering");
255            poisoned.into_inner()
256        });
257        *guard = Some(sender);
258    }
259
260    /// Registers a task handler for a specific task type.
261    ///
262    /// The handler will be invoked when a task of this type is submitted.
263    pub fn register_handler<F, Fut>(&self, task_type: impl Into<String>, handler: F)
264    where
265        F: Fn(&Cx, serde_json::Value) -> Fut + Send + Sync + 'static,
266        Fut: std::future::Future<Output = McpResult<serde_json::Value>> + Send + 'static,
267    {
268        let task_type = task_type.into();
269        let boxed_handler: TaskHandler = Box::new(move |cx, params| Box::pin(handler(cx, params)));
270
271        let mut handlers = self.handlers.write().unwrap_or_else(|poisoned| {
272            warn!(target: targets::SERVER, "handlers lock poisoned, recovering");
273            poisoned.into_inner()
274        });
275        handlers.insert(task_type, boxed_handler);
276    }
277
278    /// Submits a new background task.
279    ///
280    /// Returns the task ID for tracking. The task runs asynchronously in the
281    /// background region.
282    pub fn submit(
283        &self,
284        _cx: &Cx,
285        task_type: impl Into<String>,
286        params: Option<serde_json::Value>,
287    ) -> McpResult<TaskId> {
288        let task_type = task_type.into();
289
290        // Check if handler exists
291        {
292            let handlers = self.handlers.read().unwrap_or_else(|poisoned| {
293                warn!(target: targets::SERVER, "handlers lock poisoned, recovering");
294                poisoned.into_inner()
295            });
296            if !handlers.contains_key(&task_type) {
297                return Err(McpError::invalid_params(format!(
298                    "Unknown task type: {task_type}"
299                )));
300            }
301        }
302
303        // Generate unique task ID
304        let counter = self.task_counter.fetch_add(1, Ordering::SeqCst);
305        let task_id = TaskId::from_string(format!("task-{counter:08x}"));
306
307        // Create task info
308        let now = chrono::Utc::now().to_rfc3339();
309        let task_cx = Cx::for_request_with_budget(Budget::INFINITE);
310        let info = TaskInfo {
311            id: task_id.clone(),
312            task_type: task_type.clone(),
313            status: TaskStatus::Pending,
314            progress: None,
315            message: None,
316            created_at: now,
317            started_at: None,
318            completed_at: None,
319            error: None,
320        };
321
322        let info_snapshot = info.clone();
323
324        // Store task state
325        let state = TaskState {
326            info,
327            cancel_requested: false,
328            result: None,
329            cx: task_cx.clone(),
330        };
331
332        {
333            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
334                warn!(target: targets::SERVER, "tasks lock poisoned, recovering");
335                poisoned.into_inner()
336            });
337            tasks.insert(task_id.clone(), state);
338        }
339
340        self.notify_status(info_snapshot, None);
341
342        if self.auto_execute {
343            let params = params.unwrap_or_else(|| serde_json::json!({}));
344            self.spawn_task(task_id.clone(), task_type, task_cx, params);
345        }
346
347        Ok(task_id)
348    }
349
350    #[allow(clippy::too_many_lines)]
351    fn spawn_task(
352        &self,
353        task_id: TaskId,
354        task_type: String,
355        task_cx: Cx,
356        params: serde_json::Value,
357    ) {
358        let Some(runtime) = self.runtime.clone() else {
359            let failure_snapshot = mark_task_failed_snapshot(
360                &self.tasks,
361                &task_id,
362                "Task runtime unavailable".to_string(),
363                "spawn_task runtime unavailable",
364            );
365            self.notify_snapshot(failure_snapshot);
366            return;
367        };
368
369        let tasks = Arc::clone(&self.tasks);
370        let handlers = Arc::clone(&self.handlers);
371        let notification_sender = Arc::clone(&self.notification_sender);
372        let scheduled_task_id = task_id.clone();
373        let scheduling = runtime.try_spawn(async move {
374            let running_snapshot = {
375                let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
376                    warn!(target: targets::SERVER, "tasks lock poisoned in spawn_task, recovering");
377                    poisoned.into_inner()
378                });
379                match tasks_guard.get_mut(&task_id) {
380                    Some(state) => {
381                        if state.cancel_requested || !transition_state(state, TaskStatus::Running) {
382                            None
383                        } else {
384                            Some(TaskStatusSnapshot::from(state))
385                        }
386                    }
387                    None => None,
388                }
389            };
390
391            notify_snapshot(&notification_sender, running_snapshot);
392
393            let task_future = {
394                let handlers_guard = handlers.read().unwrap_or_else(|poisoned| {
395                    warn!(target: targets::SERVER, "handlers lock poisoned in spawn_task, recovering");
396                    poisoned.into_inner()
397                });
398                let Some(handler) = handlers_guard.get(&task_type) else {
399                    let failure_snapshot = mark_task_failed_snapshot(
400                        &tasks,
401                        &task_id,
402                        format!("Unknown task type: {task_type}"),
403                        "spawn_task failure",
404                    );
405                    notify_snapshot(&notification_sender, failure_snapshot);
406                    return;
407                };
408                (handler)(&task_cx, params)
409            };
410
411            let result = task_future.await;
412
413            let completion_snapshot = {
414                let mut tasks_guard = tasks.write().unwrap_or_else(|poisoned| {
415                    warn!(target: targets::SERVER, "tasks lock poisoned in spawn_task completion, recovering");
416                    poisoned.into_inner()
417                });
418                match tasks_guard.get_mut(&task_id) {
419                    Some(state) => {
420                        if state.cancel_requested {
421                            None
422                        } else {
423                            let mut snapshot = None;
424                            match result {
425                                Ok(data) => {
426                                    if transition_state(state, TaskStatus::Completed) {
427                                        state.info.progress = Some(1.0);
428                                        state.result = Some(TaskResult {
429                                            id: task_id.clone(),
430                                            success: true,
431                                            data: Some(data),
432                                            error: None,
433                                        });
434                                        snapshot = Some(TaskStatusSnapshot::from(state));
435                                    }
436                                }
437                                Err(err) => {
438                                    let error_msg = err.message;
439                                    if transition_state(state, TaskStatus::Failed) {
440                                        state.info.error = Some(error_msg.clone());
441                                        state.result = Some(TaskResult {
442                                            id: task_id.clone(),
443                                            success: false,
444                                            data: None,
445                                            error: Some(error_msg),
446                                        });
447                                        snapshot = Some(TaskStatusSnapshot::from(state));
448                                    }
449                                }
450                            }
451                            snapshot
452                        }
453                    }
454                    None => None,
455                }
456            };
457
458            notify_snapshot(&notification_sender, completion_snapshot);
459        });
460
461        if let Err(err) = scheduling {
462            warn!(
463                target: targets::SERVER,
464                "failed to schedule task {}: {}",
465                scheduled_task_id,
466                err
467            );
468            let failure_snapshot = mark_task_failed_snapshot(
469                &self.tasks,
470                &scheduled_task_id,
471                format!("Failed to schedule task: {err}"),
472                "spawn_task scheduling",
473            );
474            self.notify_snapshot(failure_snapshot);
475        }
476    }
477
478    /// Starts execution of a pending task.
479    ///
480    /// This is called internally to transition a task from Pending to Running.
481    pub fn start_task(&self, task_id: &TaskId) -> McpResult<()> {
482        let snapshot = {
483            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
484                warn!(target: targets::SERVER, "tasks lock poisoned in start_task, recovering");
485                poisoned.into_inner()
486            });
487            let state = tasks
488                .get_mut(task_id)
489                .ok_or_else(|| McpError::invalid_params(format!("Task not found: {task_id}")))?;
490
491            if state.info.status != TaskStatus::Pending {
492                return Err(McpError::invalid_params(format!(
493                    "Task {task_id} is not pending"
494                )));
495            }
496
497            if !transition_state(state, TaskStatus::Running) {
498                return Err(McpError::invalid_params(format!(
499                    "Task {task_id} cannot transition to running"
500                )));
501            }
502            Some(TaskStatusSnapshot::from(state))
503        };
504
505        self.notify_snapshot(snapshot);
506        Ok(())
507    }
508
509    /// Updates progress for a running task.
510    pub fn update_progress(&self, task_id: &TaskId, progress: f64, message: Option<String>) {
511        let snapshot = {
512            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
513                warn!(target: targets::SERVER, "tasks lock poisoned in update_progress, recovering");
514                poisoned.into_inner()
515            });
516            if let Some(state) = tasks.get_mut(task_id) {
517                if state.info.status != TaskStatus::Running {
518                    debug!(
519                        target: targets::SERVER,
520                        "task {} progress update ignored in state {:?}",
521                        task_id,
522                        state.info.status
523                    );
524                    return;
525                }
526                state.info.progress = Some(progress.clamp(0.0, 1.0));
527                state.info.message = message;
528                Some(TaskStatusSnapshot::from(state))
529            } else {
530                None
531            }
532        };
533
534        self.notify_snapshot(snapshot);
535    }
536
537    /// Completes a task with a successful result.
538    pub fn complete_task(&self, task_id: &TaskId, data: serde_json::Value) {
539        let snapshot = {
540            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
541                warn!(target: targets::SERVER, "tasks lock poisoned in complete_task, recovering");
542                poisoned.into_inner()
543            });
544            if let Some(state) = tasks.get_mut(task_id) {
545                if !transition_state(state, TaskStatus::Completed) {
546                    return;
547                }
548                state.info.progress = Some(1.0);
549                state.result = Some(TaskResult {
550                    id: task_id.clone(),
551                    success: true,
552                    data: Some(data),
553                    error: None,
554                });
555                Some(TaskStatusSnapshot::from(state))
556            } else {
557                None
558            }
559        };
560
561        self.notify_snapshot(snapshot);
562    }
563
564    /// Fails a task with an error.
565    pub fn fail_task(&self, task_id: &TaskId, error: impl Into<String>) {
566        let error = error.into();
567        let snapshot = {
568            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
569                warn!(target: targets::SERVER, "tasks lock poisoned in fail_task, recovering");
570                poisoned.into_inner()
571            });
572            if let Some(state) = tasks.get_mut(task_id) {
573                if !transition_state(state, TaskStatus::Failed) {
574                    return;
575                }
576                state.info.error = Some(error.clone());
577                state.result = Some(TaskResult {
578                    id: task_id.clone(),
579                    success: false,
580                    data: None,
581                    error: Some(error),
582                });
583                Some(TaskStatusSnapshot::from(state))
584            } else {
585                None
586            }
587        };
588
589        self.notify_snapshot(snapshot);
590    }
591
592    /// Gets information about a task.
593    #[must_use]
594    pub fn get_info(&self, task_id: &TaskId) -> Option<TaskInfo> {
595        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
596            warn!(target: targets::SERVER, "tasks lock poisoned in get_info, recovering");
597            poisoned.into_inner()
598        });
599        tasks.get(task_id).map(|s| s.info.clone())
600    }
601
602    /// Gets the result of a completed task.
603    #[must_use]
604    pub fn get_result(&self, task_id: &TaskId) -> Option<TaskResult> {
605        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
606            warn!(target: targets::SERVER, "tasks lock poisoned in get_result, recovering");
607            poisoned.into_inner()
608        });
609        tasks.get(task_id).and_then(|s| s.result.clone())
610    }
611
612    /// Lists all tasks, optionally filtered by status.
613    #[must_use]
614    pub fn list_tasks(&self, status_filter: Option<TaskStatus>) -> Vec<TaskInfo> {
615        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
616            warn!(target: targets::SERVER, "tasks lock poisoned in list_tasks, recovering");
617            poisoned.into_inner()
618        });
619        tasks
620            .values()
621            .filter(|s| status_filter.is_none_or(|f| s.info.status == f))
622            .map(|s| s.info.clone())
623            .collect()
624    }
625
626    /// Requests cancellation of a task.
627    ///
628    /// Returns true if the task exists and cancellation was requested.
629    /// The task may still be running until it checks for cancellation.
630    pub fn cancel(&self, task_id: &TaskId, reason: Option<String>) -> McpResult<TaskInfo> {
631        let snapshot = {
632            let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
633                warn!(target: targets::SERVER, "tasks lock poisoned in cancel, recovering");
634                poisoned.into_inner()
635            });
636            let state = tasks
637                .get_mut(task_id)
638                .ok_or_else(|| McpError::invalid_params(format!("Task not found: {task_id}")))?;
639
640            // Can only cancel pending or running tasks
641            if state.info.status.is_terminal() {
642                return Err(McpError::invalid_params(format!(
643                    "Task {task_id} is already in terminal state: {:?}",
644                    state.info.status
645                )));
646            }
647
648            if !transition_state(state, TaskStatus::Cancelled) {
649                return Err(McpError::invalid_params(format!(
650                    "Task {task_id} cannot be cancelled from {:?}",
651                    state.info.status
652                )));
653            }
654
655            state.cancel_requested = true;
656
657            state.cx.cancel_with(CancelKind::User, None);
658            if !state.cx.is_cancel_requested() {
659                warn!(
660                    target: targets::SERVER,
661                    "task {} cancel signal not observed on context",
662                    task_id
663                );
664            }
665
666            let error_msg = reason.unwrap_or_else(|| "Cancelled by request".to_string());
667            state.info.error = Some(error_msg.clone());
668            state.result = Some(TaskResult {
669                id: task_id.clone(),
670                success: false,
671                data: None,
672                error: Some(error_msg),
673            });
674
675            let snapshot = TaskStatusSnapshot::from(state);
676            (snapshot, state.info.clone())
677        };
678
679        let (snapshot, info) = snapshot;
680        self.notify_snapshot(Some(snapshot));
681        Ok(info)
682    }
683
684    /// Checks if cancellation has been requested for a task.
685    #[must_use]
686    pub fn is_cancel_requested(&self, task_id: &TaskId) -> bool {
687        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
688            warn!(target: targets::SERVER, "tasks lock poisoned in is_cancel_requested, recovering");
689            poisoned.into_inner()
690        });
691        tasks.get(task_id).is_some_and(|s| s.cancel_requested)
692    }
693
694    /// Returns the number of active (non-terminal) tasks.
695    #[must_use]
696    pub fn active_count(&self) -> usize {
697        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
698            warn!(target: targets::SERVER, "tasks lock poisoned in active_count, recovering");
699            poisoned.into_inner()
700        });
701        tasks.values().filter(|s| s.info.status.is_active()).count()
702    }
703
704    /// Returns the total number of tasks.
705    #[must_use]
706    pub fn total_count(&self) -> usize {
707        let tasks = self.tasks.read().unwrap_or_else(|poisoned| {
708            warn!(target: targets::SERVER, "tasks lock poisoned in total_count, recovering");
709            poisoned.into_inner()
710        });
711        tasks.len()
712    }
713
714    /// Removes completed tasks older than the specified duration.
715    ///
716    /// This is useful for preventing unbounded memory growth from completed tasks.
717    pub fn cleanup_completed(&self, max_age: std::time::Duration) {
718        let cutoff = chrono::Utc::now() - chrono::Duration::from_std(max_age).unwrap_or_default();
719
720        let mut tasks = self.tasks.write().unwrap_or_else(|poisoned| {
721            warn!(target: targets::SERVER, "tasks lock poisoned in cleanup_completed, recovering");
722            poisoned.into_inner()
723        });
724        tasks.retain(|_, state| {
725            // Keep active tasks
726            if state.info.status.is_active() {
727                return true;
728            }
729
730            // Keep recent completed tasks
731            if let Some(ref completed) = state.info.completed_at {
732                if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(completed) {
733                    return parsed.with_timezone(&chrono::Utc) > cutoff;
734                }
735                return true;
736            }
737
738            true
739        });
740    }
741
742    fn notify_snapshot(&self, snapshot: Option<TaskStatusSnapshot>) {
743        if let Some(snapshot) = snapshot {
744            self.notify_status(snapshot.info, snapshot.result);
745        }
746    }
747
748    fn notify_status(&self, info: TaskInfo, result: Option<TaskResult>) {
749        let sender = {
750            let guard = self.notification_sender.read().unwrap_or_else(|poisoned| {
751                warn!(target: targets::SERVER, "notification sender lock poisoned in notify_status, recovering");
752                poisoned.into_inner()
753            });
754            guard.clone()
755        };
756        let Some(sender) = sender else {
757            return;
758        };
759
760        let params = TaskStatusNotificationParams {
761            id: info.id.clone(),
762            status: info.status,
763            progress: info.progress,
764            message: info.message.clone(),
765            error: info.error.clone(),
766            result,
767        };
768        let payload = match serde_json::to_value(params) {
769            Ok(value) => value,
770            Err(err) => {
771                warn!(
772                    target: targets::SERVER,
773                    "failed to serialize task status notification: {}",
774                    err
775                );
776                return;
777            }
778        };
779        sender(JsonRpcRequest::notification(
780            "notifications/tasks/status",
781            Some(payload),
782        ));
783    }
784}
785
/// Copy of a task's info and result taken while the task map lock is held.
///
/// Snapshots let status notifications be built and sent after the lock has
/// been released.
#[derive(Debug, Clone)]
struct TaskStatusSnapshot {
    /// Task info as it was when the snapshot was taken.
    info: TaskInfo,
    /// Task result as it was when the snapshot was taken, if any.
    result: Option<TaskResult>,
}
791
792impl TaskStatusSnapshot {
793    fn from(state: &TaskState) -> Self {
794        Self {
795            info: state.info.clone(),
796            result: state.result.clone(),
797        }
798    }
799}
800
801fn notify_snapshot(
802    sender: &Arc<RwLock<Option<TaskNotificationSender>>>,
803    snapshot: Option<TaskStatusSnapshot>,
804) {
805    let Some(snapshot) = snapshot else {
806        return;
807    };
808    let sender = {
809        let guard = sender.read().unwrap_or_else(|poisoned| {
810            warn!(target: targets::SERVER, "notification sender lock poisoned in notify_snapshot, recovering");
811            poisoned.into_inner()
812        });
813        guard.clone()
814    };
815    let Some(sender) = sender else {
816        return;
817    };
818    let params = TaskStatusNotificationParams {
819        id: snapshot.info.id.clone(),
820        status: snapshot.info.status,
821        progress: snapshot.info.progress,
822        message: snapshot.info.message.clone(),
823        error: snapshot.info.error.clone(),
824        result: snapshot.result,
825    };
826    let payload = match serde_json::to_value(params) {
827        Ok(value) => value,
828        Err(err) => {
829            warn!(
830                target: targets::SERVER,
831                "failed to serialize task status notification: {}",
832                err
833            );
834            return;
835        }
836    };
837    sender(JsonRpcRequest::notification(
838        "notifications/tasks/status",
839        Some(payload),
840    ));
841}
842
843impl Default for TaskManager {
844    fn default() -> Self {
845        Self::new()
846    }
847}
848
849impl std::fmt::Debug for TaskManager {
850    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
851        // Use poison recovery to avoid panic during Debug formatting
852        let task_count = self
853            .tasks
854            .read()
855            .map(|g| g.len())
856            .unwrap_or_else(|poisoned| poisoned.into_inner().len());
857        let handler_count = self
858            .handlers
859            .read()
860            .map(|g| g.len())
861            .unwrap_or_else(|poisoned| poisoned.into_inner().len());
862        f.debug_struct("TaskManager")
863            .field("task_count", &task_count)
864            .field("handler_count", &handler_count)
865            .field("task_counter", &self.task_counter.load(Ordering::SeqCst))
866            .field(
867                "list_changed_notifications",
868                &self.list_changed_notifications,
869            )
870            .field("auto_execute", &self.auto_execute)
871            .finish_non_exhaustive()
872    }
873}
874
875/// Thread-safe handle to a TaskManager.
876pub type SharedTaskManager = Arc<TaskManager>;
877
878#[cfg(test)]
879mod tests {
880    use super::*;
881    use std::sync::Arc;
882    use std::thread;
883
/// A freshly built manager has no tasks and list-changed notifications off.
#[test]
fn test_task_manager_creation() {
    let mgr = TaskManager::new();
    assert!(!mgr.has_list_changed_notifications());
    assert_eq!(mgr.active_count(), 0);
    assert_eq!(mgr.total_count(), 0);
}
891
/// The dedicated constructor turns list-changed notifications on.
#[test]
fn test_task_manager_with_notifications() {
    let mgr = TaskManager::with_list_changed_notifications();
    let enabled = mgr.has_list_changed_notifications();
    assert!(enabled);
}
897
/// Once a handler is registered for a task type, submitting that type succeeds.
#[test]
fn test_register_handler() {
    let mgr = TaskManager::new();
    mgr.register_handler("test_task", |_ctx, _args| async {
        Ok(serde_json::json!({}))
    });

    let ctx = Cx::for_testing();
    let submitted = mgr.submit(&ctx, "test_task", None);
    assert!(submitted.is_ok());
}
911
/// With auto-execute on but no runtime, a submitted task is recorded as failed.
#[test]
fn test_submit_auto_execute_fails_when_runtime_unavailable() {
    let mut mgr = TaskManager::new_for_testing();
    mgr.runtime = None;
    mgr.auto_execute = true;

    mgr.register_handler("test_task", |_ctx, _args| async {
        Ok(serde_json::json!({}))
    });

    let ctx = Cx::for_testing();
    let id = mgr.submit(&ctx, "test_task", None).unwrap();

    // Both the task info and the stored result carry the same failure text.
    let failed = mgr.get_info(&id).unwrap();
    assert_eq!(failed.status, TaskStatus::Failed);
    assert_eq!(failed.error.as_deref(), Some("Task runtime unavailable"));

    let outcome = mgr.get_result(&id).unwrap();
    assert!(!outcome.success);
    assert_eq!(outcome.error.as_deref(), Some("Task runtime unavailable"));
}
933
/// Submitting a task type with no registered handler is rejected.
#[test]
fn test_submit_unknown_task_type() {
    let mgr = TaskManager::new();
    let ctx = Cx::for_testing();

    let submitted = mgr.submit(&ctx, "unknown_task", None);
    assert!(submitted.is_err());
}
942
/// Walks one task through submit -> start -> progress -> complete and checks
/// the reported info and result at each step.
#[test]
fn test_task_lifecycle() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();

    mgr.register_handler("test", |_ctx, _args| async {
        Ok(serde_json::json!({"done": true}))
    });

    // Submitted: pending, not yet started.
    let id = mgr.submit(&ctx, "test", None).unwrap();
    let pending = mgr.get_info(&id).unwrap();
    assert_eq!(pending.status, TaskStatus::Pending);
    assert!(pending.started_at.is_none());

    // Started: running, with a start timestamp.
    mgr.start_task(&id).unwrap();
    let running = mgr.get_info(&id).unwrap();
    assert_eq!(running.status, TaskStatus::Running);
    assert!(running.started_at.is_some());

    // Progress and message are stored as given.
    mgr.update_progress(&id, 0.5, Some("Halfway done".into()));
    let halfway = mgr.get_info(&id).unwrap();
    assert_eq!(halfway.progress, Some(0.5));
    assert_eq!(halfway.message, Some("Halfway done".into()));

    // Completed: terminal status with a completion timestamp.
    mgr.complete_task(&id, serde_json::json!({"result": 42}));
    let finished = mgr.get_info(&id).unwrap();
    assert_eq!(finished.status, TaskStatus::Completed);
    assert!(finished.completed_at.is_some());

    // The stored result reports success and carries the completion payload.
    let outcome = mgr.get_result(&id).unwrap();
    assert!(outcome.success);
    assert_eq!(outcome.data, Some(serde_json::json!({"result": 42})));
}
983
/// Failing a running task records the error on both the info and the result.
#[test]
fn test_task_failure() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();

    mgr.register_handler("fail_test", |_ctx, _args| async {
        Ok(serde_json::json!({}))
    });

    let id = mgr.submit(&ctx, "fail_test", None).unwrap();
    mgr.start_task(&id).unwrap();
    mgr.fail_task(&id, "Something went wrong");

    let failed = mgr.get_info(&id).unwrap();
    assert_eq!(failed.status, TaskStatus::Failed);
    assert_eq!(failed.error, Some("Something went wrong".into()));

    let outcome = mgr.get_result(&id).unwrap();
    assert!(!outcome.success);
    assert_eq!(outcome.error, Some("Something went wrong".into()));
}
1005
/// Cancelling a running task marks it cancelled, sets the cancel flag, and
/// makes a second cancel attempt fail.
#[test]
fn test_task_cancellation() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();

    mgr.register_handler("cancel_test", |_ctx, _args| async {
        Ok(serde_json::json!({}))
    });

    let id = mgr.submit(&ctx, "cancel_test", None).unwrap();
    mgr.start_task(&id).unwrap();

    // First cancel succeeds and reports the new status.
    let cancelled = mgr.cancel(&id, Some("User cancelled".into())).unwrap();
    assert_eq!(cancelled.status, TaskStatus::Cancelled);

    // The cancel flag is visible afterwards.
    assert!(mgr.is_cancel_requested(&id));

    // A cancelled task cannot be cancelled again.
    let second_attempt = mgr.cancel(&id, None);
    assert!(second_attempt.is_err());
}
1031
/// `list_tasks` filters by status and returns everything when given `None`.
#[test]
fn test_list_tasks() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();

    mgr.register_handler("list_test", |_ctx, _args| async {
        Ok(serde_json::json!({}))
    });

    let first = mgr.submit(&ctx, "list_test", None).unwrap();
    let second = mgr.submit(&ctx, "list_test", None).unwrap();
    let _third = mgr.submit(&ctx, "list_test", None).unwrap();

    // Number of tasks matching the given status filter.
    let count = |filter| mgr.list_tasks(filter).len();

    // Everything starts out pending.
    assert_eq!(count(Some(TaskStatus::Pending)), 3);
    assert_eq!(count(Some(TaskStatus::Running)), 0);

    // Starting one task moves it from pending to running.
    mgr.start_task(&first).unwrap();
    assert_eq!(count(Some(TaskStatus::Pending)), 2);
    assert_eq!(count(Some(TaskStatus::Running)), 1);

    // Start and complete another.
    mgr.start_task(&second).unwrap();
    mgr.complete_task(&second, serde_json::json!({}));
    assert_eq!(count(Some(TaskStatus::Completed)), 1);

    // No filter: all three tasks are listed.
    assert_eq!(count(None), 3);
}
1062
/// Pending and running tasks count as active; completed and cancelled ones do
/// not, but they still count towards the total.
#[test]
fn test_active_count() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();

    mgr.register_handler("count_test", |_ctx, _args| async {
        Ok(serde_json::json!({}))
    });

    let first = mgr.submit(&ctx, "count_test", None).unwrap();
    let second = mgr.submit(&ctx, "count_test", None).unwrap();

    assert_eq!(mgr.active_count(), 2);
    assert_eq!(mgr.total_count(), 2);

    // Running is still active.
    mgr.start_task(&first).unwrap();
    assert_eq!(mgr.active_count(), 2);

    // Completion removes the task from the active set.
    mgr.complete_task(&first, serde_json::json!({}));
    assert_eq!(mgr.active_count(), 1);

    // Cancellation does too, while the total is unaffected.
    mgr.cancel(&second, None).unwrap();
    assert_eq!(mgr.active_count(), 0);
    assert_eq!(mgr.total_count(), 2);
}
1088
/// Progress values outside `[0.0, 1.0]` are clamped; in-range values are kept.
#[test]
fn test_progress_clamping() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();

    mgr.register_handler("clamp_test", |_ctx, _args| async {
        Ok(serde_json::json!({}))
    });

    let id = mgr.submit(&ctx, "clamp_test", None).unwrap();
    mgr.start_task(&id).unwrap();

    // (reported value, value expected to be stored)
    for (reported, stored) in [(-0.5, 0.0), (1.5, 1.0), (0.75, 0.75)] {
        mgr.update_progress(&id, reported, None);
        assert_eq!(mgr.get_info(&id).unwrap().progress, Some(stored));
    }
}
1111
/// Out-of-order transitions are rejected: completing a pending task is a no-op
/// and starting a completed task is an error.
#[test]
fn test_invalid_transition_rejected() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();

    mgr.register_handler("transition_test", |_ctx, _args| async {
        Ok(serde_json::json!({}))
    });

    let id = mgr.submit(&ctx, "transition_test", None).unwrap();

    // Pending -> Completed is not allowed, so the status stays Pending.
    mgr.complete_task(&id, serde_json::json!({"result": "noop"}));
    let still_pending = mgr.get_info(&id).unwrap();
    assert_eq!(still_pending.status, TaskStatus::Pending);

    // The regular path works.
    mgr.start_task(&id).unwrap();
    mgr.complete_task(&id, serde_json::json!({"result": "ok"}));
    let completed = mgr.get_info(&id).unwrap();
    assert_eq!(completed.status, TaskStatus::Completed);

    // Completed -> Running is refused.
    let restart = mgr.start_task(&id);
    assert!(restart.is_err());
}
1137
/// Four threads submitting ten tasks each produce forty pending tasks.
#[test]
fn test_concurrent_submissions() {
    let mgr = Arc::new(TaskManager::new_for_testing());
    mgr.register_handler("concurrent_test", |_ctx, _args| async {
        Ok(serde_json::json!({}))
    });

    // Spawn every worker before joining any of them.
    let workers: Vec<_> = (0..4)
        .map(|_| {
            let mgr = Arc::clone(&mgr);
            thread::spawn(move || {
                let ctx = Cx::for_testing();
                for _ in 0..10 {
                    let _ = mgr.submit(&ctx, "concurrent_test", None).unwrap();
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().expect("thread join failed");
    }

    assert_eq!(mgr.total_count(), 40);
    assert_eq!(mgr.list_tasks(Some(TaskStatus::Pending)).len(), 40);
}
1163
/// Each lifecycle step pushes a `notifications/tasks/status` message through
/// the registered sender, in order: pending, running, progress, completed.
#[test]
fn test_task_status_notifications() {
    let manager = TaskManager::new_for_testing();
    manager.register_handler("notify_test", |_cx, _params| async {
        Ok(serde_json::json!({"ok": true}))
    });

    // Capture every status notification the manager emits.
    let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
        Arc::new(std::sync::Mutex::new(Vec::new()));
    let sender_events = Arc::clone(&events);
    let sender: TaskNotificationSender = Arc::new(move |request| {
        // Only task status notifications are of interest here.
        if request.method != "notifications/tasks/status" {
            return;
        }
        let params = request
            .params
            .as_ref()
            .and_then(|value| serde_json::from_value(value.clone()).ok())
            .expect("task status params");
        sender_events
            .lock()
            .expect("events lock poisoned")
            .push(params);
    });
    manager.set_notification_sender(sender);

    let cx = Cx::for_testing();
    let task_id = manager.submit(&cx, "notify_test", None).unwrap();
    manager.start_task(&task_id).unwrap();
    manager.update_progress(&task_id, 0.5, Some("half".to_string()));
    manager.complete_task(&task_id, serde_json::json!({"result": 1}));

    let recorded = events.lock().expect("events lock poisoned").clone();
    // The positional checks below need four entries; assert that up front so a
    // short list fails with a readable message instead of an index panic.
    assert!(
        recorded.len() >= 4,
        "expected at least 4 task status notifications, got {}",
        recorded.len()
    );
    assert_eq!(recorded[0].id, task_id);
    assert_eq!(recorded[0].status, TaskStatus::Pending);
    assert_eq!(recorded[1].status, TaskStatus::Running);
    assert_eq!(recorded[2].progress, Some(0.5));
    assert_eq!(recorded.last().expect("last").status, TaskStatus::Completed);
}
1204
1205    // ── can_transition ─────────────────────────────────────────────────
1206
/// Every transition the state machine allows is accepted.
#[test]
fn can_transition_valid_pairs() {
    // Out of Running.
    assert!(can_transition(TaskStatus::Running, TaskStatus::Completed));
    assert!(can_transition(TaskStatus::Running, TaskStatus::Failed));
    assert!(can_transition(TaskStatus::Running, TaskStatus::Cancelled));

    // Out of Pending.
    assert!(can_transition(TaskStatus::Pending, TaskStatus::Running));
    assert!(can_transition(TaskStatus::Pending, TaskStatus::Cancelled));
}
1215
/// Skipping `Running`, or leaving a terminal state, is refused.
#[test]
fn can_transition_invalid_pairs() {
    // Terminal states cannot go back to Running.
    assert!(!can_transition(TaskStatus::Cancelled, TaskStatus::Running));
    assert!(!can_transition(TaskStatus::Failed, TaskStatus::Running));
    assert!(!can_transition(TaskStatus::Completed, TaskStatus::Running));

    // Completed cannot move anywhere else either.
    assert!(!can_transition(TaskStatus::Completed, TaskStatus::Pending));
    assert!(!can_transition(
        TaskStatus::Completed,
        TaskStatus::Cancelled
    ));

    // Pending cannot jump straight to a finished state.
    assert!(!can_transition(TaskStatus::Pending, TaskStatus::Completed));
    assert!(!can_transition(TaskStatus::Pending, TaskStatus::Failed));
}
1229
1230    // ── Default / Debug / into_shared ──────────────────────────────────
1231
/// `Default` yields an empty manager with list-changed notifications off.
#[test]
fn default_creates_empty_manager() {
    let mgr = TaskManager::default();
    assert!(!mgr.has_list_changed_notifications());
    assert_eq!(mgr.total_count(), 0);
}
1238
/// The testing constructor leaves `auto_execute` switched off.
#[test]
fn new_for_testing_disables_auto_execute() {
    let mgr = TaskManager::new_for_testing();
    let auto_execute = mgr.auto_execute;
    assert!(!auto_execute);
}
1244
/// `into_shared` wraps the manager in a `SharedTaskManager` that stays usable.
#[test]
fn into_shared_returns_arc() {
    let shared: SharedTaskManager = TaskManager::new_for_testing().into_shared();
    assert_eq!(shared.total_count(), 0);
}
1251
/// The `Debug` rendering names the type and each summarized field.
#[test]
fn debug_output_contains_fields() {
    let mgr = TaskManager::new_for_testing();
    let rendered = format!("{:?}", mgr);
    let mentions = |needle: &str| rendered.contains(needle);

    assert!(mentions("TaskManager"));
    assert!(mentions("task_count"));
    assert!(mentions("handler_count"));
    assert!(mentions("task_counter"));
    assert!(mentions("list_changed_notifications"));
    assert!(mentions("auto_execute"));
}
1263
1264    // ── get_info / get_result for nonexistent tasks ────────────────────
1265
/// Looking up info for an unknown task id yields `None`.
#[test]
fn get_info_nonexistent_returns_none() {
    let mgr = TaskManager::new_for_testing();
    let unknown = TaskId::from_string("nonexistent".to_string());
    let looked_up = mgr.get_info(&unknown);
    assert!(looked_up.is_none());
}
1272
/// Looking up the result of an unknown task id yields `None`.
#[test]
fn get_result_nonexistent_returns_none() {
    let mgr = TaskManager::new_for_testing();
    let unknown = TaskId::from_string("nonexistent".to_string());
    let looked_up = mgr.get_result(&unknown);
    assert!(looked_up.is_none());
}
1279
/// A task that is still pending has no result yet.
#[test]
fn get_result_pending_task_returns_none() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let pending = mgr.submit(&ctx, "t", None).unwrap();
    let outcome = mgr.get_result(&pending);
    assert!(outcome.is_none());
}
1288
1289    // ── is_cancel_requested edge cases ─────────────────────────────────
1290
/// An unknown task id never reports a pending cancel request.
#[test]
fn is_cancel_requested_nonexistent_returns_false() {
    let mgr = TaskManager::new_for_testing();
    let unknown = TaskId::from_string("nonexistent".to_string());
    let requested = mgr.is_cancel_requested(&unknown);
    assert!(!requested);
}
1297
/// A freshly submitted task has no cancel request recorded.
#[test]
fn is_cancel_requested_before_cancel_returns_false() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let submitted = mgr.submit(&ctx, "t", None).unwrap();
    let requested = mgr.is_cancel_requested(&submitted);
    assert!(!requested);
}
1306
1307    // ── update_progress edge cases ─────────────────────────────────────
1308
/// Progress reported for a task that has not started is dropped.
#[test]
fn update_progress_on_pending_task_is_ignored() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });
    let pending = mgr.submit(&ctx, "t", None).unwrap();

    // Still pending, so this update must leave `progress` unset.
    mgr.update_progress(&pending, 0.5, Some("test".to_string()));

    let progress = mgr.get_info(&pending).unwrap().progress;
    assert!(progress.is_none());
}
1320
/// Progress reported after completion does not overwrite the final value.
#[test]
fn update_progress_on_completed_task_is_ignored() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let done = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&done).unwrap();
    mgr.complete_task(&done, serde_json::json!({}));

    // Late update on a completed task: expected to be a no-op.
    mgr.update_progress(&done, 0.1, None);

    // Progress keeps the value it had at completion.
    let after = mgr.get_info(&done).unwrap();
    assert_eq!(after.progress, Some(1.0));
}
1334
1335    // ── complete_task / fail_task on nonexistent ────────────────────────
1336
/// Completing an unknown task id must not panic.
#[test]
fn complete_task_nonexistent_does_not_panic() {
    let mgr = TaskManager::new_for_testing();
    let unknown = TaskId::from_string("nonexistent".to_string());
    let payload = serde_json::json!({});
    // Reaching the end of the test without a panic is the assertion.
    mgr.complete_task(&unknown, payload);
}
1343
/// Failing an unknown task id must not panic.
#[test]
fn fail_task_nonexistent_does_not_panic() {
    let unknown = TaskId::from_string("nonexistent".to_string());
    // Reaching the end of the test without a panic is the assertion.
    TaskManager::new_for_testing().fail_task(&unknown, "error");
}
1350
1351    // ── cancel edge cases ──────────────────────────────────────────────
1352
/// Cancelling an unknown task id fails with a "not found" error.
#[test]
fn cancel_nonexistent_task_returns_error() {
    let mgr = TaskManager::new_for_testing();
    let unknown = TaskId::from_string("nonexistent".to_string());

    let failure = mgr.cancel(&unknown, None).unwrap_err();
    assert!(failure.message.contains("not found"));
}
1360
/// A task can be cancelled straight from `Pending`, without ever running.
#[test]
fn cancel_pending_task_directly() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });
    let pending = mgr.submit(&ctx, "t", None).unwrap();

    // Pending -> Cancelled is a valid transition.
    let cancelled = mgr.cancel(&pending, None).unwrap();

    assert_eq!(cancelled.status, TaskStatus::Cancelled);
    assert!(mgr.is_cancel_requested(&pending));
}
1372
/// Cancelling without a reason records the default reason text as the error.
#[test]
fn cancel_with_default_reason() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let submitted = mgr.submit(&ctx, "t", None).unwrap();
    let cancelled = mgr.cancel(&submitted, None).unwrap();

    assert_eq!(cancelled.error, Some("Cancelled by request".to_string()));
}
1382
1383    // ── task ID sequencing ─────────────────────────────────────────────
1384
/// Two submissions get distinct ids, each carrying the `task-` prefix.
#[test]
fn task_ids_are_sequential() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let first = mgr.submit(&ctx, "t", None).unwrap();
    let second = mgr.submit(&ctx, "t", None).unwrap();

    assert!(first.0.starts_with("task-"));
    assert!(second.0.starts_with("task-"));
    assert_ne!(first, second);
}
1396
1397    // ── start_task edge cases ──────────────────────────────────────────
1398
/// Starting an unknown task id fails with a "not found" error.
#[test]
fn start_task_nonexistent_returns_error() {
    let mgr = TaskManager::new_for_testing();
    let unknown = TaskId::from_string("nonexistent".to_string());

    let failure = mgr.start_task(&unknown).unwrap_err();
    assert!(failure.message.contains("not found"));
}
1406
/// Starting a task twice fails the second time with a "not pending" error.
#[test]
fn start_task_already_running_returns_error() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let running = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&running).unwrap();

    let failure = mgr.start_task(&running).unwrap_err();
    assert!(failure.message.contains("not pending"));
}
1417
1418    // ── cleanup_completed ──────────────────────────────────────────────
1419
/// With a zero max age, cleanup removes a completed task.
#[test]
fn cleanup_completed_removes_old_terminal_tasks() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let done = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&done).unwrap();
    mgr.complete_task(&done, serde_json::json!({}));
    assert_eq!(mgr.total_count(), 1);

    // Zero max age: every completed task qualifies for removal.
    let max_age = std::time::Duration::from_secs(0);
    mgr.cleanup_completed(max_age);
    assert_eq!(mgr.total_count(), 0);
}
1435
/// Cleanup drops the completed task but leaves the still-pending one in place.
#[test]
fn cleanup_completed_keeps_active_tasks() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let finished = mgr.submit(&ctx, "t", None).unwrap();
    let pending = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&finished).unwrap();
    mgr.complete_task(&finished, serde_json::json!({}));
    // `pending` is never started, so it is still active.

    let max_age = std::time::Duration::from_secs(0);
    mgr.cleanup_completed(max_age);

    // Only the pending task is left.
    assert_eq!(mgr.total_count(), 1);
    assert!(mgr.get_info(&pending).is_some());
}
1452
/// A task completed moments ago survives cleanup with a one-hour max age.
#[test]
fn cleanup_completed_keeps_recent_tasks() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let done = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&done).unwrap();
    mgr.complete_task(&done, serde_json::json!({}));

    // One hour max age: the just-completed task is too recent to remove.
    let max_age = std::time::Duration::from_secs(3600);
    mgr.cleanup_completed(max_age);
    assert_eq!(mgr.total_count(), 1);
}
1467
1468    // ── identity transition ────────────────────────────────────────────
1469
/// Transitioning a state to the status it already has reports success.
#[test]
fn transition_same_state_returns_true() {
    // Smallest possible task state, already in `Running`.
    let info = TaskInfo {
        id: TaskId::from_string("test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Running,
        progress: None,
        message: None,
        created_at: String::new(),
        started_at: None,
        completed_at: None,
        error: None,
    };
    let mut state = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };

    // Running -> Running is treated as a successful transition.
    let accepted = transition_state(&mut state, TaskStatus::Running);
    assert!(accepted);
}
1493
1494    // ── submit with params ─────────────────────────────────────────────
1495
/// Submitting without params creates a pending task with no timestamps or error.
#[test]
fn submit_with_none_params_creates_task() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let submitted = mgr.submit(&ctx, "t", None).unwrap();
    let created = mgr.get_info(&submitted).unwrap();

    assert_eq!(created.task_type, "t");
    assert_eq!(created.status, TaskStatus::Pending);
    assert!(created.started_at.is_none());
    assert!(created.completed_at.is_none());
    assert!(created.error.is_none());
}
1509
/// Submitting with a params payload also creates a retrievable task.
#[test]
fn submit_with_some_params_creates_task() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let params = Some(serde_json::json!({"key": "value"}));
    let submitted = mgr.submit(&ctx, "t", params).unwrap();

    assert!(mgr.get_info(&submitted).is_some());
}
1520
1521    // ── fail_task sets result ──────────────────────────────────────────
1522
/// Failing a running task stores an unsuccessful result with the error and no data.
#[test]
fn fail_task_sets_error_result() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let failed = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&failed).unwrap();
    mgr.fail_task(&failed, "boom");

    let outcome = mgr.get_result(&failed).unwrap();
    assert!(!outcome.success);
    assert_eq!(outcome.error, Some("boom".to_string()));
    assert!(outcome.data.is_none());
}
1536
1537    // ── update_progress on nonexistent task ──────────────────────────────
1538
/// Reporting progress for an unknown task id must not panic.
#[test]
fn update_progress_nonexistent_does_not_panic() {
    let unknown = TaskId::from_string("nonexistent".to_string());
    // Reaching the end of the test without a panic is the assertion.
    TaskManager::new_for_testing().update_progress(&unknown, 0.5, None);
}
1545
1546    // ── fail_task on already-terminal task ───────────────────────────────
1547
/// A late `fail_task` on a completed task changes neither status nor result.
#[test]
fn fail_task_on_completed_is_ignored() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let done = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&done).unwrap();
    mgr.complete_task(&done, serde_json::json!({"done": true}));

    // The task is already terminal, so this must be a no-op.
    mgr.fail_task(&done, "too late");

    let after = mgr.get_info(&done).unwrap();
    assert_eq!(after.status, TaskStatus::Completed);
    let outcome = mgr.get_result(&done).unwrap();
    assert!(outcome.success);
}
1563
1564    // ── complete_task on already-terminal task ───────────────────────────
1565
/// A late `complete_task` on a failed task changes neither status nor result.
#[test]
fn complete_task_on_failed_is_ignored() {
    let mgr = TaskManager::new_for_testing();
    let ctx = Cx::for_testing();
    mgr.register_handler("t", |_ctx, _args| async { Ok(serde_json::json!({})) });

    let broken = mgr.submit(&ctx, "t", None).unwrap();
    mgr.start_task(&broken).unwrap();
    mgr.fail_task(&broken, "something broke");

    // The task is already terminal, so this must be a no-op.
    mgr.complete_task(&broken, serde_json::json!({"late": true}));

    let after = mgr.get_info(&broken).unwrap();
    assert_eq!(after.status, TaskStatus::Failed);
    let outcome = mgr.get_result(&broken).unwrap();
    assert!(!outcome.success);
}
1581
1582    // ── register_handler replaces existing handler ──────────────────────
1583
/// Registering a second handler under the same task type is accepted, and
/// submission for that type still works afterwards.
#[test]
fn register_handler_replaces_existing() {
    let mgr = TaskManager::new_for_testing();
    mgr.register_handler("t", |_ctx, _args| async {
        Ok(serde_json::json!({"v": 1}))
    });
    mgr.register_handler("t", |_ctx, _args| async {
        Ok(serde_json::json!({"v": 2}))
    });

    // Submission goes through with the re-registered type.
    let ctx = Cx::for_testing();
    let submitted = mgr.submit(&ctx, "t", None).unwrap();
    let looked_up = mgr.get_info(&submitted);
    assert!(looked_up.is_some());
}
1598
1599    // ── transition_state timestamps ─────────────────────────────────────
1600
/// Moving a pending state to `Running` fills in `started_at`.
#[test]
fn transition_to_running_sets_started_at() {
    let info = TaskInfo {
        id: TaskId::from_string("ts-test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Pending,
        progress: None,
        message: None,
        created_at: String::new(),
        started_at: None,
        completed_at: None,
        error: None,
    };
    let mut state = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };
    assert!(state.info.started_at.is_none());

    let accepted = transition_state(&mut state, TaskStatus::Running);

    assert!(accepted);
    assert!(state.info.started_at.is_some());
}
1624
/// Moving a running state to `Completed` fills in `completed_at`.
#[test]
fn transition_to_completed_sets_completed_at() {
    let info = TaskInfo {
        id: TaskId::from_string("ts-test".to_string()),
        task_type: "t".to_string(),
        status: TaskStatus::Running,
        progress: None,
        message: None,
        created_at: String::new(),
        started_at: Some("earlier".to_string()),
        completed_at: None,
        error: None,
    };
    let mut state = TaskState {
        info,
        cancel_requested: false,
        result: None,
        cx: Cx::for_testing(),
    };
    assert!(state.info.completed_at.is_none());

    let accepted = transition_state(&mut state, TaskStatus::Completed);

    assert!(accepted);
    assert!(state.info.completed_at.is_some());
}
1648
1649    #[test]
1650    fn transition_to_failed_sets_completed_at() {
1651        let task_id = TaskId::from_string("ts-test".to_string());
1652        let mut state = TaskState {
1653            info: TaskInfo {
1654                id: task_id,
1655                task_type: "t".to_string(),
1656                status: TaskStatus::Running,
1657                progress: None,
1658                message: None,
1659                created_at: String::new(),
1660                started_at: Some("earlier".to_string()),
1661                completed_at: None,
1662                error: None,
1663            },
1664            cancel_requested: false,
1665            result: None,
1666            cx: Cx::for_testing(),
1667        };
1668        assert!(transition_state(&mut state, TaskStatus::Failed));
1669        assert!(state.info.completed_at.is_some());
1670    }
1671
1672    #[test]
1673    fn transition_to_cancelled_sets_completed_at() {
1674        let task_id = TaskId::from_string("ts-test".to_string());
1675        let mut state = TaskState {
1676            info: TaskInfo {
1677                id: task_id,
1678                task_type: "t".to_string(),
1679                status: TaskStatus::Running,
1680                progress: None,
1681                message: None,
1682                created_at: String::new(),
1683                started_at: Some("earlier".to_string()),
1684                completed_at: None,
1685                error: None,
1686            },
1687            cancel_requested: false,
1688            result: None,
1689            cx: Cx::for_testing(),
1690        };
1691        assert!(transition_state(&mut state, TaskStatus::Cancelled));
1692        assert!(state.info.completed_at.is_some());
1693    }
1694
1695    #[test]
1696    fn transition_invalid_returns_false() {
1697        let task_id = TaskId::from_string("ts-test".to_string());
1698        let mut state = TaskState {
1699            info: TaskInfo {
1700                id: task_id,
1701                task_type: "t".to_string(),
1702                status: TaskStatus::Pending,
1703                progress: None,
1704                message: None,
1705                created_at: String::new(),
1706                started_at: None,
1707                completed_at: None,
1708                error: None,
1709            },
1710            cancel_requested: false,
1711            result: None,
1712            cx: Cx::for_testing(),
1713        };
1714        // Pending -> Completed is invalid
1715        assert!(!transition_state(&mut state, TaskStatus::Completed));
1716        // State should remain Pending
1717        assert_eq!(state.info.status, TaskStatus::Pending);
1718    }
1719
1720    // ── TaskStatusSnapshot ──────────────────────────────────────────────
1721
1722    #[test]
1723    fn task_status_snapshot_debug_and_clone() {
1724        let task_id = TaskId::from_string("snap-test".to_string());
1725        let state = TaskState {
1726            info: TaskInfo {
1727                id: task_id,
1728                task_type: "t".to_string(),
1729                status: TaskStatus::Running,
1730                progress: Some(0.5),
1731                message: Some("testing".to_string()),
1732                created_at: "now".to_string(),
1733                started_at: Some("now".to_string()),
1734                completed_at: None,
1735                error: None,
1736            },
1737            cancel_requested: false,
1738            result: None,
1739            cx: Cx::for_testing(),
1740        };
1741        let snapshot = TaskStatusSnapshot::from(&state);
1742        let debug = format!("{:?}", snapshot);
1743        assert!(debug.contains("TaskStatusSnapshot"));
1744        let cloned = snapshot.clone();
1745        assert_eq!(cloned.info.status, TaskStatus::Running);
1746        assert!(cloned.result.is_none());
1747    }
1748
1749    // ── cleanup with failed/cancelled tasks ─────────────────────────────
1750
1751    #[test]
1752    fn cleanup_completed_removes_failed_and_cancelled() {
1753        let manager = TaskManager::new_for_testing();
1754        let cx = Cx::for_testing();
1755        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1756
1757        let id1 = manager.submit(&cx, "t", None).unwrap();
1758        let id2 = manager.submit(&cx, "t", None).unwrap();
1759        let id3 = manager.submit(&cx, "t", None).unwrap();
1760
1761        // Complete one
1762        manager.start_task(&id1).unwrap();
1763        manager.complete_task(&id1, serde_json::json!({}));
1764
1765        // Fail one
1766        manager.start_task(&id2).unwrap();
1767        manager.fail_task(&id2, "error");
1768
1769        // Cancel one
1770        manager.cancel(&id3, None).unwrap();
1771
1772        assert_eq!(manager.total_count(), 3);
1773
1774        // Cleanup with 0 duration should remove all terminal tasks
1775        manager.cleanup_completed(std::time::Duration::from_secs(0));
1776        assert_eq!(manager.total_count(), 0);
1777    }
1778
1779    // ── set_notification_sender replaces sender ─────────────────────────
1780
1781    #[test]
1782    fn set_notification_sender_replaces_existing() {
1783        let manager = TaskManager::new_for_testing();
1784        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1785
1786        let count1 = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1787        let count2 = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1788
1789        let c1 = Arc::clone(&count1);
1790        let sender1: TaskNotificationSender = Arc::new(move |_| {
1791            c1.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1792        });
1793        manager.set_notification_sender(sender1);
1794
1795        let cx = Cx::for_testing();
1796        let _id1 = manager.submit(&cx, "t", None).unwrap();
1797        assert!(count1.load(std::sync::atomic::Ordering::SeqCst) > 0);
1798
1799        // Replace sender
1800        let c2 = Arc::clone(&count2);
1801        let sender2: TaskNotificationSender = Arc::new(move |_| {
1802            c2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1803        });
1804        manager.set_notification_sender(sender2);
1805
1806        let _id2 = manager.submit(&cx, "t", None).unwrap();
1807        assert!(count2.load(std::sync::atomic::Ordering::SeqCst) > 0);
1808    }
1809
1810    // ── cancel with custom reason ───────────────────────────────────────
1811
1812    #[test]
1813    fn cancel_with_custom_reason() {
1814        let manager = TaskManager::new_for_testing();
1815        let cx = Cx::for_testing();
1816        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1817        let id = manager.submit(&cx, "t", None).unwrap();
1818        manager.start_task(&id).unwrap();
1819        let info = manager.cancel(&id, Some("Timeout".to_string())).unwrap();
1820        assert_eq!(info.error, Some("Timeout".to_string()));
1821        let result = manager.get_result(&id).unwrap();
1822        assert_eq!(result.error, Some("Timeout".to_string()));
1823    }
1824
1825    // ── can_transition self-transitions ──────────────────────────────────
1826
1827    #[test]
1828    fn can_transition_self_is_false() {
1829        // Self-transitions are not in the match arms, so can_transition returns false,
1830        // but transition_state handles identity specially (returns true without changing state).
1831        assert!(!can_transition(TaskStatus::Pending, TaskStatus::Pending));
1832        assert!(!can_transition(TaskStatus::Running, TaskStatus::Running));
1833        assert!(!can_transition(
1834            TaskStatus::Completed,
1835            TaskStatus::Completed
1836        ));
1837        assert!(!can_transition(TaskStatus::Failed, TaskStatus::Failed));
1838        assert!(!can_transition(
1839            TaskStatus::Cancelled,
1840            TaskStatus::Cancelled
1841        ));
1842    }
1843
1844    // ── transition_state with Pending -> Pending (identity) ─────────────
1845
1846    #[test]
1847    fn transition_state_identity_pending_returns_true() {
1848        let task_id = TaskId::from_string("identity-test".to_string());
1849        let mut state = TaskState {
1850            info: TaskInfo {
1851                id: task_id,
1852                task_type: "t".to_string(),
1853                status: TaskStatus::Pending,
1854                progress: None,
1855                message: None,
1856                created_at: String::new(),
1857                started_at: None,
1858                completed_at: None,
1859                error: None,
1860            },
1861            cancel_requested: false,
1862            result: None,
1863            cx: Cx::for_testing(),
1864        };
1865        assert!(transition_state(&mut state, TaskStatus::Pending));
1866        assert_eq!(state.info.status, TaskStatus::Pending);
1867    }
1868
1869    // ── list_tasks with no filter ───────────────────────────────────────
1870
1871    #[test]
1872    fn list_tasks_no_filter_returns_all() {
1873        let manager = TaskManager::new_for_testing();
1874        let cx = Cx::for_testing();
1875        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1876        let id1 = manager.submit(&cx, "t", None).unwrap();
1877        let _id2 = manager.submit(&cx, "t", None).unwrap();
1878        manager.start_task(&id1).unwrap();
1879        manager.complete_task(&id1, serde_json::json!({}));
1880        // id1 is Completed, id2 is Pending
1881        let all = manager.list_tasks(None);
1882        assert_eq!(all.len(), 2);
1883    }
1884
1885    // ── notification sender status content ──────────────────────────────
1886
1887    #[test]
1888    fn cancel_notification_includes_error_and_result() {
1889        let manager = TaskManager::new_for_testing();
1890        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1891
1892        let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
1893            Arc::new(std::sync::Mutex::new(Vec::new()));
1894        let sender_events = Arc::clone(&events);
1895        let sender: TaskNotificationSender = Arc::new(move |request| {
1896            if request.method == "notifications/tasks/status" {
1897                let params: TaskStatusNotificationParams = request
1898                    .params
1899                    .as_ref()
1900                    .and_then(|v| serde_json::from_value(v.clone()).ok())
1901                    .unwrap();
1902                sender_events.lock().unwrap().push(params);
1903            }
1904        });
1905        manager.set_notification_sender(sender);
1906
1907        let cx = Cx::for_testing();
1908        let id = manager.submit(&cx, "t", None).unwrap();
1909        manager.cancel(&id, Some("user abort".to_string())).unwrap();
1910
1911        let recorded = events.lock().unwrap().clone();
1912        // Last notification should be the cancellation
1913        let last = recorded.last().unwrap();
1914        assert_eq!(last.status, TaskStatus::Cancelled);
1915        assert_eq!(last.error, Some("user abort".to_string()));
1916        assert!(last.result.is_some());
1917        let result = last.result.as_ref().unwrap();
1918        assert!(!result.success);
1919    }
1920
1921    // ── complete sets progress to 1.0 ───────────────────────────────────
1922
1923    #[test]
1924    fn complete_task_sets_progress_to_one() {
1925        let manager = TaskManager::new_for_testing();
1926        let cx = Cx::for_testing();
1927        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1928        let id = manager.submit(&cx, "t", None).unwrap();
1929        manager.start_task(&id).unwrap();
1930        manager.update_progress(&id, 0.5, None);
1931        manager.complete_task(&id, serde_json::json!({}));
1932        let info = manager.get_info(&id).unwrap();
1933        assert_eq!(info.progress, Some(1.0));
1934    }
1935
1936    // ── cleanup_completed — edge cases ─────────────────────────────────
1937
1938    #[test]
1939    fn cleanup_completed_keeps_terminal_without_completed_at() {
1940        let manager = TaskManager::new_for_testing();
1941        let cx = Cx::for_testing();
1942        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1943        let id = manager.submit(&cx, "t", None).unwrap();
1944        manager.start_task(&id).unwrap();
1945        manager.complete_task(&id, serde_json::json!({}));
1946
1947        // Manually remove completed_at to simulate edge case
1948        {
1949            let mut tasks = manager.tasks.write().unwrap();
1950            tasks.get_mut(&id).unwrap().info.completed_at = None;
1951        }
1952
1953        // Cleanup should keep the task (no completed_at → can't determine age)
1954        manager.cleanup_completed(std::time::Duration::from_secs(0));
1955        assert_eq!(manager.total_count(), 1);
1956    }
1957
1958    #[test]
1959    fn cleanup_completed_keeps_terminal_with_unparseable_timestamp() {
1960        let manager = TaskManager::new_for_testing();
1961        let cx = Cx::for_testing();
1962        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
1963        let id = manager.submit(&cx, "t", None).unwrap();
1964        manager.start_task(&id).unwrap();
1965        manager.complete_task(&id, serde_json::json!({}));
1966
1967        // Set completed_at to unparseable value
1968        {
1969            let mut tasks = manager.tasks.write().unwrap();
1970            tasks.get_mut(&id).unwrap().info.completed_at = Some("not-a-date".to_string());
1971        }
1972
1973        manager.cleanup_completed(std::time::Duration::from_secs(0));
1974        assert_eq!(manager.total_count(), 1);
1975    }
1976
1977    // ── Debug with populated state ──────────────────────────────────────
1978
1979    #[test]
1980    fn debug_output_with_tasks_and_handlers() {
1981        let manager = TaskManager::new_for_testing();
1982        manager.register_handler("type_a", |_cx, _params| async { Ok(serde_json::json!({})) });
1983        manager.register_handler("type_b", |_cx, _params| async { Ok(serde_json::json!({})) });
1984        let cx = Cx::for_testing();
1985        let _ = manager.submit(&cx, "type_a", None).unwrap();
1986        let _ = manager.submit(&cx, "type_b", None).unwrap();
1987
1988        let debug = format!("{:?}", manager);
1989        assert!(debug.contains("task_count: 2"));
1990        assert!(debug.contains("handler_count: 2"));
1991    }
1992
1993    // ── Multiple handler types ──────────────────────────────────────────
1994
1995    #[test]
1996    fn multiple_handler_types_independent() {
1997        let manager = TaskManager::new_for_testing();
1998        let cx = Cx::for_testing();
1999        manager.register_handler("analyze", |_cx, _params| async {
2000            Ok(serde_json::json!({"type": "analyze"}))
2001        });
2002        manager.register_handler("summarize", |_cx, _params| async {
2003            Ok(serde_json::json!({"type": "summarize"}))
2004        });
2005
2006        let id_a = manager.submit(&cx, "analyze", None).unwrap();
2007        let id_s = manager.submit(&cx, "summarize", None).unwrap();
2008
2009        let info_a = manager.get_info(&id_a).unwrap();
2010        let info_s = manager.get_info(&id_s).unwrap();
2011        assert_eq!(info_a.task_type, "analyze");
2012        assert_eq!(info_s.task_type, "summarize");
2013    }
2014
2015    // ── list_tasks filters for all terminal statuses ────────────────────
2016
2017    #[test]
2018    fn list_tasks_filter_failed() {
2019        let manager = TaskManager::new_for_testing();
2020        let cx = Cx::for_testing();
2021        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2022
2023        let id = manager.submit(&cx, "t", None).unwrap();
2024        manager.start_task(&id).unwrap();
2025        manager.fail_task(&id, "err");
2026
2027        assert_eq!(manager.list_tasks(Some(TaskStatus::Failed)).len(), 1);
2028        assert_eq!(manager.list_tasks(Some(TaskStatus::Completed)).len(), 0);
2029    }
2030
2031    #[test]
2032    fn list_tasks_filter_cancelled() {
2033        let manager = TaskManager::new_for_testing();
2034        let cx = Cx::for_testing();
2035        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2036
2037        let id = manager.submit(&cx, "t", None).unwrap();
2038        manager.cancel(&id, None).unwrap();
2039
2040        assert_eq!(manager.list_tasks(Some(TaskStatus::Cancelled)).len(), 1);
2041        assert_eq!(manager.list_tasks(Some(TaskStatus::Pending)).len(), 0);
2042    }
2043
2044    // ── notification content for progress ────────────────────────────────
2045
2046    #[test]
2047    fn progress_notification_includes_message() {
2048        let manager = TaskManager::new_for_testing();
2049        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2050
2051        let events: Arc<std::sync::Mutex<Vec<TaskStatusNotificationParams>>> =
2052            Arc::new(std::sync::Mutex::new(Vec::new()));
2053        let sender_events = Arc::clone(&events);
2054        let sender: TaskNotificationSender = Arc::new(move |request| {
2055            if request.method == "notifications/tasks/status" {
2056                let params: TaskStatusNotificationParams = request
2057                    .params
2058                    .as_ref()
2059                    .and_then(|v| serde_json::from_value(v.clone()).ok())
2060                    .unwrap();
2061                sender_events.lock().unwrap().push(params);
2062            }
2063        });
2064        manager.set_notification_sender(sender);
2065
2066        let cx = Cx::for_testing();
2067        let id = manager.submit(&cx, "t", None).unwrap();
2068        manager.start_task(&id).unwrap();
2069        manager.update_progress(&id, 0.75, Some("three quarters".to_string()));
2070
2071        let recorded = events.lock().unwrap().clone();
2072        let progress_event = recorded
2073            .iter()
2074            .find(|e| e.progress == Some(0.75))
2075            .expect("progress notification");
2076        assert_eq!(progress_event.message, Some("three quarters".to_string()));
2077        assert_eq!(progress_event.status, TaskStatus::Running);
2078    }
2079
2080    // ── TaskStatusSnapshot with result ────────────────────────────────────
2081
2082    #[test]
2083    fn task_status_snapshot_includes_result() {
2084        let task_id = TaskId::from_string("snap-result");
2085        let state = TaskState {
2086            info: TaskInfo {
2087                id: task_id.clone(),
2088                task_type: "t".to_string(),
2089                status: TaskStatus::Completed,
2090                progress: Some(1.0),
2091                message: None,
2092                created_at: "now".to_string(),
2093                started_at: Some("now".to_string()),
2094                completed_at: Some("now".to_string()),
2095                error: None,
2096            },
2097            cancel_requested: false,
2098            result: Some(TaskResult {
2099                id: task_id,
2100                success: true,
2101                data: Some(serde_json::json!({"done": true})),
2102                error: None,
2103            }),
2104            cx: Cx::for_testing(),
2105        };
2106        let snapshot = TaskStatusSnapshot::from(&state);
2107        assert!(snapshot.result.is_some());
2108        let result = snapshot.result.unwrap();
2109        assert!(result.success);
2110        assert_eq!(result.data, Some(serde_json::json!({"done": true})));
2111    }
2112
2113    // ── submit error message ──────────────────────────────────────────────
2114
2115    #[test]
2116    fn submit_unknown_task_type_error_message() {
2117        let manager = TaskManager::new_for_testing();
2118        let cx = Cx::for_testing();
2119        let err = manager.submit(&cx, "nonexistent_type", None).unwrap_err();
2120        assert!(err.message.contains("Unknown task type"));
2121        assert!(err.message.contains("nonexistent_type"));
2122    }
2123
2124    // ── cancel result data ───────────────────────────────────────────────
2125
2126    #[test]
2127    fn cancel_result_has_no_data() {
2128        let manager = TaskManager::new_for_testing();
2129        let cx = Cx::for_testing();
2130        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2131        let id = manager.submit(&cx, "t", None).unwrap();
2132        manager.start_task(&id).unwrap();
2133        manager.cancel(&id, Some("abort".to_string())).unwrap();
2134        let result = manager.get_result(&id).unwrap();
2135        assert!(!result.success);
2136        assert!(result.data.is_none());
2137        assert_eq!(result.error, Some("abort".to_string()));
2138    }
2139
2140    // ── Additional coverage — uncovered terminal-state cancel paths ──
2141
2142    #[test]
2143    fn cancel_completed_task_returns_error() {
2144        let manager = TaskManager::new_for_testing();
2145        let cx = Cx::for_testing();
2146        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2147        let id = manager.submit(&cx, "t", None).unwrap();
2148        manager.start_task(&id).unwrap();
2149        manager.complete_task(&id, serde_json::json!({}));
2150        let err = manager.cancel(&id, None).unwrap_err();
2151        assert!(err.message.contains("terminal"));
2152    }
2153
2154    #[test]
2155    fn cancel_failed_task_returns_error() {
2156        let manager = TaskManager::new_for_testing();
2157        let cx = Cx::for_testing();
2158        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2159        let id = manager.submit(&cx, "t", None).unwrap();
2160        manager.start_task(&id).unwrap();
2161        manager.fail_task(&id, "broke");
2162        let err = manager.cancel(&id, None).unwrap_err();
2163        assert!(err.message.contains("terminal"));
2164    }
2165
2166    #[test]
2167    fn fail_task_on_pending_is_ignored() {
2168        let manager = TaskManager::new_for_testing();
2169        let cx = Cx::for_testing();
2170        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2171        let id = manager.submit(&cx, "t", None).unwrap();
2172        // Pending -> Failed is not a valid transition
2173        manager.fail_task(&id, "too early");
2174        let info = manager.get_info(&id).unwrap();
2175        assert_eq!(info.status, TaskStatus::Pending);
2176        assert!(manager.get_result(&id).is_none());
2177    }
2178
2179    #[test]
2180    fn complete_task_on_cancelled_is_ignored() {
2181        let manager = TaskManager::new_for_testing();
2182        let cx = Cx::for_testing();
2183        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2184        let id = manager.submit(&cx, "t", None).unwrap();
2185        manager.start_task(&id).unwrap();
2186        manager.cancel(&id, Some("aborted".to_string())).unwrap();
2187        // Cancelled -> Completed is not valid
2188        manager.complete_task(&id, serde_json::json!({"late": true}));
2189        let info = manager.get_info(&id).unwrap();
2190        assert_eq!(info.status, TaskStatus::Cancelled);
2191    }
2192
2193    #[test]
2194    fn update_progress_none_message_clears_previous() {
2195        let manager = TaskManager::new_for_testing();
2196        let cx = Cx::for_testing();
2197        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2198        let id = manager.submit(&cx, "t", None).unwrap();
2199        manager.start_task(&id).unwrap();
2200        manager.update_progress(&id, 0.3, Some("step 1".to_string()));
2201        assert_eq!(
2202            manager.get_info(&id).unwrap().message,
2203            Some("step 1".to_string())
2204        );
2205        manager.update_progress(&id, 0.6, None);
2206        assert!(manager.get_info(&id).unwrap().message.is_none());
2207    }
2208
2209    #[test]
2210    fn no_notification_sender_does_not_panic() {
2211        let manager = TaskManager::new_for_testing();
2212        let cx = Cx::for_testing();
2213        manager.register_handler("t", |_cx, _params| async { Ok(serde_json::json!({})) });
2214        // No notification sender set — all operations should still work
2215        let id = manager.submit(&cx, "t", None).unwrap();
2216        manager.start_task(&id).unwrap();
2217        manager.update_progress(&id, 0.5, None);
2218        manager.complete_task(&id, serde_json::json!({}));
2219        assert_eq!(manager.get_info(&id).unwrap().status, TaskStatus::Completed);
2220    }
2221}