Skip to main content

fastmcp_server/
docket.rs

1//! Docket: Distributed task queue for FastMCP.
2//!
3//! Provides distributed task queue capabilities with support for multiple backends:
4//! - **Memory**: In-process queue for testing and development
5//! - **Redis**: Distributed queue for production deployments
6//!
7//! # Architecture
8//!
9//! ```text
10//! ┌────────────────────────────────────────────────────────────┐
11//! │                        Docket                              │
12//! │  ┌─────────────────┐  ┌─────────────────┐                  │
13//! │  │   DocketClient  │  │   Worker Pool   │                  │
14//! │  │   (task submit) │  │   (processing)  │                  │
15//! │  └────────┬────────┘  └────────┬────────┘                  │
16//! │           │                    │                           │
17//! │           └──────────┬─────────┘                           │
18//! │                      ▼                                     │
19//! │           ┌────────────────────┐                           │
20//! │           │   DocketBackend    │                           │
21//! │           └─────────┬──────────┘                           │
22//! │                     │                                      │
23//! │       ┌─────────────┼─────────────┐                        │
24//! │       ▼             ▼             ▼                        │
25//! │  ┌─────────┐  ┌─────────┐  ┌─────────────┐                 │
26//! │  │ Memory  │  │  Redis  │  │   Future    │                 │
27//! │  │ Backend │  │ Backend │  │  Backends   │                 │
28//! │  └─────────┘  └─────────┘  └─────────────┘                 │
29//! └────────────────────────────────────────────────────────────┘
30//! ```
31//!
32//! # Usage
33//!
34//! ```ignore
35//! use fastmcp_server::docket::{Docket, DocketSettings, Worker};
36//!
37//! // Create with memory backend (testing/development)
38//! let docket = Docket::memory();
39//!
40//! // Create with Redis backend (production)
41//! let settings = DocketSettings::redis("redis://localhost:6379");
42//! let docket = Docket::new(settings)?;
43//!
44//! // Submit a task
45//! let task_id = docket.submit("process_data", json!({"input": "data"})).await?;
46//!
47//! // Create a worker
48//! let worker = docket.worker()
49//!     .subscribe("process_data", |task| async move {
50//!         // Process the task
51//!         Ok(json!({"result": "processed"}))
52//!     })
53//!     .build();
54//!
55//! // Start processing
56//! worker.run().await?;
57//! ```
58
59use std::collections::{HashMap, VecDeque};
60use std::future::Future;
61use std::pin::Pin;
62use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
63use std::sync::{Arc, RwLock};
64use std::time::Duration;
65
66use asupersync::Cx;
67use fastmcp_core::McpError;
68use fastmcp_core::logging::{debug, info, targets, warn};
69use fastmcp_protocol::{TaskId, TaskInfo, TaskResult, TaskStatus};
70use serde::{Deserialize, Serialize};
71
72// ============================================================================
73// Configuration
74// ============================================================================
75
/// Settings for configuring a Docket instance.
///
/// Construct via [`DocketSettings::memory`] or [`DocketSettings::redis`] and
/// customize with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct DocketSettings {
    /// Backend type (memory or redis).
    pub backend: DocketBackendType,
    /// Queue name prefix for namespacing (used as the key prefix by backends).
    pub queue_prefix: String,
    /// Task visibility timeout (how long before unacked task is requeued).
    pub visibility_timeout: Duration,
    /// Default task timeout.
    pub default_task_timeout: Duration,
    /// Maximum retry count for failed tasks.
    pub max_retries: u32,
    /// Delay between retries (with exponential backoff).
    pub retry_delay: Duration,
    /// Worker poll interval when queue is empty.
    pub poll_interval: Duration,
}
94
95impl Default for DocketSettings {
96    fn default() -> Self {
97        Self {
98            backend: DocketBackendType::Memory,
99            queue_prefix: "fastmcp:docket".to_string(),
100            visibility_timeout: Duration::from_secs(30),
101            default_task_timeout: Duration::from_secs(300),
102            max_retries: 3,
103            retry_delay: Duration::from_secs(1),
104            poll_interval: Duration::from_millis(100),
105        }
106    }
107}
108
109impl DocketSettings {
110    /// Creates settings for memory backend (testing/development).
111    #[must_use]
112    pub fn memory() -> Self {
113        Self::default()
114    }
115
116    /// Creates settings for Redis backend.
117    #[must_use]
118    pub fn redis(url: impl Into<String>) -> Self {
119        Self {
120            backend: DocketBackendType::Redis(RedisSettings {
121                url: url.into(),
122                pool_size: 10,
123                connect_timeout: Duration::from_secs(5),
124            }),
125            ..Self::default()
126        }
127    }
128
129    /// Sets the queue prefix.
130    #[must_use]
131    pub fn with_queue_prefix(mut self, prefix: impl Into<String>) -> Self {
132        self.queue_prefix = prefix.into();
133        self
134    }
135
136    /// Sets the visibility timeout.
137    #[must_use]
138    pub fn with_visibility_timeout(mut self, timeout: Duration) -> Self {
139        self.visibility_timeout = timeout;
140        self
141    }
142
143    /// Sets the maximum retry count.
144    #[must_use]
145    pub fn with_max_retries(mut self, retries: u32) -> Self {
146        self.max_retries = retries;
147        self
148    }
149
150    /// Sets the poll interval.
151    #[must_use]
152    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
153        self.poll_interval = interval;
154        self
155    }
156}
157
/// Backend type configuration.
#[derive(Debug, Clone)]
pub enum DocketBackendType {
    /// In-memory backend for testing/development (not persisted).
    Memory,
    /// Redis backend for production, configured by [`RedisSettings`].
    Redis(RedisSettings),
}
166
/// Redis connection settings.
#[derive(Debug, Clone)]
pub struct RedisSettings {
    /// Redis connection URL (e.g. `redis://localhost:6379`).
    pub url: String,
    /// Connection pool size (a minimum of 1 is enforced at connect time).
    pub pool_size: usize,
    /// Connection timeout.
    pub connect_timeout: Duration,
}
177
178// ============================================================================
179// Task Types
180// ============================================================================
181
/// A queued task in the Docket system.
///
/// Serialized as JSON when stored by the Redis backend; timestamps are
/// RFC 3339 strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocketTask {
    /// Unique task identifier.
    pub id: TaskId,
    /// Task type (determines which handler processes it).
    pub task_type: String,
    /// Task parameters.
    pub params: serde_json::Value,
    /// Task priority (higher = processed first).
    pub priority: i32,
    /// Number of retry attempts so far.
    pub retry_count: u32,
    /// Maximum retries allowed.
    pub max_retries: u32,
    /// When the task was created (RFC 3339).
    pub created_at: String,
    /// When the task was claimed by a worker (RFC 3339); `None` until claimed.
    pub claimed_at: Option<String>,
    /// Current task status.
    pub status: TaskStatus,
    /// Error message if failed.
    pub error: Option<String>,
    /// Task result if completed.
    pub result: Option<serde_json::Value>,
}
208
209impl DocketTask {
210    /// Creates a new task.
211    fn new(
212        id: TaskId,
213        task_type: String,
214        params: serde_json::Value,
215        priority: i32,
216        max_retries: u32,
217    ) -> Self {
218        Self {
219            id,
220            task_type,
221            params,
222            priority,
223            retry_count: 0,
224            max_retries,
225            created_at: chrono::Utc::now().to_rfc3339(),
226            claimed_at: None,
227            status: TaskStatus::Pending,
228            error: None,
229            result: None,
230        }
231    }
232
233    /// Converts to TaskInfo for protocol responses.
234    #[must_use]
235    pub fn to_task_info(&self) -> TaskInfo {
236        TaskInfo {
237            id: self.id.clone(),
238            task_type: self.task_type.clone(),
239            status: self.status,
240            progress: None,
241            message: None,
242            created_at: self.created_at.clone(),
243            started_at: self.claimed_at.clone(),
244            completed_at: if self.status.is_terminal() {
245                Some(chrono::Utc::now().to_rfc3339())
246            } else {
247                None
248            },
249            error: self.error.clone(),
250        }
251    }
252
253    /// Converts to TaskResult for protocol responses.
254    #[must_use]
255    pub fn to_task_result(&self) -> Option<TaskResult> {
256        if !self.status.is_terminal() {
257            return None;
258        }
259        Some(TaskResult {
260            id: self.id.clone(),
261            success: self.status == TaskStatus::Completed,
262            data: self.result.clone(),
263            error: self.error.clone(),
264        })
265    }
266}
267
/// Options for submitting a task.
///
/// `Default` yields priority 0, the Docket-wide retry limit, and no delay.
#[derive(Debug, Clone, Default)]
pub struct SubmitOptions {
    /// Task priority (higher = processed first).
    pub priority: i32,
    /// Maximum retries (overrides the Docket default when `Some`).
    pub max_retries: Option<u32>,
    /// Delay before task becomes visible.
    pub delay: Option<Duration>,
}
278
279impl SubmitOptions {
280    /// Creates default options.
281    #[must_use]
282    pub fn new() -> Self {
283        Self::default()
284    }
285
286    /// Sets task priority.
287    #[must_use]
288    pub fn with_priority(mut self, priority: i32) -> Self {
289        self.priority = priority;
290        self
291    }
292
293    /// Sets max retries.
294    #[must_use]
295    pub fn with_max_retries(mut self, retries: u32) -> Self {
296        self.max_retries = Some(retries);
297        self
298    }
299
300    /// Sets initial delay.
301    #[must_use]
302    pub fn with_delay(mut self, delay: Duration) -> Self {
303        self.delay = Some(delay);
304        self
305    }
306}
307
308// ============================================================================
309// Backend Trait
310// ============================================================================
311
/// Result type for Docket operations.
pub type DocketResult<T> = Result<T, DocketError>;

/// Errors that can occur in Docket operations.
#[derive(Debug)]
pub enum DocketError {
    /// Task not found (payload is the task ID rendered as a string).
    NotFound(String),
    /// Backend connection error.
    Connection(String),
    /// Serialization error.
    Serialization(String),
    /// Task handler error.
    Handler(String),
    /// Backend-specific error (lock poisoning, Redis failures, etc.).
    Backend(String),
    /// Cancelled.
    Cancelled,
}
331
332impl std::fmt::Display for DocketError {
333    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334        match self {
335            DocketError::NotFound(msg) => write!(f, "Task not found: {msg}"),
336            DocketError::Connection(msg) => write!(f, "Connection error: {msg}"),
337            DocketError::Serialization(msg) => write!(f, "Serialization error: {msg}"),
338            DocketError::Handler(msg) => write!(f, "Handler error: {msg}"),
339            DocketError::Backend(msg) => write!(f, "Backend error: {msg}"),
340            DocketError::Cancelled => write!(f, "Operation cancelled"),
341        }
342    }
343}
344
// The message lives entirely in `Display`; no source chain is exposed.
impl std::error::Error for DocketError {}

// Maps any Docket failure onto a generic MCP internal error so queue
// problems can bubble through the protocol layer.
impl From<DocketError> for McpError {
    fn from(err: DocketError) -> Self {
        McpError::internal_error(err.to_string())
    }
}
352
/// Backend trait for Docket storage.
///
/// Implementations provide the actual storage and retrieval of tasks.
/// The trait uses synchronous methods but backends can internally use
/// async operations wrapped in blocking.
pub trait DocketBackend: Send + Sync {
    /// Enqueues a task for processing.
    fn enqueue(&self, task: DocketTask) -> DocketResult<()>;

    /// Dequeues a task for the given task types.
    ///
    /// Returns the highest priority task that matches one of the subscribed types,
    /// or `Ok(None)` when no matching task is available.
    /// The task is marked as claimed but not removed from the queue until acknowledged.
    fn dequeue(&self, task_types: &[String]) -> DocketResult<Option<DocketTask>>;

    /// Acknowledges successful task completion, storing `result` on the task.
    fn ack(&self, task_id: &TaskId, result: serde_json::Value) -> DocketResult<()>;

    /// Negative acknowledgement - task failed, may be retried.
    fn nack(&self, task_id: &TaskId, error: &str) -> DocketResult<()>;

    /// Gets task by ID; `Ok(None)` when the ID is unknown.
    fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>>;

    /// Lists up to `limit` tasks, optionally filtered by status.
    fn list_tasks(&self, status: Option<TaskStatus>, limit: usize)
    -> DocketResult<Vec<DocketTask>>;

    /// Cancels a task, recording an optional reason.
    fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()>;

    /// Returns queue statistics.
    fn stats(&self) -> DocketResult<QueueStats>;

    /// Requeues tasks that have exceeded visibility timeout.
    /// Returns how many tasks were returned to the pending queue.
    fn requeue_stale(&self) -> DocketResult<usize>;
}
390
/// Queue statistics (a point-in-time count of tasks by status).
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Number of pending tasks.
    pub pending: usize,
    /// Number of in-progress tasks.
    pub in_progress: usize,
    /// Number of completed tasks.
    pub completed: usize,
    /// Number of failed tasks.
    pub failed: usize,
    /// Number of cancelled tasks.
    pub cancelled: usize,
}
405
406// ============================================================================
407// Memory Backend
408// ============================================================================
409
/// In-memory Docket backend for testing and development.
///
/// Tasks are stored in memory and not persisted across restarts.
/// This backend is thread-safe and suitable for single-process deployments.
pub struct MemoryDocketBackend {
    /// All tasks indexed by ID (including terminal ones; nothing is evicted).
    tasks: RwLock<HashMap<TaskId, DocketTask>>,
    /// Pending tasks queue (sorted by priority, then creation time).
    pending: RwLock<VecDeque<TaskId>>,
    /// Settings for visibility timeout, retries, etc.
    settings: DocketSettings,
}
422
423impl MemoryDocketBackend {
424    /// Creates a new memory backend.
425    #[must_use]
426    pub fn new(settings: DocketSettings) -> Self {
427        Self {
428            tasks: RwLock::new(HashMap::new()),
429            pending: RwLock::new(VecDeque::new()),
430            settings,
431        }
432    }
433}
434
435impl DocketBackend for MemoryDocketBackend {
436    fn enqueue(&self, task: DocketTask) -> DocketResult<()> {
437        let task_id = task.id.clone();
438        let priority = task.priority;
439
440        {
441            let mut tasks = self
442                .tasks
443                .write()
444                .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
445            tasks.insert(task_id.clone(), task);
446        }
447
448        {
449            let mut pending = self
450                .pending
451                .write()
452                .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
453
454            // Insert maintaining priority order (higher priority first)
455            let tasks = self
456                .tasks
457                .read()
458                .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
459
460            let pos = pending
461                .iter()
462                .position(|id| tasks.get(id).is_none_or(|t| t.priority < priority))
463                .unwrap_or(pending.len());
464
465            pending.insert(pos, task_id);
466        }
467
468        Ok(())
469    }
470
471    fn dequeue(&self, task_types: &[String]) -> DocketResult<Option<DocketTask>> {
472        let mut pending = self
473            .pending
474            .write()
475            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
476        let mut tasks = self
477            .tasks
478            .write()
479            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
480
481        // Find first pending task matching subscribed types
482        let pos = pending.iter().position(|id| {
483            tasks.get(id).is_some_and(|t| {
484                t.status == TaskStatus::Pending && task_types.contains(&t.task_type)
485            })
486        });
487
488        if let Some(pos) = pos {
489            let task_id = pending.remove(pos).expect("position valid");
490            if let Some(task) = tasks.get_mut(&task_id) {
491                task.status = TaskStatus::Running;
492                task.claimed_at = Some(chrono::Utc::now().to_rfc3339());
493                return Ok(Some(task.clone()));
494            }
495        }
496
497        Ok(None)
498    }
499
500    fn ack(&self, task_id: &TaskId, result: serde_json::Value) -> DocketResult<()> {
501        let mut tasks = self
502            .tasks
503            .write()
504            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
505
506        let task = tasks
507            .get_mut(task_id)
508            .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
509
510        task.status = TaskStatus::Completed;
511        task.result = Some(result);
512
513        Ok(())
514    }
515
516    fn nack(&self, task_id: &TaskId, error: &str) -> DocketResult<()> {
517        let mut tasks = self
518            .tasks
519            .write()
520            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
521        let mut pending = self
522            .pending
523            .write()
524            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
525
526        let task = tasks
527            .get_mut(task_id)
528            .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
529
530        task.retry_count += 1;
531        task.error = Some(error.to_string());
532
533        if task.retry_count >= task.max_retries {
534            // Max retries exceeded - mark as failed
535            task.status = TaskStatus::Failed;
536        } else {
537            // Requeue for retry
538            task.status = TaskStatus::Pending;
539            task.claimed_at = None;
540            pending.push_back(task_id.clone());
541        }
542
543        Ok(())
544    }
545
546    fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
547        let tasks = self
548            .tasks
549            .read()
550            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
551        Ok(tasks.get(task_id).cloned())
552    }
553
554    fn list_tasks(
555        &self,
556        status: Option<TaskStatus>,
557        limit: usize,
558    ) -> DocketResult<Vec<DocketTask>> {
559        let tasks = self
560            .tasks
561            .read()
562            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
563
564        let iter = tasks
565            .values()
566            .filter(|t| status.is_none_or(|s| t.status == s));
567
568        Ok(iter.take(limit).cloned().collect())
569    }
570
571    fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()> {
572        let mut tasks = self
573            .tasks
574            .write()
575            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
576        let mut pending = self
577            .pending
578            .write()
579            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
580
581        let task = tasks
582            .get_mut(task_id)
583            .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
584
585        if task.status.is_terminal() {
586            return Err(DocketError::Backend(format!(
587                "Cannot cancel task in terminal state: {:?}",
588                task.status
589            )));
590        }
591
592        task.status = TaskStatus::Cancelled;
593        task.error = reason.map(String::from);
594
595        // Remove from pending queue
596        pending.retain(|id| id != task_id);
597
598        Ok(())
599    }
600
601    fn stats(&self) -> DocketResult<QueueStats> {
602        let tasks = self
603            .tasks
604            .read()
605            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
606
607        let mut stats = QueueStats::default();
608        for task in tasks.values() {
609            match task.status {
610                TaskStatus::Pending => stats.pending += 1,
611                TaskStatus::Running => stats.in_progress += 1,
612                TaskStatus::Completed => stats.completed += 1,
613                TaskStatus::Failed => stats.failed += 1,
614                TaskStatus::Cancelled => stats.cancelled += 1,
615            }
616        }
617
618        Ok(stats)
619    }
620
621    fn requeue_stale(&self) -> DocketResult<usize> {
622        let mut tasks = self
623            .tasks
624            .write()
625            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
626        let mut pending = self
627            .pending
628            .write()
629            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
630
631        let now = chrono::Utc::now();
632        let timeout = chrono::Duration::from_std(self.settings.visibility_timeout)
633            .unwrap_or_else(|_| chrono::Duration::seconds(30));
634
635        let mut requeued = 0;
636
637        for task in tasks.values_mut() {
638            if task.status != TaskStatus::Running {
639                continue;
640            }
641
642            if let Some(ref claimed_at) = task.claimed_at {
643                if let Ok(claimed) = chrono::DateTime::parse_from_rfc3339(claimed_at) {
644                    if now - claimed.with_timezone(&chrono::Utc) > timeout {
645                        // Task has exceeded visibility timeout - requeue
646                        task.status = TaskStatus::Pending;
647                        task.claimed_at = None;
648                        task.retry_count += 1;
649
650                        if task.retry_count >= task.max_retries {
651                            task.status = TaskStatus::Failed;
652                            task.error = Some("Exceeded visibility timeout".to_string());
653                        } else {
654                            pending.push_back(task.id.clone());
655                            requeued += 1;
656                        }
657                    }
658                }
659            }
660        }
661
662        Ok(requeued)
663    }
664}
665
666// ============================================================================
667// Redis Backend
668// ============================================================================
669
/// Redis Docket backend for production distributed deployments.
///
/// Uses Redis lists and sorted sets for reliable task queuing with
/// visibility timeout and atomic operations.
#[cfg(feature = "redis")]
pub struct RedisDocketBackend {
    // Kept alive for the lifetime of the backend; connections below were
    // opened from it.
    client: redis::Client,
    // Fixed-size pool of blocking connections, each behind its own mutex.
    pool: Vec<std::sync::Mutex<redis::Connection>>,
    // Round-robin cursor into `pool` (see `with_conn`).
    next_conn: std::sync::atomic::AtomicUsize,
    settings: RedisSettings,
    docket_settings: DocketSettings,
}
682
#[cfg(feature = "redis")]
impl RedisDocketBackend {
    /// Creates a new Redis backend.
    ///
    /// Eagerly opens `pool_size` (minimum 1) blocking connections so later
    /// operations never pay connection-setup latency.
    ///
    /// # Errors
    /// Returns [`DocketError::Backend`] if the client cannot be created or
    /// any pooled connection fails to open.
    pub fn new(
        redis_settings: RedisSettings,
        docket_settings: DocketSettings,
    ) -> DocketResult<Self> {
        let client = redis::Client::open(redis_settings.url.as_str())
            .map_err(|e| DocketError::Backend(format!("Redis client init failed: {e}")))?;

        let mut pool = Vec::new();
        let pool_size = redis_settings.pool_size.max(1);
        for _ in 0..pool_size {
            let conn = client
                .get_connection()
                .map_err(|e| DocketError::Backend(format!("Redis connect failed: {e}")))?;
            pool.push(std::sync::Mutex::new(conn));
        }

        Ok(Self {
            client,
            pool,
            next_conn: std::sync::atomic::AtomicUsize::new(0),
            settings: redis_settings,
            docket_settings,
        })
    }

    // Key schema (all under `queue_prefix`):
    //   :tasks        hash   task_id -> task JSON
    //   :running      zset   task_id scored by claim time (ms)
    //   :types        set    every task_type ever enqueued
    //   :queue_member hash   task_id -> encoded queue member
    //   :queue_type   hash   task_id -> task_type
    //   :pending:<t>  zset   encoded members per task type
    //   :delayed:<t>  zset   encoded members per task type

    fn key_tasks(&self) -> String {
        format!("{}:tasks", self.docket_settings.queue_prefix)
    }

    fn key_running(&self) -> String {
        format!("{}:running", self.docket_settings.queue_prefix)
    }

    fn key_types(&self) -> String {
        format!("{}:types", self.docket_settings.queue_prefix)
    }

    fn key_queue_member(&self) -> String {
        // task_id -> queue member encoding (used for pending/delayed removal)
        format!("{}:queue_member", self.docket_settings.queue_prefix)
    }

    fn key_queue_type(&self) -> String {
        // task_id -> task_type (used for pending/delayed key selection)
        format!("{}:queue_type", self.docket_settings.queue_prefix)
    }

    fn key_pending(&self, task_type: &str) -> String {
        format!("{}:pending:{task_type}", self.docket_settings.queue_prefix)
    }

    fn key_delayed(&self, task_type: &str) -> String {
        format!("{}:delayed:{task_type}", self.docket_settings.queue_prefix)
    }

    /// Current UTC time in epoch milliseconds.
    fn now_ms() -> i64 {
        chrono::Utc::now().timestamp_millis()
    }

    /// Current UTC time as an RFC 3339 string.
    fn now_rfc3339() -> String {
        chrono::Utc::now().to_rfc3339()
    }

    /// Encodes a task as the queue member string "{prio}:{created}:{id}".
    ///
    /// The priority component is inverted (`i32::MAX - priority`) and both
    /// numeric components are zero-padded, so plain lexicographic ordering
    /// of members yields highest-priority-first, FIFO within a priority.
    /// An unparseable `created_at` falls back to "now".
    fn encode_member(task: &DocketTask) -> String {
        let created_ms = chrono::DateTime::parse_from_rfc3339(&task.created_at)
            .map(|dt| dt.timestamp_millis())
            .unwrap_or_else(|_| chrono::Utc::now().timestamp_millis());
        let prio_key: i64 = (i32::MAX as i64) - (task.priority as i64);
        format!("{prio_key:010}:{created_ms:013}:{}", task.id.0)
    }

    /// Retry backoff in milliseconds: base `retry_delay` doubled per prior
    /// retry (exponent capped at 30), saturating instead of overflowing.
    fn retry_delay_ms(&self, retry_count: u32) -> i64 {
        let base = self
            .docket_settings
            .retry_delay
            .as_millis()
            .min(i64::MAX as u128) as i64;
        if base <= 0 {
            return 0;
        }
        let exp = retry_count.saturating_sub(1).min(30);
        let factor: i64 = 1i64.checked_shl(exp).unwrap_or(i64::MAX);
        base.saturating_mul(factor)
    }

    /// Runs `f` with a pooled connection selected round-robin.
    ///
    /// A poisoned pool mutex is recovered via `into_inner` (the connection
    /// itself is assumed still usable); Redis errors are mapped to
    /// [`DocketError::Backend`].
    fn with_conn<T>(
        &self,
        f: impl FnOnce(&mut redis::Connection) -> redis::RedisResult<T>,
    ) -> DocketResult<T> {
        let idx = self
            .next_conn
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            % self.pool.len();
        let mut guard = self.pool[idx]
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        f(&mut guard).map_err(|e| DocketError::Backend(format!("Redis error: {e}")))
    }
}
785
786#[cfg(feature = "redis")]
787impl DocketBackend for RedisDocketBackend {
    /// Registers the task in one atomic MULTI/EXEC pipeline: stores its
    /// JSON in the tasks hash, records its type, indexes its queue-member
    /// encoding, and adds it to the per-type pending sorted set.
    ///
    /// The ZADD score is a constant 0; ordering comes entirely from the
    /// lexicographic member encoding (see `encode_member`).
    fn enqueue(&self, task: DocketTask) -> DocketResult<()> {
        let task_id = task.id.0.clone();
        let task_type = task.task_type.clone();
        let member = Self::encode_member(&task);
        let json = serde_json::to_string(&task)
            .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;

        let tasks_key = self.key_tasks();
        let types_key = self.key_types();
        let member_key = self.key_queue_member();
        let type_key = self.key_queue_type();
        let pending_key = self.key_pending(&task_type);

        self.with_conn(|conn| {
            redis::pipe()
                .atomic()
                .cmd("HSET")
                .arg(&tasks_key)
                .arg(&task_id)
                .arg(&json)
                .ignore()
                .cmd("SADD")
                .arg(&types_key)
                .arg(&task_type)
                .ignore()
                .cmd("HSET")
                .arg(&member_key)
                .arg(&task_id)
                .arg(&member)
                .ignore()
                .cmd("HSET")
                .arg(&type_key)
                .arg(&task_id)
                .arg(&task_type)
                .ignore()
                .cmd("ZADD")
                .arg(&pending_key)
                .arg(0)
                .arg(&member)
                .ignore()
                .query::<()>(conn)
        })?;

        Ok(())
    }
833
    /// Claims the best pending task across all subscribed type queues.
    ///
    /// A single Lua script (hence atomic on the Redis side) peeks the head
    /// of every pending zset, picks the lexicographically smallest member
    /// (= highest priority, oldest), removes it, moves the task ID into the
    /// running zset scored by the current time, and rewrites the stored
    /// task JSON with status "running" and a `claimed_at` stamp.
    fn dequeue(&self, task_types: &[String]) -> DocketResult<Option<DocketTask>> {
        if task_types.is_empty() {
            return Ok(None);
        }

        // Move any delayed tasks that are now due into the pending queues.
        // Errors here are intentionally ignored: a failed sweep only delays
        // requeueing, it does not affect this dequeue.
        let _ = self.requeue_stale();

        let mut pending_keys: Vec<String> =
            task_types.iter().map(|t| self.key_pending(t)).collect();
        let tasks_key = self.key_tasks();
        let running_key = self.key_running();
        let member_key = self.key_queue_member();

        // KEYS layout consumed by the script below:
        // KEYS[1..#KEYS-3] = pending zsets, then tasks_key, running_key, member_key
        pending_keys.push(tasks_key.clone());
        pending_keys.push(running_key.clone());
        pending_keys.push(member_key.clone());

        let now_rfc = Self::now_rfc3339();
        let now_ms = Self::now_ms();

        // Atomically pick the best pending task across the subscribed queues.
        const LUA: &str = r#"
local tasks_key = KEYS[#KEYS-2]
local running_key = KEYS[#KEYS-1]
local member_key = KEYS[#KEYS]

local now_rfc = ARGV[1]
local now_ms = tonumber(ARGV[2])

local best = nil
local best_key = nil
for i=1,(#KEYS-3) do
  local k = KEYS[i]
  local r = redis.call('ZRANGE', k, 0, 0)
  if r and r[1] then
    local m = r[1]
    if (not best) or (m < best) then
      best = m
      best_key = k
    end
  end
end

if not best then
  return nil
end

redis.call('ZREM', best_key, best)

-- Extract task_id from member "{prio}:{created}:{task_id}"
local _, _, task_id = string.find(best, "^[^:]+:[^:]+:(.+)$")
if not task_id then
  return nil
end

redis.call('HDEL', member_key, task_id)
redis.call('ZADD', running_key, now_ms, task_id)

local tjson = redis.call('HGET', tasks_key, task_id)
if not tjson then
  return nil
end

local t = cjson.decode(tjson)
t["status"] = "running"
t["claimed_at"] = now_rfc
local out = cjson.encode(t)
redis.call('HSET', tasks_key, task_id, out)
return out
"#;

        // Script returns the updated task JSON, or nil when no queue had work.
        let task_json: Option<String> = self.with_conn(|conn| {
            let script = redis::Script::new(LUA);
            let mut inv = script.prepare_invoke();
            for k in &pending_keys {
                inv.key(k);
            }
            inv.arg(now_rfc).arg(now_ms).invoke(conn)
        })?;

        let Some(task_json) = task_json else {
            return Ok(None);
        };

        let task: DocketTask = serde_json::from_str(&task_json)
            .map_err(|e| DocketError::Backend(format!("Task deserialize failed: {e}")))?;
        Ok(Some(task))
    }
924
    /// Marks the task `Completed` with its result, then atomically rewrites
    /// the stored JSON and clears the task from the running zset and the
    /// queue-member / queue-type indexes.
    ///
    /// NOTE(review): the read-modify-write (get_task, then pipeline HSET)
    /// is not atomic end-to-end — a concurrent update of the same task
    /// between the two steps would be overwritten. Confirm this is
    /// acceptable for the single-claimant model implied by `dequeue`.
    fn ack(&self, task_id: &TaskId, result: serde_json::Value) -> DocketResult<()> {
        let task_id_str = task_id.0.clone();
        let tasks_key = self.key_tasks();
        let running_key = self.key_running();
        let member_key = self.key_queue_member();
        let type_key = self.key_queue_type();

        let mut task = self
            .get_task(task_id)?
            .ok_or_else(|| DocketError::NotFound(task_id_str.clone()))?;
        task.status = TaskStatus::Completed;
        task.result = Some(result);

        let json = serde_json::to_string(&task)
            .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;

        self.with_conn(|conn| {
            redis::pipe()
                .atomic()
                .cmd("HSET")
                .arg(&tasks_key)
                .arg(&task_id_str)
                .arg(&json)
                .ignore()
                .cmd("ZREM")
                .arg(&running_key)
                .arg(&task_id_str)
                .ignore()
                .cmd("HDEL")
                .arg(&member_key)
                .arg(&task_id_str)
                .ignore()
                .cmd("HDEL")
                .arg(&type_key)
                .arg(&task_id_str)
                .ignore()
                .query::<()>(conn)
        })?;

        Ok(())
    }
966
    /// Negative acknowledgement: either fails the task permanently (retries
    /// exhausted) or schedules a delayed retry with backoff.
    fn nack(&self, task_id: &TaskId, error: &str) -> DocketResult<()> {
        let task_id_str = task_id.0.clone();
        let tasks_key = self.key_tasks();
        let running_key = self.key_running();
        let member_key = self.key_queue_member();
        let type_key = self.key_queue_type();

        let mut task = self
            .get_task(task_id)?
            .ok_or_else(|| DocketError::NotFound(task_id_str.clone()))?;

        // Every nack counts as one consumed attempt.
        task.retry_count += 1;
        task.error = Some(error.to_string());

        // Remove from running
        // NOTE(review): this ZREM is a separate round trip from the pipelines
        // below; a crash in between leaves the record without a running entry
        // until the next requeue_stale pass — confirm this is acceptable.
        self.with_conn(|conn| {
            redis::cmd("ZREM")
                .arg(&running_key)
                .arg(&task_id_str)
                .query::<()>(conn)
        })?;

        if task.retry_count >= task.max_retries {
            // Retries exhausted: persist a terminal Failed record and drop the
            // member/type bookkeeping so the task can never be dequeued again.
            task.status = TaskStatus::Failed;
            let json = serde_json::to_string(&task)
                .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;
            self.with_conn(|conn| {
                redis::pipe()
                    .atomic()
                    .cmd("HSET")
                    .arg(&tasks_key)
                    .arg(&task_id_str)
                    .arg(&json)
                    .ignore()
                    .cmd("HDEL")
                    .arg(&member_key)
                    .arg(&task_id_str)
                    .ignore()
                    .cmd("HDEL")
                    .arg(&type_key)
                    .arg(&task_id_str)
                    .ignore()
                    .query::<()>(conn)
            })?;
            return Ok(());
        }

        // Schedule retry with exponential backoff.
        task.status = TaskStatus::Pending;
        task.claimed_at = None;

        let member = Self::encode_member(&task);
        let task_type = task.task_type.clone();
        let delayed_key = self.key_delayed(&task_type);
        // saturating_add guards against overflow for extreme delay values.
        let available_ms = Self::now_ms().saturating_add(self.retry_delay_ms(task.retry_count));

        let json = serde_json::to_string(&task)
            .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;

        // Atomically persist the updated record, refresh the member/type maps,
        // and park the member in the delayed zset until its backoff elapses.
        self.with_conn(|conn| {
            redis::pipe()
                .atomic()
                .cmd("HSET")
                .arg(&tasks_key)
                .arg(&task_id_str)
                .arg(&json)
                .ignore()
                .cmd("HSET")
                .arg(&member_key)
                .arg(&task_id_str)
                .arg(&member)
                .ignore()
                .cmd("HSET")
                .arg(&type_key)
                .arg(&task_id_str)
                .arg(&task_type)
                .ignore()
                .cmd("ZADD")
                .arg(&delayed_key)
                .arg(available_ms)
                .arg(&member)
                .ignore()
                .query::<()>(conn)
        })?;

        Ok(())
    }
1054
1055    fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
1056        let tasks_key = self.key_tasks();
1057        let task_id_str = task_id.0.clone();
1058        let json: Option<String> = self.with_conn(|conn| {
1059            redis::cmd("HGET")
1060                .arg(&tasks_key)
1061                .arg(&task_id_str)
1062                .query(conn)
1063        })?;
1064        let Some(json) = json else {
1065            return Ok(None);
1066        };
1067        let task: DocketTask = serde_json::from_str(&json)
1068            .map_err(|e| DocketError::Backend(format!("Task deserialize failed: {e}")))?;
1069        Ok(Some(task))
1070    }
1071
1072    fn list_tasks(
1073        &self,
1074        status: Option<TaskStatus>,
1075        limit: usize,
1076    ) -> DocketResult<Vec<DocketTask>> {
1077        let tasks_key = self.key_tasks();
1078        let values: Vec<String> =
1079            self.with_conn(|conn| redis::cmd("HVALS").arg(&tasks_key).query(conn))?;
1080
1081        let mut tasks = Vec::new();
1082        for json in values {
1083            if let Ok(task) = serde_json::from_str::<DocketTask>(&json) {
1084                if status.is_none_or(|s| task.status == s) {
1085                    tasks.push(task);
1086                }
1087            }
1088        }
1089
1090        tasks.sort_by(|a, b| {
1091            a.created_at
1092                .cmp(&b.created_at)
1093                .then_with(|| a.id.0.cmp(&b.id.0))
1094        });
1095        tasks.truncate(limit);
1096        Ok(tasks)
1097    }
1098
    /// Cancels a task in any state: persists a terminal Cancelled record and
    /// removes it from every queue structure it might still be in.
    fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()> {
        let task_id_str = task_id.0.clone();
        let tasks_key = self.key_tasks();
        let running_key = self.key_running();
        let member_key = self.key_queue_member();
        let type_key = self.key_queue_type();

        let Some(mut task) = self.get_task(task_id)? else {
            return Err(DocketError::NotFound(task_id_str));
        };

        task.status = TaskStatus::Cancelled;
        task.error = Some(reason.unwrap_or("Cancelled").to_string());
        task.result = Some(serde_json::json!({"cancelled": true}));

        let json = serde_json::to_string(&task)
            .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;

        // Look up the queue member string and task type BEFORE deleting the
        // bookkeeping, so the pending/delayed zset entries can be removed too.
        // NOTE(review): these reads are separate round trips from the pipeline
        // below, so a concurrent dequeue can race with cancellation — confirm
        // the intended semantics for in-flight tasks.
        let member: Option<String> = self.with_conn(|conn| {
            redis::cmd("HGET")
                .arg(&member_key)
                .arg(&task_id_str)
                .query(conn)
        })?;
        let task_type: Option<String> = self.with_conn(|conn| {
            redis::cmd("HGET")
                .arg(&type_key)
                .arg(&task_id_str)
                .query(conn)
        })?;

        // Persist the cancelled record and clear running/member/type entries.
        self.with_conn(|conn| {
            redis::pipe()
                .atomic()
                .cmd("HSET")
                .arg(&tasks_key)
                .arg(&task_id_str)
                .arg(&json)
                .ignore()
                .cmd("ZREM")
                .arg(&running_key)
                .arg(&task_id_str)
                .ignore()
                .cmd("HDEL")
                .arg(&member_key)
                .arg(&task_id_str)
                .ignore()
                .cmd("HDEL")
                .arg(&type_key)
                .arg(&task_id_str)
                .ignore()
                .query::<()>(conn)
        })?;

        // Best effort: drop the member from the pending and delayed queues.
        // Errors are deliberately ignored — the task record is already final.
        if let (Some(member), Some(task_type)) = (member, task_type) {
            let pending_key = self.key_pending(&task_type);
            let delayed_key = self.key_delayed(&task_type);
            let _ = self.with_conn(|conn| {
                redis::pipe()
                    .atomic()
                    .cmd("ZREM")
                    .arg(&pending_key)
                    .arg(&member)
                    .ignore()
                    .cmd("ZREM")
                    .arg(&delayed_key)
                    .arg(&member)
                    .ignore()
                    .query::<()>(conn)
            });
        }

        Ok(())
    }
1173
1174    fn stats(&self) -> DocketResult<QueueStats> {
1175        let tasks = self.list_tasks(None, usize::MAX)?;
1176        let mut stats = QueueStats::default();
1177        for t in tasks {
1178            match t.status {
1179                TaskStatus::Pending => stats.pending += 1,
1180                TaskStatus::Running => stats.in_progress += 1,
1181                TaskStatus::Completed => stats.completed += 1,
1182                TaskStatus::Failed => stats.failed += 1,
1183                TaskStatus::Cancelled => stats.cancelled += 1,
1184            }
1185        }
1186        Ok(stats)
1187    }
1188
    /// Maintenance pass: (1) promotes due delayed tasks into their pending
    /// queues, then (2) requeues or fails running tasks whose claim exceeded
    /// the visibility timeout. Returns the number of stale tasks requeued.
    fn requeue_stale(&self) -> DocketResult<usize> {
        let now_ms = Self::now_ms();
        // Clamp the timeout to i64 range before narrowing from u128 millis.
        let visibility_ms: i64 = self
            .docket_settings
            .visibility_timeout
            .as_millis()
            .min(i64::MAX as u128) as i64;
        let cutoff = now_ms.saturating_sub(visibility_ms);

        let tasks_key = self.key_tasks();
        let running_key = self.key_running();
        let types_key = self.key_types();
        let member_key = self.key_queue_member();
        let type_key = self.key_queue_type();

        // 1) Promote due delayed tasks into pending queues.
        let types: Vec<String> =
            self.with_conn(|conn| redis::cmd("SMEMBERS").arg(&types_key).query(conn))?;
        for t in &types {
            let delayed_key = self.key_delayed(t);
            let pending_key = self.key_pending(t);
            // Everything scheduled at or before "now" is due.
            let due_members: Vec<String> = self.with_conn(|conn| {
                redis::cmd("ZRANGEBYSCORE")
                    .arg(&delayed_key)
                    .arg("-inf")
                    .arg(now_ms)
                    .query(conn)
            })?;
            if due_members.is_empty() {
                continue;
            }
            // Move each due member delayed -> pending (score 0) atomically.
            self.with_conn(|conn| {
                let mut pipe = redis::pipe();
                pipe.atomic();
                for m in &due_members {
                    pipe.cmd("ZREM").arg(&delayed_key).arg(m).ignore();
                    pipe.cmd("ZADD").arg(&pending_key).arg(0).arg(m).ignore();
                }
                pipe.query::<()>(conn)
            })?;
        }

        // 2) Requeue running tasks past visibility timeout.
        let stale: Vec<String> = self.with_conn(|conn| {
            redis::cmd("ZRANGEBYSCORE")
                .arg(&running_key)
                .arg("-inf")
                .arg(cutoff)
                .query(conn)
        })?;

        let mut requeued = 0usize;
        for task_id in stale {
            // Claim the stale record: if another worker already handled it, skip.
            // ZREM returning 0 means someone else removed it first.
            let removed: i64 = self.with_conn(|conn| {
                redis::cmd("ZREM")
                    .arg(&running_key)
                    .arg(&task_id)
                    .query(conn)
            })?;
            if removed == 0 {
                continue;
            }

            let id = TaskId::from_string(task_id.clone());
            let Some(mut task) = self.get_task(&id)? else {
                continue;
            };

            // A timed-out claim counts as one consumed attempt.
            task.retry_count += 1;
            task.claimed_at = None;
            task.error = Some("Exceeded visibility timeout".to_string());

            if task.retry_count >= task.max_retries {
                // Retries exhausted: persist as Failed and drop bookkeeping.
                task.status = TaskStatus::Failed;
                let json = serde_json::to_string(&task)
                    .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;
                self.with_conn(|conn| {
                    redis::pipe()
                        .atomic()
                        .cmd("HSET")
                        .arg(&tasks_key)
                        .arg(&task_id)
                        .arg(&json)
                        .ignore()
                        .cmd("HDEL")
                        .arg(&member_key)
                        .arg(&task_id)
                        .ignore()
                        .cmd("HDEL")
                        .arg(&type_key)
                        .arg(&task_id)
                        .ignore()
                        .query::<()>(conn)
                })?;
                continue;
            }

            // Otherwise re-schedule into the delayed queue with backoff.
            task.status = TaskStatus::Pending;
            let member = Self::encode_member(&task);
            let task_type = task.task_type.clone();
            let delayed_key = self.key_delayed(&task_type);
            let available_ms = now_ms.saturating_add(self.retry_delay_ms(task.retry_count));

            let json = serde_json::to_string(&task)
                .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;
            // Atomically persist the record, refresh member/type maps, and
            // park the member in the delayed zset until its backoff elapses.
            self.with_conn(|conn| {
                redis::pipe()
                    .atomic()
                    .cmd("HSET")
                    .arg(&tasks_key)
                    .arg(&task_id)
                    .arg(&json)
                    .ignore()
                    .cmd("HSET")
                    .arg(&member_key)
                    .arg(&task_id)
                    .arg(&member)
                    .ignore()
                    .cmd("HSET")
                    .arg(&type_key)
                    .arg(&task_id)
                    .arg(&task_type)
                    .ignore()
                    .cmd("ZADD")
                    .arg(&delayed_key)
                    .arg(available_ms)
                    .arg(&member)
                    .ignore()
                    .query::<()>(conn)
            })?;
            requeued += 1;
        }

        Ok(requeued)
    }
1325}
1326
1327// ============================================================================
1328// Docket Client
1329// ============================================================================
1330
/// Docket distributed task queue.
///
/// The main entry point for submitting and managing distributed tasks.
pub struct Docket {
    // Queue implementation (memory or Redis); shared with spawned workers.
    backend: Arc<dyn DocketBackend>,
    // Settings captured at construction; cloned into worker builders.
    settings: DocketSettings,
    // Monotonic counter used to mint unique task ids for this instance.
    task_counter: AtomicU64,
}
1339
1340impl Docket {
1341    /// Creates a new Docket with the given settings.
1342    pub fn new(settings: DocketSettings) -> DocketResult<Self> {
1343        let backend: Arc<dyn DocketBackend> = match &settings.backend {
1344            DocketBackendType::Memory => Arc::new(MemoryDocketBackend::new(settings.clone())),
1345            #[cfg(feature = "redis")]
1346            DocketBackendType::Redis(redis_settings) => Arc::new(RedisDocketBackend::new(
1347                redis_settings.clone(),
1348                settings.clone(),
1349            )?),
1350            #[cfg(not(feature = "redis"))]
1351            DocketBackendType::Redis(_) => {
1352                return Err(DocketError::Backend(
1353                    "Redis backend requires 'redis' feature".to_string(),
1354                ));
1355            }
1356        };
1357
1358        Ok(Self {
1359            backend,
1360            settings,
1361            task_counter: AtomicU64::new(0),
1362        })
1363    }
1364
1365    /// Creates a Docket with memory backend (for testing).
1366    #[must_use]
1367    pub fn memory() -> Self {
1368        Self::new(DocketSettings::memory()).expect("memory backend always succeeds")
1369    }
1370
1371    /// Submits a task to the queue.
1372    pub fn submit(
1373        &self,
1374        task_type: impl Into<String>,
1375        params: serde_json::Value,
1376    ) -> DocketResult<TaskId> {
1377        self.submit_with_options(task_type, params, SubmitOptions::default())
1378    }
1379
1380    /// Submits a task with custom options.
1381    pub fn submit_with_options(
1382        &self,
1383        task_type: impl Into<String>,
1384        params: serde_json::Value,
1385        options: SubmitOptions,
1386    ) -> DocketResult<TaskId> {
1387        let counter = self.task_counter.fetch_add(1, Ordering::SeqCst);
1388        let task_id = TaskId::from_string(format!("docket-{counter:08x}"));
1389
1390        let max_retries = options.max_retries.unwrap_or(self.settings.max_retries);
1391        let task = DocketTask::new(
1392            task_id.clone(),
1393            task_type.into(),
1394            params,
1395            options.priority,
1396            max_retries,
1397        );
1398
1399        self.backend.enqueue(task)?;
1400
1401        info!(
1402            target: targets::SERVER,
1403            "Docket: submitted task {} (type: {})",
1404            task_id,
1405            task_id
1406        );
1407
1408        Ok(task_id)
1409    }
1410
1411    /// Gets a task by ID.
1412    pub fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
1413        self.backend.get_task(task_id)
1414    }
1415
1416    /// Lists tasks with optional status filter.
1417    pub fn list_tasks(
1418        &self,
1419        status: Option<TaskStatus>,
1420        limit: usize,
1421    ) -> DocketResult<Vec<DocketTask>> {
1422        self.backend.list_tasks(status, limit)
1423    }
1424
1425    /// Cancels a task.
1426    pub fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()> {
1427        self.backend.cancel(task_id, reason)
1428    }
1429
1430    /// Returns queue statistics.
1431    pub fn stats(&self) -> DocketResult<QueueStats> {
1432        self.backend.stats()
1433    }
1434
1435    /// Creates a worker builder.
1436    #[must_use]
1437    pub fn worker(&self) -> WorkerBuilder {
1438        WorkerBuilder::new(Arc::clone(&self.backend), self.settings.clone())
1439    }
1440
1441    /// Returns the settings.
1442    #[must_use]
1443    pub fn settings(&self) -> &DocketSettings {
1444        &self.settings
1445    }
1446
1447    /// Converts to a shared handle.
1448    #[must_use]
1449    pub fn into_shared(self) -> SharedDocket {
1450        Arc::new(self)
1451    }
1452}
1453
1454impl std::fmt::Debug for Docket {
1455    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1456        f.debug_struct("Docket")
1457            .field("settings", &self.settings)
1458            .field("task_counter", &self.task_counter.load(Ordering::SeqCst))
1459            .finish_non_exhaustive()
1460    }
1461}
1462
/// Thread-safe handle to a Docket.
///
/// Cloning the `Arc` is cheap; all clones share one backend and counter.
pub type SharedDocket = Arc<Docket>;
1465
1466// ============================================================================
1467// Worker
1468// ============================================================================
1469
/// Task handler function type.
///
/// A boxed, thread-safe async callback: takes the dequeued [`DocketTask`]
/// and resolves to the task's JSON result; an `Err` triggers a nack.
pub type TaskHandlerFn = Box<
    dyn Fn(DocketTask) -> Pin<Box<dyn Future<Output = DocketResult<serde_json::Value>> + Send>>
        + Send
        + Sync,
>;
1476
/// Builder for creating workers.
pub struct WorkerBuilder {
    // Backend shared with the owning Docket.
    backend: Arc<dyn DocketBackend>,
    // Settings cloned from the owning Docket (poll interval, retries, ...).
    settings: DocketSettings,
    // Handlers keyed by task type; becomes the worker's subscription set.
    handlers: HashMap<String, TaskHandlerFn>,
}
1483
1484impl WorkerBuilder {
1485    fn new(backend: Arc<dyn DocketBackend>, settings: DocketSettings) -> Self {
1486        Self {
1487            backend,
1488            settings,
1489            handlers: HashMap::new(),
1490        }
1491    }
1492
1493    /// Subscribes to a task type with a handler.
1494    pub fn subscribe<F, Fut>(mut self, task_type: impl Into<String>, handler: F) -> Self
1495    where
1496        F: Fn(DocketTask) -> Fut + Send + Sync + 'static,
1497        Fut: Future<Output = DocketResult<serde_json::Value>> + Send + 'static,
1498    {
1499        let task_type = task_type.into();
1500        let boxed: TaskHandlerFn = Box::new(move |task| Box::pin(handler(task)));
1501        self.handlers.insert(task_type, boxed);
1502        self
1503    }
1504
1505    /// Builds the worker.
1506    #[must_use]
1507    pub fn build(self) -> Worker {
1508        Worker {
1509            backend: self.backend,
1510            settings: self.settings,
1511            handlers: Arc::new(self.handlers),
1512            running: Arc::new(AtomicBool::new(false)),
1513        }
1514    }
1515}
1516
/// A worker that processes tasks from the queue.
pub struct Worker {
    // Backend shared with the owning Docket.
    backend: Arc<dyn DocketBackend>,
    // Poll interval and retry configuration for the run loop.
    settings: DocketSettings,
    // Immutable handler map, shared so the worker itself can be cheap to move.
    handlers: Arc<HashMap<String, TaskHandlerFn>>,
    // Run-loop flag: set by `run`, cleared by `stop` or loop exit.
    running: Arc<AtomicBool>,
}
1524
impl Worker {
    /// Returns the task types this worker is subscribed to.
    #[must_use]
    pub fn subscribed_types(&self) -> Vec<String> {
        self.handlers.keys().cloned().collect()
    }

    /// Returns whether the worker is running.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    /// Stops the worker.
    ///
    /// The flag is observed at the top of each `run` iteration, so the loop
    /// exits after finishing any task currently in flight.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Processes a single task if available.
    ///
    /// Returns true if a task was processed, false if no task was available.
    ///
    /// # Errors
    ///
    /// Returns [`DocketError::Cancelled`] when `cx` has a pending cancel
    /// request; propagates backend dequeue/ack/nack failures.
    pub async fn process_one(&self, cx: &Cx) -> DocketResult<bool> {
        let task_types = self.subscribed_types();

        // Check for cancellation
        if cx.is_cancel_requested() {
            return Err(DocketError::Cancelled);
        }

        // Try to dequeue a task
        let Some(task) = self.backend.dequeue(&task_types)? else {
            return Ok(false);
        };

        let task_id = task.id.clone();
        let task_type = task.task_type.clone();

        debug!(
            target: targets::SERVER,
            "Docket worker: processing task {} (type: {})",
            task_id,
            task_type
        );

        // Get handler
        let Some(handler) = self.handlers.get(&task_type) else {
            // This shouldn't happen since we only dequeue subscribed types
            self.backend.nack(&task_id, "No handler for task type")?;
            return Ok(true);
        };

        // Execute handler
        let result = handler(task).await;

        // Ack on success, nack on failure; nack drives the retry/backoff path.
        match result {
            Ok(data) => {
                self.backend.ack(&task_id, data)?;
                info!(
                    target: targets::SERVER,
                    "Docket worker: completed task {}",
                    task_id
                );
            }
            Err(e) => {
                let error_msg = e.to_string();
                self.backend.nack(&task_id, &error_msg)?;
                warn!(
                    target: targets::SERVER,
                    "Docket worker: task {} failed: {}",
                    task_id,
                    error_msg
                );
            }
        }

        Ok(true)
    }

    /// Runs the worker loop until stopped.
    ///
    /// Each iteration: honors cancellation, runs a best-effort stale-task
    /// sweep, then processes at most one task.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`: errors from `process_one` are
    /// logged and retried rather than propagated.
    pub async fn run(&self, cx: &Cx) -> DocketResult<()> {
        self.running.store(true, Ordering::SeqCst);

        info!(
            target: targets::SERVER,
            "Docket worker starting with subscriptions: {:?}",
            self.subscribed_types()
        );

        while self.running.load(Ordering::SeqCst) {
            // Check for cancellation
            if cx.is_cancel_requested() {
                break;
            }

            // Requeue stale tasks periodically
            // (best effort — failures are ignored and retried next iteration)
            let _ = self.backend.requeue_stale();

            // Process tasks
            match self.process_one(cx).await {
                Ok(true) => {
                    // Processed a task, immediately try for another
                    continue;
                }
                Ok(false) => {
                    // No task available, wait before polling again
                    // NOTE(review): std::thread::sleep blocks the executor
                    // thread inside an async fn — confirm the runtime expects
                    // this, or switch to an async-aware sleep.
                    std::thread::sleep(self.settings.poll_interval);
                }
                Err(DocketError::Cancelled) => {
                    break;
                }
                Err(e) => {
                    warn!(
                        target: targets::SERVER,
                        "Docket worker error: {}",
                        e
                    );
                    // Brief pause on error before retrying
                    std::thread::sleep(Duration::from_millis(100));
                }
            }
        }

        self.running.store(false, Ordering::SeqCst);
        info!(target: targets::SERVER, "Docket worker stopped");

        Ok(())
    }
}
1653
1654impl std::fmt::Debug for Worker {
1655    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1656        f.debug_struct("Worker")
1657            .field("subscribed_types", &self.subscribed_types())
1658            .field("running", &self.is_running())
1659            .finish_non_exhaustive()
1660    }
1661}
1662
1663// ============================================================================
1664// Tests
1665// ============================================================================
1666
1667#[cfg(test)]
1668mod tests {
1669    use super::*;
1670    #[cfg(feature = "redis")]
1671    use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
1672
1673    #[cfg(feature = "redis")]
1674    use std::net::TcpListener;
1675    #[cfg(feature = "redis")]
1676    use std::process::{Child, Command, Stdio};
1677    #[cfg(feature = "redis")]
1678    use std::time::Instant;
1679
1680    #[cfg(feature = "redis")]
1681    static REDIS_TEST_SEQ: AtomicU64 = AtomicU64::new(1);
1682
1683    #[cfg(feature = "redis")]
1684    fn next_test_token(label: &str) -> String {
1685        let seq = REDIS_TEST_SEQ.fetch_add(1, AtomicOrdering::SeqCst);
1686        format!("{label}-{}-{seq}", std::process::id())
1687    }
1688
    // Owns a throwaway redis-server child process for integration tests.
    #[cfg(feature = "redis")]
    struct TestRedisServer {
        // Child process handle; killed and reaped on drop.
        child: Child,
        // Connection URL of the spawned server (loopback, ephemeral port).
        url: String,
    }
1694
    #[cfg(feature = "redis")]
    impl TestRedisServer {
        /// Spawns a throwaway `redis-server` on an ephemeral loopback port
        /// and waits (up to 5s) until it answers PING.
        ///
        /// # Panics
        ///
        /// Panics if the port cannot be reserved, `redis-server` cannot be
        /// spawned, or the server never becomes ready within the deadline.
        fn start() -> Self {
            // Reserve an ephemeral port, then release it for redis to bind.
            // NOTE(review): bind-then-drop has a small TOCTOU window in which
            // another process could claim the port — acceptable for tests.
            let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral redis port");
            let port = listener.local_addr().expect("redis test local addr").port();
            drop(listener);

            // Persistence disabled, loopback-only, protected mode off.
            let child = Command::new("redis-server")
                .arg("--port")
                .arg(port.to_string())
                .arg("--save")
                .arg("")
                .arg("--appendonly")
                .arg("no")
                .arg("--bind")
                .arg("127.0.0.1")
                .arg("--protected-mode")
                .arg("no")
                .stdout(Stdio::null())
                .stderr(Stdio::null())
                .spawn()
                .expect("spawn redis-server");

            // Poll PING until the server responds or the deadline expires.
            let url = format!("redis://127.0.0.1:{port}/");
            let deadline = Instant::now() + Duration::from_secs(5);
            loop {
                let ready = redis::Client::open(url.as_str())
                    .ok()
                    .and_then(|client| client.get_connection().ok())
                    .and_then(|mut conn| redis::cmd("PING").query::<String>(&mut conn).ok())
                    .is_some_and(|pong| pong == "PONG");

                if ready {
                    break;
                }
                assert!(
                    Instant::now() < deadline,
                    "redis-server did not become ready"
                );
                std::thread::sleep(Duration::from_millis(20));
            }

            Self { child, url }
        }
    }
1740
    #[cfg(feature = "redis")]
    impl Drop for TestRedisServer {
        fn drop(&mut self) {
            // Best effort: kill and reap the child; errors are ignored since
            // the test is already tearing down.
            let _ = self.child.kill();
            let _ = self.child.wait();
        }
    }
1748
1749    #[cfg(feature = "redis")]
1750    fn redis_settings_for_test(url: &str) -> DocketSettings {
1751        let mut settings = DocketSettings::redis(url);
1752        settings.queue_prefix = format!("fastmcp:docket:test:{}", next_test_token("queue"));
1753        settings.poll_interval = Duration::from_millis(1);
1754        settings.retry_delay = Duration::from_millis(0);
1755        settings
1756    }
1757
1758    #[test]
1759    fn test_docket_settings_default() {
1760        let settings = DocketSettings::default();
1761        assert!(matches!(settings.backend, DocketBackendType::Memory));
1762        assert_eq!(settings.max_retries, 3);
1763    }
1764
1765    #[test]
1766    fn test_docket_settings_redis() {
1767        let settings = DocketSettings::redis("redis://localhost:6379");
1768        assert!(matches!(settings.backend, DocketBackendType::Redis(_)));
1769    }
1770
1771    #[test]
1772    fn test_docket_settings_builder() {
1773        let settings = DocketSettings::memory()
1774            .with_queue_prefix("test:queue")
1775            .with_max_retries(5)
1776            .with_poll_interval(Duration::from_millis(50));
1777
1778        assert_eq!(settings.queue_prefix, "test:queue");
1779        assert_eq!(settings.max_retries, 5);
1780        assert_eq!(settings.poll_interval, Duration::from_millis(50));
1781    }
1782
1783    #[test]
1784    fn test_docket_memory_creation() {
1785        let docket = Docket::memory();
1786        assert!(matches!(docket.settings.backend, DocketBackendType::Memory));
1787    }
1788
1789    #[test]
1790    fn test_docket_submit_task() {
1791        let docket = Docket::memory();
1792
1793        let task_id = docket
1794            .submit("test_task", serde_json::json!({"key": "value"}))
1795            .unwrap();
1796
1797        assert!(task_id.to_string().starts_with("docket-"));
1798
1799        // Verify task exists
1800        let task = docket.get_task(&task_id).unwrap().unwrap();
1801        assert_eq!(task.task_type, "test_task");
1802        assert_eq!(task.status, TaskStatus::Pending);
1803    }
1804
1805    #[test]
1806    fn test_docket_submit_with_priority() {
1807        let docket = Docket::memory();
1808
1809        let low_id = docket
1810            .submit_with_options(
1811                "task",
1812                serde_json::json!({"priority": "low"}),
1813                SubmitOptions::new().with_priority(1),
1814            )
1815            .unwrap();
1816
1817        let high_id = docket
1818            .submit_with_options(
1819                "task",
1820                serde_json::json!({"priority": "high"}),
1821                SubmitOptions::new().with_priority(10),
1822            )
1823            .unwrap();
1824
1825        // High priority should be dequeued first
1826        let worker = docket
1827            .worker()
1828            .subscribe("task", |t| async move { Ok(t.params) })
1829            .build();
1830
1831        let types = worker.subscribed_types();
1832        let dequeued = docket.backend.dequeue(&types).unwrap().unwrap();
1833        assert_eq!(dequeued.id, high_id);
1834
1835        // Ack it
1836        docket.backend.ack(&high_id, serde_json::json!({})).unwrap();
1837
1838        // Now low priority should be available
1839        let dequeued = docket.backend.dequeue(&types).unwrap().unwrap();
1840        assert_eq!(dequeued.id, low_id);
1841    }
1842
1843    #[test]
1844    fn test_docket_cancel_task() {
1845        let docket = Docket::memory();
1846
1847        let task_id = docket.submit("task", serde_json::json!({})).unwrap();
1848
1849        docket.cancel(&task_id, Some("User cancelled")).unwrap();
1850
1851        let task = docket.get_task(&task_id).unwrap().unwrap();
1852        assert_eq!(task.status, TaskStatus::Cancelled);
1853        assert_eq!(task.error, Some("User cancelled".to_string()));
1854    }
1855
1856    #[test]
1857    fn test_docket_stats() {
1858        let docket = Docket::memory();
1859
1860        docket.submit("task1", serde_json::json!({})).unwrap();
1861        docket.submit("task2", serde_json::json!({})).unwrap();
1862        let task3 = docket.submit("task3", serde_json::json!({})).unwrap();
1863        docket.cancel(&task3, None).unwrap();
1864
1865        let stats = docket.stats().unwrap();
1866        assert_eq!(stats.pending, 2);
1867        assert_eq!(stats.cancelled, 1);
1868    }
1869
1870    #[test]
1871    fn test_docket_list_tasks() {
1872        let docket = Docket::memory();
1873
1874        docket.submit("type_a", serde_json::json!({})).unwrap();
1875        docket.submit("type_b", serde_json::json!({})).unwrap();
1876        let cancelled_id = docket.submit("type_a", serde_json::json!({})).unwrap();
1877        docket.cancel(&cancelled_id, None).unwrap();
1878
1879        // All tasks
1880        let all = docket.list_tasks(None, 100).unwrap();
1881        assert_eq!(all.len(), 3);
1882
1883        // Pending only
1884        let pending = docket.list_tasks(Some(TaskStatus::Pending), 100).unwrap();
1885        assert_eq!(pending.len(), 2);
1886
1887        // Cancelled only
1888        let cancelled = docket.list_tasks(Some(TaskStatus::Cancelled), 100).unwrap();
1889        assert_eq!(cancelled.len(), 1);
1890    }
1891
1892    #[test]
1893    fn test_worker_builder() {
1894        let docket = Docket::memory();
1895
1896        let worker = docket
1897            .worker()
1898            .subscribe("type_a", |_| async { Ok(serde_json::json!({})) })
1899            .subscribe("type_b", |_| async { Ok(serde_json::json!({})) })
1900            .build();
1901
1902        let types = worker.subscribed_types();
1903        assert!(types.contains(&"type_a".to_string()));
1904        assert!(types.contains(&"type_b".to_string()));
1905    }
1906
1907    #[test]
1908    fn test_memory_backend_retry() {
1909        let settings = DocketSettings::memory().with_max_retries(2);
1910        let backend = MemoryDocketBackend::new(settings);
1911
1912        let task = DocketTask::new(
1913            TaskId::from_string("test-1"),
1914            "retry_test".to_string(),
1915            serde_json::json!({}),
1916            0,
1917            2,
1918        );
1919
1920        backend.enqueue(task).unwrap();
1921
1922        // Dequeue and nack (first failure)
1923        let task = backend
1924            .dequeue(&["retry_test".to_string()])
1925            .unwrap()
1926            .unwrap();
1927        backend.nack(&task.id, "error 1").unwrap();
1928
1929        // Should be requeued
1930        let task = backend
1931            .dequeue(&["retry_test".to_string()])
1932            .unwrap()
1933            .unwrap();
1934        assert_eq!(task.retry_count, 1);
1935        backend.nack(&task.id, "error 2").unwrap();
1936
1937        // Max retries exceeded - should be failed, not requeued
1938        let task = backend.dequeue(&["retry_test".to_string()]).unwrap();
1939        assert!(task.is_none());
1940
1941        // Verify it's marked as failed
1942        let task = backend
1943            .get_task(&TaskId::from_string("test-1"))
1944            .unwrap()
1945            .unwrap();
1946        assert_eq!(task.status, TaskStatus::Failed);
1947    }
1948
1949    #[test]
1950    fn test_docket_task_to_info() {
1951        let task = DocketTask::new(
1952            TaskId::from_string("test-info"),
1953            "test_type".to_string(),
1954            serde_json::json!({"data": 42}),
1955            5,
1956            3,
1957        );
1958
1959        let info = task.to_task_info();
1960        assert_eq!(info.id.to_string(), "test-info");
1961        assert_eq!(info.task_type, "test_type");
1962        assert_eq!(info.status, TaskStatus::Pending);
1963        assert!(info.started_at.is_none());
1964    }
1965
1966    #[test]
1967    fn test_worker_process_one() {
1968        use fastmcp_core::block_on;
1969
1970        let docket = Docket::memory();
1971
1972        // Submit a task
1973        let task_id = docket
1974            .submit("process_test", serde_json::json!({"x": 1}))
1975            .unwrap();
1976
1977        // Create worker
1978        let worker = docket
1979            .worker()
1980            .subscribe("process_test", |task| async move {
1981                let x = task.params.get("x").and_then(|v| v.as_i64()).unwrap_or(0);
1982                Ok(serde_json::json!({"result": x * 2}))
1983            })
1984            .build();
1985
1986        // Process
1987        let cx = Cx::for_testing();
1988        let processed = block_on(worker.process_one(&cx)).unwrap();
1989        assert!(processed);
1990
1991        // Verify completion
1992        let task = docket.get_task(&task_id).unwrap().unwrap();
1993        assert_eq!(task.status, TaskStatus::Completed);
1994        assert_eq!(task.result, Some(serde_json::json!({"result": 2})));
1995    }
1996
1997    #[test]
1998    fn test_worker_no_task_available() {
1999        use fastmcp_core::block_on;
2000
2001        let docket = Docket::memory();
2002
2003        let worker = docket
2004            .worker()
2005            .subscribe("empty_test", |_| async { Ok(serde_json::json!({})) })
2006            .build();
2007
2008        let cx = Cx::for_testing();
2009        let processed = block_on(worker.process_one(&cx)).unwrap();
2010        assert!(!processed);
2011    }
2012
2013    #[test]
2014    fn test_submit_options() {
2015        let opts = SubmitOptions::new()
2016            .with_priority(10)
2017            .with_max_retries(5)
2018            .with_delay(Duration::from_secs(60));
2019
2020        assert_eq!(opts.priority, 10);
2021        assert_eq!(opts.max_retries, Some(5));
2022        assert_eq!(opts.delay, Some(Duration::from_secs(60)));
2023    }
2024
2025    #[test]
2026    fn test_docket_error_display() {
2027        let errors = vec![
2028            (
2029                DocketError::NotFound("task-1".into()),
2030                "Task not found: task-1",
2031            ),
2032            (
2033                DocketError::Connection("refused".into()),
2034                "Connection error: refused",
2035            ),
2036            (DocketError::Handler("panic".into()), "Handler error: panic"),
2037            (DocketError::Cancelled, "Operation cancelled"),
2038        ];
2039
2040        for (error, expected) in errors {
2041            assert_eq!(error.to_string(), expected);
2042        }
2043    }
2044
2045    #[cfg(feature = "redis")]
2046    #[test]
2047    fn test_redis_worker_process_round_trip() {
2048        use fastmcp_core::block_on;
2049
2050        let redis_server = TestRedisServer::start();
2051        let settings = redis_settings_for_test(&redis_server.url);
2052        let docket = Docket::new(settings).expect("redis docket");
2053
2054        let task_id = docket
2055            .submit("redis_round_trip", serde_json::json!({"value": 21}))
2056            .expect("submit");
2057
2058        let worker = docket
2059            .worker()
2060            .subscribe("redis_round_trip", |task| async move {
2061                let value = task
2062                    .params
2063                    .get("value")
2064                    .and_then(|v| v.as_i64())
2065                    .unwrap_or(0);
2066                Ok(serde_json::json!({"doubled": value * 2}))
2067            })
2068            .build();
2069
2070        let cx = Cx::for_testing();
2071        let processed = block_on(worker.process_one(&cx)).expect("process one");
2072        assert!(processed);
2073
2074        let task = docket
2075            .get_task(&task_id)
2076            .expect("get task")
2077            .expect("task exists");
2078        assert_eq!(task.status, TaskStatus::Completed);
2079        assert_eq!(task.result, Some(serde_json::json!({"doubled": 42})));
2080    }
2081
2082    #[cfg(feature = "redis")]
2083    #[test]
2084    fn test_redis_worker_retries_then_marks_failed() {
2085        use fastmcp_core::block_on;
2086
2087        let redis_server = TestRedisServer::start();
2088        let mut settings = redis_settings_for_test(&redis_server.url);
2089        settings.max_retries = 2;
2090        let docket = Docket::new(settings).expect("redis docket");
2091
2092        let task_id = docket
2093            .submit("redis_retry", serde_json::json!({"attempt": 0}))
2094            .expect("submit");
2095
2096        let worker = docket
2097            .worker()
2098            .subscribe("redis_retry", |_task| async move {
2099                Err(DocketError::Handler("boom".to_string()))
2100            })
2101            .build();
2102
2103        let cx = Cx::for_testing();
2104        assert!(block_on(worker.process_one(&cx)).expect("process first"));
2105        assert!(block_on(worker.process_one(&cx)).expect("process second"));
2106
2107        let task = docket
2108            .get_task(&task_id)
2109            .expect("get task")
2110            .expect("task exists");
2111        assert_eq!(task.status, TaskStatus::Failed);
2112        assert_eq!(task.retry_count, 2);
2113        assert!(task.error.unwrap_or_default().contains("boom"));
2114    }
2115
2116    #[cfg(feature = "redis")]
2117    #[test]
2118    fn test_redis_cancel_pending_task() {
2119        let redis_server = TestRedisServer::start();
2120        let settings = redis_settings_for_test(&redis_server.url);
2121        let docket = Docket::new(settings).expect("redis docket");
2122
2123        let task_id = docket
2124            .submit("redis_cancel", serde_json::json!({"x": 1}))
2125            .expect("submit");
2126        docket
2127            .cancel(&task_id, Some("stopped by test"))
2128            .expect("cancel task");
2129
2130        let task = docket
2131            .get_task(&task_id)
2132            .expect("get task")
2133            .expect("task exists");
2134        assert_eq!(task.status, TaskStatus::Cancelled);
2135        assert_eq!(task.error, Some("stopped by test".to_string()));
2136    }
2137
2138    #[cfg(feature = "redis")]
2139    #[test]
2140    fn test_redis_requeue_stale_running_task() {
2141        let redis_server = TestRedisServer::start();
2142        let mut settings = redis_settings_for_test(&redis_server.url);
2143        settings.visibility_timeout = Duration::from_millis(0);
2144        let docket = Docket::new(settings).expect("redis docket");
2145
2146        let task_id = docket
2147            .submit("redis_stale", serde_json::json!({"x": 1}))
2148            .expect("submit");
2149
2150        let task_types = vec!["redis_stale".to_string()];
2151        let claimed = docket
2152            .backend
2153            .dequeue(&task_types)
2154            .expect("dequeue")
2155            .expect("claimed task");
2156        assert_eq!(claimed.id, task_id);
2157        assert_eq!(claimed.status, TaskStatus::Running);
2158
2159        let requeued = docket.backend.requeue_stale().expect("requeue stale");
2160        assert_eq!(requeued, 1);
2161
2162        let reclaimed = docket
2163            .backend
2164            .dequeue(&task_types)
2165            .expect("dequeue after requeue")
2166            .expect("task available again");
2167        assert_eq!(reclaimed.id, task_id);
2168        assert_eq!(reclaimed.retry_count, 1);
2169    }
2170
2171    // ========================================
2172    // DocketSettings — additional
2173    // ========================================
2174
2175    #[test]
2176    fn docket_settings_memory_equals_default() {
2177        let mem = DocketSettings::memory();
2178        let def = DocketSettings::default();
2179        assert_eq!(mem.queue_prefix, def.queue_prefix);
2180        assert_eq!(mem.max_retries, def.max_retries);
2181        assert_eq!(mem.visibility_timeout, def.visibility_timeout);
2182        assert_eq!(mem.default_task_timeout, def.default_task_timeout);
2183        assert_eq!(mem.retry_delay, def.retry_delay);
2184        assert_eq!(mem.poll_interval, def.poll_interval);
2185    }
2186
2187    #[test]
2188    fn docket_settings_with_visibility_timeout() {
2189        let s = DocketSettings::memory().with_visibility_timeout(Duration::from_secs(120));
2190        assert_eq!(s.visibility_timeout, Duration::from_secs(120));
2191    }
2192
2193    #[test]
2194    fn docket_settings_debug() {
2195        let s = DocketSettings::memory();
2196        let debug = format!("{:?}", s);
2197        assert!(debug.contains("DocketSettings"));
2198        assert!(debug.contains("fastmcp:docket"));
2199    }
2200
2201    #[test]
2202    fn docket_settings_clone() {
2203        let s = DocketSettings::memory().with_max_retries(7);
2204        let c = s.clone();
2205        assert_eq!(c.max_retries, 7);
2206    }
2207
2208    #[test]
2209    fn docket_settings_redis_pool_size() {
2210        let s = DocketSettings::redis("redis://localhost:6379");
2211        match s.backend {
2212            DocketBackendType::Redis(ref r) => {
2213                assert_eq!(r.pool_size, 10);
2214                assert_eq!(r.connect_timeout, Duration::from_secs(5));
2215                assert_eq!(r.url, "redis://localhost:6379");
2216            }
2217            _ => panic!("expected Redis backend"),
2218        }
2219    }
2220
2221    // ========================================
2222    // DocketBackendType / RedisSettings
2223    // ========================================
2224
2225    #[test]
2226    fn docket_backend_type_debug() {
2227        let mem = DocketBackendType::Memory;
2228        let debug = format!("{:?}", mem);
2229        assert!(debug.contains("Memory"));
2230
2231        let redis = DocketBackendType::Redis(RedisSettings {
2232            url: "redis://test".to_string(),
2233            pool_size: 5,
2234            connect_timeout: Duration::from_secs(3),
2235        });
2236        let debug = format!("{:?}", redis);
2237        assert!(debug.contains("Redis"));
2238    }
2239
2240    #[test]
2241    fn redis_settings_clone() {
2242        let r = RedisSettings {
2243            url: "redis://host".to_string(),
2244            pool_size: 3,
2245            connect_timeout: Duration::from_secs(2),
2246        };
2247        let c = r.clone();
2248        assert_eq!(c.url, "redis://host");
2249        assert_eq!(c.pool_size, 3);
2250    }
2251
2252    // ========================================
2253    // DocketTask
2254    // ========================================
2255
2256    #[test]
2257    fn docket_task_new_fields() {
2258        let task = DocketTask::new(
2259            TaskId::from_string("t-1"),
2260            "my_type".to_string(),
2261            serde_json::json!({"key": "val"}),
2262            5,
2263            3,
2264        );
2265        assert_eq!(task.task_type, "my_type");
2266        assert_eq!(task.priority, 5);
2267        assert_eq!(task.max_retries, 3);
2268        assert_eq!(task.retry_count, 0);
2269        assert_eq!(task.status, TaskStatus::Pending);
2270        assert!(task.claimed_at.is_none());
2271        assert!(task.error.is_none());
2272        assert!(task.result.is_none());
2273        assert!(!task.created_at.is_empty());
2274    }
2275
2276    #[test]
2277    fn docket_task_debug_and_clone() {
2278        let task = DocketTask::new(
2279            TaskId::from_string("t-dbg"),
2280            "dbg_type".to_string(),
2281            serde_json::json!(null),
2282            0,
2283            1,
2284        );
2285        let debug = format!("{:?}", task);
2286        assert!(debug.contains("DocketTask"));
2287        assert!(debug.contains("dbg_type"));
2288
2289        let cloned = task.clone();
2290        assert_eq!(cloned.task_type, "dbg_type");
2291    }
2292
2293    #[test]
2294    fn docket_task_serialize_deserialize_roundtrip() {
2295        let task = DocketTask::new(
2296            TaskId::from_string("t-ser"),
2297            "ser_type".to_string(),
2298            serde_json::json!({"a": 1}),
2299            2,
2300            4,
2301        );
2302        let json = serde_json::to_string(&task).unwrap();
2303        let deserialized: DocketTask = serde_json::from_str(&json).unwrap();
2304        assert_eq!(deserialized.task_type, "ser_type");
2305        assert_eq!(deserialized.priority, 2);
2306        assert_eq!(deserialized.max_retries, 4);
2307    }
2308
2309    #[test]
2310    fn docket_task_to_task_info_pending() {
2311        let task = DocketTask::new(
2312            TaskId::from_string("t-info"),
2313            "info_type".to_string(),
2314            serde_json::json!({}),
2315            0,
2316            3,
2317        );
2318        let info = task.to_task_info();
2319        assert_eq!(info.status, TaskStatus::Pending);
2320        assert!(info.started_at.is_none());
2321        assert!(info.completed_at.is_none()); // Pending is not terminal
2322    }
2323
2324    #[test]
2325    fn docket_task_to_task_info_completed() {
2326        let mut task = DocketTask::new(
2327            TaskId::from_string("t-comp"),
2328            "comp_type".to_string(),
2329            serde_json::json!({}),
2330            0,
2331            3,
2332        );
2333        task.status = TaskStatus::Completed;
2334        task.claimed_at = Some("2025-01-01T00:00:00Z".to_string());
2335        let info = task.to_task_info();
2336        assert_eq!(info.status, TaskStatus::Completed);
2337        assert!(info.started_at.is_some());
2338        assert!(info.completed_at.is_some()); // Completed is terminal
2339    }
2340
2341    #[test]
2342    fn docket_task_to_task_result_non_terminal() {
2343        let task = DocketTask::new(
2344            TaskId::from_string("t-res"),
2345            "res_type".to_string(),
2346            serde_json::json!({}),
2347            0,
2348            3,
2349        );
2350        assert!(task.to_task_result().is_none());
2351    }
2352
2353    #[test]
2354    fn docket_task_to_task_result_completed() {
2355        let mut task = DocketTask::new(
2356            TaskId::from_string("t-res2"),
2357            "res_type".to_string(),
2358            serde_json::json!({}),
2359            0,
2360            3,
2361        );
2362        task.status = TaskStatus::Completed;
2363        task.result = Some(serde_json::json!({"data": 42}));
2364        let result = task.to_task_result().unwrap();
2365        assert!(result.success);
2366        assert_eq!(result.data, Some(serde_json::json!({"data": 42})));
2367        assert!(result.error.is_none());
2368    }
2369
2370    #[test]
2371    fn docket_task_to_task_result_failed() {
2372        let mut task = DocketTask::new(
2373            TaskId::from_string("t-fail"),
2374            "fail_type".to_string(),
2375            serde_json::json!({}),
2376            0,
2377            3,
2378        );
2379        task.status = TaskStatus::Failed;
2380        task.error = Some("something broke".to_string());
2381        let result = task.to_task_result().unwrap();
2382        assert!(!result.success);
2383        assert_eq!(result.error, Some("something broke".to_string()));
2384    }
2385
2386    // ========================================
2387    // SubmitOptions — additional
2388    // ========================================
2389
2390    #[test]
2391    fn submit_options_default() {
2392        let opts = SubmitOptions::default();
2393        assert_eq!(opts.priority, 0);
2394        assert!(opts.max_retries.is_none());
2395        assert!(opts.delay.is_none());
2396    }
2397
2398    #[test]
2399    fn submit_options_debug() {
2400        let opts = SubmitOptions::new().with_priority(3);
2401        let debug = format!("{:?}", opts);
2402        assert!(debug.contains("SubmitOptions"));
2403        assert!(debug.contains('3'));
2404    }
2405
2406    // ========================================
2407    // DocketError — additional
2408    // ========================================
2409
2410    #[test]
2411    fn docket_error_serialization_display() {
2412        let err = DocketError::Serialization("bad json".to_string());
2413        assert_eq!(err.to_string(), "Serialization error: bad json");
2414    }
2415
2416    #[test]
2417    fn docket_error_backend_display() {
2418        let err = DocketError::Backend("lock poisoned".to_string());
2419        assert_eq!(err.to_string(), "Backend error: lock poisoned");
2420    }
2421
2422    #[test]
2423    fn docket_error_is_std_error() {
2424        let err = DocketError::NotFound("x".to_string());
2425        let _: &dyn std::error::Error = &err;
2426    }
2427
2428    #[test]
2429    fn docket_error_into_mcp_error() {
2430        let err = DocketError::Handler("timeout".to_string());
2431        let mcp: McpError = err.into();
2432        assert!(mcp.message.contains("Handler error: timeout"));
2433    }
2434
2435    #[test]
2436    fn docket_error_debug() {
2437        let err = DocketError::Cancelled;
2438        let debug = format!("{:?}", err);
2439        assert!(debug.contains("Cancelled"));
2440    }
2441
2442    // ========================================
2443    // QueueStats
2444    // ========================================
2445
2446    #[test]
2447    fn queue_stats_default() {
2448        let stats = QueueStats::default();
2449        assert_eq!(stats.pending, 0);
2450        assert_eq!(stats.in_progress, 0);
2451        assert_eq!(stats.completed, 0);
2452        assert_eq!(stats.failed, 0);
2453        assert_eq!(stats.cancelled, 0);
2454    }
2455
2456    #[test]
2457    fn queue_stats_debug_and_clone() {
2458        let stats = QueueStats {
2459            pending: 1,
2460            in_progress: 2,
2461            completed: 3,
2462            failed: 4,
2463            cancelled: 5,
2464        };
2465        let debug = format!("{:?}", stats);
2466        assert!(debug.contains("QueueStats"));
2467
2468        let c = stats.clone();
2469        assert_eq!(c.pending, 1);
2470        assert_eq!(c.completed, 3);
2471    }
2472
2473    // ========================================
2474    // MemoryDocketBackend — additional
2475    // ========================================
2476
2477    #[test]
2478    fn memory_backend_dequeue_empty() {
2479        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2480        let result = backend.dequeue(&["any".to_string()]).unwrap();
2481        assert!(result.is_none());
2482    }
2483
2484    #[test]
2485    fn memory_backend_dequeue_wrong_type() {
2486        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2487        let task = DocketTask::new(
2488            TaskId::from_string("t-1"),
2489            "type_a".to_string(),
2490            serde_json::json!({}),
2491            0,
2492            3,
2493        );
2494        backend.enqueue(task).unwrap();
2495
2496        // Request wrong type — should return None
2497        let result = backend.dequeue(&["type_b".to_string()]).unwrap();
2498        assert!(result.is_none());
2499    }
2500
2501    #[test]
2502    fn memory_backend_dequeue_sets_running() {
2503        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2504        let task = DocketTask::new(
2505            TaskId::from_string("t-run"),
2506            "work".to_string(),
2507            serde_json::json!({}),
2508            0,
2509            3,
2510        );
2511        backend.enqueue(task).unwrap();
2512
2513        let dequeued = backend.dequeue(&["work".to_string()]).unwrap().unwrap();
2514        assert_eq!(dequeued.status, TaskStatus::Running);
2515        assert!(dequeued.claimed_at.is_some());
2516    }
2517
2518    #[test]
2519    fn memory_backend_ack_nonexistent() {
2520        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2521        let result = backend.ack(&TaskId::from_string("nonexistent"), serde_json::json!({}));
2522        assert!(result.is_err());
2523    }
2524
2525    #[test]
2526    fn memory_backend_nack_nonexistent() {
2527        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2528        let result = backend.nack(&TaskId::from_string("nonexistent"), "err");
2529        assert!(result.is_err());
2530    }
2531
2532    #[test]
2533    fn memory_backend_get_task_nonexistent() {
2534        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2535        let result = backend.get_task(&TaskId::from_string("missing")).unwrap();
2536        assert!(result.is_none());
2537    }
2538
2539    #[test]
2540    fn memory_backend_cancel_terminal_task_fails() {
2541        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2542        let task = DocketTask::new(
2543            TaskId::from_string("t-can"),
2544            "cancel_test".to_string(),
2545            serde_json::json!({}),
2546            0,
2547            3,
2548        );
2549        backend.enqueue(task).unwrap();
2550
2551        let id = TaskId::from_string("t-can");
2552        backend.cancel(&id, None).unwrap();
2553
2554        // Cancelling again should fail (already in terminal state)
2555        let result = backend.cancel(&id, Some("again"));
2556        assert!(result.is_err());
2557    }
2558
2559    #[test]
2560    fn memory_backend_cancel_removes_from_pending() {
2561        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2562        let task = DocketTask::new(
2563            TaskId::from_string("t-crp"),
2564            "cancel_pend".to_string(),
2565            serde_json::json!({}),
2566            0,
2567            3,
2568        );
2569        backend.enqueue(task).unwrap();
2570
2571        let id = TaskId::from_string("t-crp");
2572        backend.cancel(&id, Some("bye")).unwrap();
2573
2574        // Should not be dequeued
2575        let result = backend.dequeue(&["cancel_pend".to_string()]).unwrap();
2576        assert!(result.is_none());
2577    }
2578
2579    #[test]
2580    fn memory_backend_list_tasks_with_limit() {
2581        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2582        for i in 0..5 {
2583            let task = DocketTask::new(
2584                TaskId::from_string(format!("t-{i}")),
2585                "limit_test".to_string(),
2586                serde_json::json!({}),
2587                0,
2588                3,
2589            );
2590            backend.enqueue(task).unwrap();
2591        }
2592
2593        let tasks = backend.list_tasks(None, 3).unwrap();
2594        assert_eq!(tasks.len(), 3);
2595    }
2596
2597    #[test]
2598    fn memory_backend_list_tasks_filter_by_status() {
2599        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2600        for i in 0..3 {
2601            let task = DocketTask::new(
2602                TaskId::from_string(format!("t-{i}")),
2603                "filter_test".to_string(),
2604                serde_json::json!({}),
2605                0,
2606                3,
2607            );
2608            backend.enqueue(task).unwrap();
2609        }
2610        // Cancel one
2611        backend.cancel(&TaskId::from_string("t-1"), None).unwrap();
2612
2613        let pending = backend.list_tasks(Some(TaskStatus::Pending), 100).unwrap();
2614        assert_eq!(pending.len(), 2);
2615
2616        let cancelled = backend
2617            .list_tasks(Some(TaskStatus::Cancelled), 100)
2618            .unwrap();
2619        assert_eq!(cancelled.len(), 1);
2620    }
2621
2622    #[test]
2623    fn memory_backend_stats_all_statuses() {
2624        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2625
2626        // Create pending
2627        for i in 0..2 {
2628            let task = DocketTask::new(
2629                TaskId::from_string(format!("p-{i}")),
2630                "stat_test".to_string(),
2631                serde_json::json!({}),
2632                0,
2633                3,
2634            );
2635            backend.enqueue(task).unwrap();
2636        }
2637
2638        // Dequeue one → Running
2639        let _running = backend.dequeue(&["stat_test".to_string()]).unwrap();
2640
2641        // Ack → Completed (re-create and dequeue another)
2642        let task = DocketTask::new(
2643            TaskId::from_string("c-0"),
2644            "stat_test".to_string(),
2645            serde_json::json!({}),
2646            0,
2647            3,
2648        );
2649        backend.enqueue(task).unwrap();
2650        let deq = backend
2651            .dequeue(&["stat_test".to_string()])
2652            .unwrap()
2653            .unwrap();
2654        backend.ack(&deq.id, serde_json::json!({})).unwrap();
2655
2656        let stats = backend.stats().unwrap();
2657        assert!(stats.pending >= 1);
2658        assert!(stats.in_progress >= 1 || stats.completed >= 1);
2659    }
2660
2661    #[test]
2662    fn memory_backend_requeue_stale_no_stale() {
2663        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2664        let task = DocketTask::new(
2665            TaskId::from_string("t-fresh"),
2666            "stale_test".to_string(),
2667            serde_json::json!({}),
2668            0,
2669            3,
2670        );
2671        backend.enqueue(task).unwrap();
2672
2673        // Dequeue but with a long visibility timeout — should not be stale
2674        let _deq = backend
2675            .dequeue(&["stale_test".to_string()])
2676            .unwrap()
2677            .unwrap();
2678        let requeued = backend.requeue_stale().unwrap();
2679        assert_eq!(requeued, 0);
2680    }
2681
2682    #[test]
2683    fn memory_backend_ack_marks_completed() {
2684        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2685        let task = DocketTask::new(
2686            TaskId::from_string("t-ack"),
2687            "ack_test".to_string(),
2688            serde_json::json!({}),
2689            0,
2690            3,
2691        );
2692        backend.enqueue(task).unwrap();
2693
2694        let deq = backend.dequeue(&["ack_test".to_string()]).unwrap().unwrap();
2695        backend
2696            .ack(&deq.id, serde_json::json!({"done": true}))
2697            .unwrap();
2698
2699        let task = backend
2700            .get_task(&TaskId::from_string("t-ack"))
2701            .unwrap()
2702            .unwrap();
2703        assert_eq!(task.status, TaskStatus::Completed);
2704        assert_eq!(task.result, Some(serde_json::json!({"done": true})));
2705    }
2706
2707    #[test]
2708    fn memory_backend_priority_ordering() {
2709        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2710
2711        // Enqueue low priority first, then high
2712        let low = DocketTask::new(
2713            TaskId::from_string("low"),
2714            "prio".to_string(),
2715            serde_json::json!({}),
2716            1,
2717            3,
2718        );
2719        let high = DocketTask::new(
2720            TaskId::from_string("high"),
2721            "prio".to_string(),
2722            serde_json::json!({}),
2723            10,
2724            3,
2725        );
2726        backend.enqueue(low).unwrap();
2727        backend.enqueue(high).unwrap();
2728
2729        // High priority dequeued first
2730        let first = backend.dequeue(&["prio".to_string()]).unwrap().unwrap();
2731        assert_eq!(first.id.to_string(), "high");
2732    }
2733
2734    // ========================================
2735    // Docket — additional
2736    // ========================================
2737
2738    #[test]
2739    fn docket_debug() {
2740        let docket = Docket::memory();
2741        let debug = format!("{:?}", docket);
2742        assert!(debug.contains("Docket"));
2743        assert!(debug.contains("settings"));
2744    }
2745
2746    #[test]
2747    fn docket_settings_accessor() {
2748        let docket = Docket::memory();
2749        let settings = docket.settings();
2750        assert!(matches!(settings.backend, DocketBackendType::Memory));
2751    }
2752
2753    #[test]
2754    fn docket_into_shared() {
2755        let docket = Docket::memory();
2756        let shared: SharedDocket = docket.into_shared();
2757        // Can be cloned (Arc)
2758        let _clone = Arc::clone(&shared);
2759        assert!(matches!(
2760            shared.settings().backend,
2761            DocketBackendType::Memory
2762        ));
2763    }
2764
2765    #[test]
2766    fn docket_get_task_nonexistent() {
2767        let docket = Docket::memory();
2768        let result = docket
2769            .get_task(&TaskId::from_string("no-such-task"))
2770            .unwrap();
2771        assert!(result.is_none());
2772    }
2773
2774    #[test]
2775    fn docket_cancel_nonexistent_is_error() {
2776        let docket = Docket::memory();
2777        let result = docket.cancel(&TaskId::from_string("no-such"), None);
2778        assert!(result.is_err());
2779    }
2780
2781    #[test]
2782    fn docket_task_ids_are_sequential() {
2783        let docket = Docket::memory();
2784        let id1 = docket.submit("seq", serde_json::json!({})).unwrap();
2785        let id2 = docket.submit("seq", serde_json::json!({})).unwrap();
2786        // IDs should be different (sequential counter)
2787        assert_ne!(id1.to_string(), id2.to_string());
2788        assert!(id1.to_string().starts_with("docket-"));
2789        assert!(id2.to_string().starts_with("docket-"));
2790    }
2791
2792    #[test]
2793    fn docket_submit_with_max_retries_override() {
2794        let docket = Docket::memory();
2795        let id = docket
2796            .submit_with_options(
2797                "retry_over",
2798                serde_json::json!({}),
2799                SubmitOptions::new().with_max_retries(10),
2800            )
2801            .unwrap();
2802
2803        let task = docket.get_task(&id).unwrap().unwrap();
2804        assert_eq!(task.max_retries, 10);
2805    }
2806
2807    #[cfg(not(feature = "redis"))]
2808    #[test]
2809    fn docket_new_redis_without_feature_fails() {
2810        let settings = DocketSettings::redis("redis://localhost:6379");
2811        let result = Docket::new(settings);
2812        assert!(result.is_err());
2813    }
2814
2815    // ========================================
2816    // Worker — additional
2817    // ========================================
2818
2819    #[test]
2820    fn worker_is_running_initially_false() {
2821        let docket = Docket::memory();
2822        let worker = docket
2823            .worker()
2824            .subscribe("test", |_| async { Ok(serde_json::json!({})) })
2825            .build();
2826        assert!(!worker.is_running());
2827    }
2828
    #[test]
    fn worker_stop() {
        let docket = Docket::memory();
        let worker = docket
            .worker()
            .subscribe("test", |_| async { Ok(serde_json::json!({})) })
            .build();

        // Manually set running — simulates a started worker without actually
        // spawning the poll loop.
        worker.running.store(true, Ordering::SeqCst);
        assert!(worker.is_running());

        // stop() must clear the flag so the poll loop would observe it and exit.
        worker.stop();
        assert!(!worker.is_running());
    }
2844
2845    #[test]
2846    fn worker_debug() {
2847        let docket = Docket::memory();
2848        let worker = docket
2849            .worker()
2850            .subscribe("type_x", |_| async { Ok(serde_json::json!({})) })
2851            .build();
2852        let debug = format!("{:?}", worker);
2853        assert!(debug.contains("Worker"));
2854        assert!(debug.contains("type_x"));
2855    }
2856
    #[test]
    fn worker_process_one_handler_error_nacks() {
        use fastmcp_core::block_on;

        // max_retries=2: the first handler failure requeues the task, the
        // second exhausts the budget and fails it permanently.
        let settings = DocketSettings::memory().with_max_retries(2);
        let docket = Docket::new(settings).unwrap();

        let id = docket.submit("fail_type", serde_json::json!({})).unwrap();

        // Handler always errors, so every process_one call ends in a nack.
        let worker = docket
            .worker()
            .subscribe("fail_type", |_| async {
                Err(DocketError::Handler("boom".to_string()))
            })
            .build();

        let cx = Cx::for_testing();
        // First failure — nack, requeue
        let processed = block_on(worker.process_one(&cx)).unwrap();
        assert!(processed);

        // Task should be requeued (retry_count=1, still < max_retries=2)
        let task = docket.get_task(&id).unwrap().unwrap();
        assert_eq!(task.retry_count, 1);
        assert_eq!(task.status, TaskStatus::Pending);
        assert!(task.error.as_deref().unwrap().contains("boom"));

        // Second failure — should exceed max retries
        let processed = block_on(worker.process_one(&cx)).unwrap();
        assert!(processed);

        // The task is now terminal, with the retry counter at the limit.
        let task = docket.get_task(&id).unwrap().unwrap();
        assert_eq!(task.status, TaskStatus::Failed);
        assert_eq!(task.retry_count, 2);
    }
2892
2893    // ========================================
2894    // DocketSettings — additional
2895    // ========================================
2896
2897    #[test]
2898    fn docket_settings_with_poll_interval() {
2899        let s = DocketSettings::memory().with_poll_interval(Duration::from_millis(500));
2900        assert_eq!(s.poll_interval, Duration::from_millis(500));
2901    }
2902
2903    #[test]
2904    fn docket_settings_with_max_retries() {
2905        let s = DocketSettings::memory().with_max_retries(10);
2906        assert_eq!(s.max_retries, 10);
2907    }
2908
2909    // ========================================
2910    // SubmitOptions — clone
2911    // ========================================
2912
2913    #[test]
2914    fn submit_options_clone() {
2915        let opts = SubmitOptions::new().with_priority(5).with_max_retries(3);
2916        let cloned = opts.clone();
2917        assert_eq!(cloned.priority, 5);
2918        assert_eq!(cloned.max_retries, Some(3));
2919    }
2920
2921    // ========================================
2922    // DocketTask::to_task_info — additional statuses
2923    // ========================================
2924
2925    #[test]
2926    fn docket_task_to_task_info_running_has_started_at() {
2927        let mut task = DocketTask::new(
2928            TaskId::from_string("t"),
2929            "type".into(),
2930            serde_json::json!({}),
2931            0,
2932            3,
2933        );
2934        task.status = TaskStatus::Running;
2935        task.claimed_at = Some("2026-01-01T00:00:00Z".to_string());
2936
2937        let info = task.to_task_info();
2938        assert_eq!(info.status, TaskStatus::Running);
2939        assert_eq!(info.started_at, Some("2026-01-01T00:00:00Z".to_string()));
2940        // Not terminal, so completed_at should be None
2941        assert!(info.completed_at.is_none());
2942    }
2943
2944    #[test]
2945    fn docket_task_to_task_info_cancelled_has_completed_at() {
2946        let mut task = DocketTask::new(
2947            TaskId::from_string("t"),
2948            "type".into(),
2949            serde_json::json!({}),
2950            0,
2951            3,
2952        );
2953        task.status = TaskStatus::Cancelled;
2954        task.error = Some("user cancelled".to_string());
2955
2956        let info = task.to_task_info();
2957        assert_eq!(info.status, TaskStatus::Cancelled);
2958        assert!(info.completed_at.is_some()); // terminal
2959        assert_eq!(info.error, Some("user cancelled".to_string()));
2960    }
2961
2962    #[test]
2963    fn docket_task_to_task_result_cancelled() {
2964        let mut task = DocketTask::new(
2965            TaskId::from_string("t"),
2966            "type".into(),
2967            serde_json::json!({}),
2968            0,
2969            3,
2970        );
2971        task.status = TaskStatus::Cancelled;
2972        task.error = Some("cancelled by user".to_string());
2973
2974        let result = task.to_task_result().expect("terminal");
2975        assert!(!result.success);
2976        assert_eq!(result.error, Some("cancelled by user".to_string()));
2977    }
2978
2979    // ========================================
2980    // Memory backend — cancel running
2981    // ========================================
2982
2983    #[test]
2984    fn memory_backend_cancel_running_task() {
2985        let backend = MemoryDocketBackend::new(DocketSettings::memory());
2986        let task = DocketTask::new(
2987            TaskId::from_string("t1"),
2988            "work".into(),
2989            serde_json::json!({}),
2990            0,
2991            3,
2992        );
2993        backend.enqueue(task).unwrap();
2994
2995        // Dequeue to set running
2996        let _claimed = backend.dequeue(&["work".to_string()]).unwrap().unwrap();
2997
2998        // Cancel while running
2999        backend
3000            .cancel(&TaskId::from_string("t1"), Some("force cancel"))
3001            .unwrap();
3002
3003        let task = backend
3004            .get_task(&TaskId::from_string("t1"))
3005            .unwrap()
3006            .unwrap();
3007        assert_eq!(task.status, TaskStatus::Cancelled);
3008        assert_eq!(task.error, Some("force cancel".to_string()));
3009    }
3010
3011    // ========================================
3012    // Memory backend — requeue stale
3013    // ========================================
3014
3015    #[test]
3016    fn memory_backend_requeue_stale_with_stale_task() {
3017        let settings = DocketSettings::memory().with_visibility_timeout(Duration::from_millis(0));
3018        let backend = MemoryDocketBackend::new(settings);
3019
3020        let mut task = DocketTask::new(
3021            TaskId::from_string("t1"),
3022            "work".into(),
3023            serde_json::json!({}),
3024            0,
3025            3,
3026        );
3027        task.status = TaskStatus::Running;
3028        task.claimed_at = Some("2000-01-01T00:00:00Z".to_string()); // long ago
3029        backend.enqueue(task).unwrap();
3030
3031        // Manually set status to Running (enqueue sets Pending, we need to update)
3032        {
3033            let mut tasks = backend.tasks.write().unwrap();
3034            let t = tasks.get_mut(&TaskId::from_string("t1")).unwrap();
3035            t.status = TaskStatus::Running;
3036            t.claimed_at = Some("2000-01-01T00:00:00Z".to_string());
3037        }
3038
3039        let requeued = backend.requeue_stale().unwrap();
3040        assert_eq!(requeued, 1);
3041
3042        let task = backend
3043            .get_task(&TaskId::from_string("t1"))
3044            .unwrap()
3045            .unwrap();
3046        assert_eq!(task.status, TaskStatus::Pending);
3047        assert!(task.claimed_at.is_none());
3048        assert_eq!(task.retry_count, 1);
3049    }
3050
    #[test]
    fn memory_backend_requeue_stale_at_retry_limit_marks_failed() {
        // Zero visibility timeout makes any claimed task immediately stale;
        // max_retries=1 means a stale task cannot be retried again.
        let settings = DocketSettings::memory()
            .with_visibility_timeout(Duration::from_millis(0))
            .with_max_retries(1);
        let backend = MemoryDocketBackend::new(settings);

        let task = DocketTask::new(
            TaskId::from_string("t1"),
            "work".into(),
            serde_json::json!({}),
            0,
            1, // max_retries = 1
        );
        backend.enqueue(task).unwrap();

        // Manually set as running and stale (enqueue stores tasks as Pending)
        {
            let mut tasks = backend.tasks.write().unwrap();
            let t = tasks.get_mut(&TaskId::from_string("t1")).unwrap();
            t.status = TaskStatus::Running;
            t.claimed_at = Some("2000-01-01T00:00:00Z".to_string());
        }

        // At the retry limit the sweep fails the task instead of requeuing it.
        let requeued = backend.requeue_stale().unwrap();
        assert_eq!(requeued, 0); // Failed, not requeued

        let task = backend
            .get_task(&TaskId::from_string("t1"))
            .unwrap()
            .unwrap();
        assert_eq!(task.status, TaskStatus::Failed);
        assert_eq!(task.error.as_deref(), Some("Exceeded visibility timeout"));
    }
3085
3086    // ========================================
3087    // Memory backend — nack retry vs fail
3088    // ========================================
3089
3090    #[test]
3091    fn memory_backend_nack_under_limit_requeues() {
3092        let settings = DocketSettings::memory().with_max_retries(3);
3093        let backend = MemoryDocketBackend::new(settings);
3094
3095        let task = DocketTask::new(
3096            TaskId::from_string("t1"),
3097            "work".into(),
3098            serde_json::json!({}),
3099            0,
3100            3,
3101        );
3102        backend.enqueue(task).unwrap();
3103
3104        // Dequeue to set running
3105        let _claimed = backend.dequeue(&["work".to_string()]).unwrap().unwrap();
3106
3107        // First nack — requeued
3108        backend.nack(&TaskId::from_string("t1"), "fail1").unwrap();
3109
3110        let task = backend
3111            .get_task(&TaskId::from_string("t1"))
3112            .unwrap()
3113            .unwrap();
3114        assert_eq!(task.status, TaskStatus::Pending);
3115        assert_eq!(task.retry_count, 1);
3116    }
3117
3118    #[test]
3119    fn memory_backend_nack_at_limit_marks_failed() {
3120        let settings = DocketSettings::memory().with_max_retries(1);
3121        let backend = MemoryDocketBackend::new(settings);
3122
3123        let task = DocketTask::new(
3124            TaskId::from_string("t1"),
3125            "work".into(),
3126            serde_json::json!({}),
3127            0,
3128            1,
3129        );
3130        backend.enqueue(task).unwrap();
3131
3132        // Dequeue to set running
3133        let _claimed = backend.dequeue(&["work".to_string()]).unwrap().unwrap();
3134
3135        // First nack at max_retries=1 — fails
3136        backend.nack(&TaskId::from_string("t1"), "fatal").unwrap();
3137
3138        let task = backend
3139            .get_task(&TaskId::from_string("t1"))
3140            .unwrap()
3141            .unwrap();
3142        assert_eq!(task.status, TaskStatus::Failed);
3143        assert_eq!(task.retry_count, 1);
3144    }
3145
3146    // ========================================
3147    // Memory backend — dequeue multi-type
3148    // ========================================
3149
3150    #[test]
3151    fn memory_backend_dequeue_multiple_types() {
3152        let backend = MemoryDocketBackend::new(DocketSettings::memory());
3153
3154        let task_a = DocketTask::new(
3155            TaskId::from_string("a"),
3156            "type_a".into(),
3157            serde_json::json!({}),
3158            0,
3159            3,
3160        );
3161        let task_b = DocketTask::new(
3162            TaskId::from_string("b"),
3163            "type_b".into(),
3164            serde_json::json!({}),
3165            0,
3166            3,
3167        );
3168        backend.enqueue(task_a).unwrap();
3169        backend.enqueue(task_b).unwrap();
3170
3171        // Dequeue accepting both types — should get first enqueued
3172        let claimed = backend
3173            .dequeue(&["type_a".to_string(), "type_b".to_string()])
3174            .unwrap()
3175            .unwrap();
3176        assert_eq!(claimed.task_type, "type_a");
3177    }
3178
3179    // ========================================
3180    // Docket::memory constructor
3181    // ========================================
3182
3183    #[test]
3184    fn docket_memory_constructor() {
3185        let d = Docket::memory();
3186        assert!(matches!(d.settings().backend, DocketBackendType::Memory));
3187    }
3188
3189    // ========================================
3190    // Worker — subscribed_types order
3191    // ========================================
3192
3193    #[test]
3194    fn worker_subscribed_types_contains_all() {
3195        let docket = Docket::memory();
3196        let worker = docket
3197            .worker()
3198            .subscribe("x", |_| async { Ok(serde_json::json!(1)) })
3199            .subscribe("y", |_| async { Ok(serde_json::json!(2)) })
3200            .subscribe("z", |_| async { Ok(serde_json::json!(3)) })
3201            .build();
3202
3203        let types = worker.subscribed_types();
3204        assert_eq!(types.len(), 3);
3205        assert!(types.contains(&"x".to_string()));
3206        assert!(types.contains(&"y".to_string()));
3207        assert!(types.contains(&"z".to_string()));
3208    }
3209
3210    // ========================================
3211    // Worker — process_one with cancelled cx
3212    // ========================================
3213
3214    #[test]
3215    fn worker_process_one_cancelled_cx_returns_error() {
3216        use fastmcp_core::block_on;
3217
3218        let docket = Docket::memory();
3219        docket.submit("cancel_test", serde_json::json!({})).unwrap();
3220
3221        let worker = docket
3222            .worker()
3223            .subscribe("cancel_test", |_| async { Ok(serde_json::json!({})) })
3224            .build();
3225
3226        let cx = Cx::for_testing();
3227        cx.set_cancel_requested(true);
3228
3229        let result = block_on(worker.process_one(&cx));
3230        assert!(result.is_err());
3231    }
3232
3233    // ========================================
3234    // Cancel with None reason sets error to None
3235    // ========================================
3236
3237    #[test]
3238    fn cancel_with_none_reason_sets_no_error() {
3239        let docket = Docket::memory();
3240        let id = docket.submit("t", serde_json::json!({})).unwrap();
3241        docket.cancel(&id, None).unwrap();
3242
3243        let task = docket.get_task(&id).unwrap().unwrap();
3244        assert_eq!(task.status, TaskStatus::Cancelled);
3245        assert!(task.error.is_none());
3246    }
3247
3248    // ========================================
3249    // to_task_info for Failed status
3250    // ========================================
3251
3252    #[test]
3253    fn docket_task_to_task_info_failed_has_completed_at() {
3254        let mut task = DocketTask::new(
3255            TaskId::from_string("t"),
3256            "type".into(),
3257            serde_json::json!({}),
3258            0,
3259            3,
3260        );
3261        task.status = TaskStatus::Failed;
3262        task.error = Some("crash".to_string());
3263
3264        let info = task.to_task_info();
3265        assert_eq!(info.status, TaskStatus::Failed);
3266        assert!(info.completed_at.is_some()); // Failed is terminal
3267        assert_eq!(info.error, Some("crash".to_string()));
3268    }
3269
3270    // ========================================
3271    // Memory backend cancel nonexistent
3272    // ========================================
3273
3274    #[test]
3275    fn memory_backend_cancel_nonexistent_returns_error() {
3276        let backend = MemoryDocketBackend::new(DocketSettings::memory());
3277        let result = backend.cancel(&TaskId::from_string("no-such"), None);
3278        assert!(result.is_err());
3279    }
3280
3281    // ========================================
3282    // submit_with_options uses default max_retries
3283    // ========================================
3284
3285    #[test]
3286    fn submit_with_options_uses_default_max_retries() {
3287        let settings = DocketSettings::memory().with_max_retries(7);
3288        let docket = Docket::new(settings).unwrap();
3289
3290        // Submit without overriding max_retries
3291        let id = docket
3292            .submit_with_options("t", serde_json::json!({}), SubmitOptions::default())
3293            .unwrap();
3294
3295        let task = docket.get_task(&id).unwrap().unwrap();
3296        assert_eq!(task.max_retries, 7); // inherited from settings
3297    }
3298
3299    // ========================================
3300    // Same-priority FIFO ordering
3301    // ========================================
3302
3303    #[test]
3304    fn memory_backend_same_priority_fifo() {
3305        let backend = MemoryDocketBackend::new(DocketSettings::memory());
3306
3307        for i in 0..3 {
3308            let task = DocketTask::new(
3309                TaskId::from_string(format!("t-{i}")),
3310                "fifo".into(),
3311                serde_json::json!({}),
3312                0, // same priority
3313                3,
3314            );
3315            backend.enqueue(task).unwrap();
3316        }
3317
3318        let first = backend.dequeue(&["fifo".to_string()]).unwrap().unwrap();
3319        assert_eq!(first.id.to_string(), "t-0");
3320        let second = backend.dequeue(&["fifo".to_string()]).unwrap().unwrap();
3321        assert_eq!(second.id.to_string(), "t-1");
3322        let third = backend.dequeue(&["fifo".to_string()]).unwrap().unwrap();
3323        assert_eq!(third.id.to_string(), "t-2");
3324    }
3325
3326    // ========================================
3327    // requeue_stale skips invalid claimed_at
3328    // ========================================
3329
    #[test]
    fn memory_backend_requeue_stale_skips_invalid_claimed_at() {
        // A zero visibility timeout would normally make any claimed task
        // stale, but an unparseable claimed_at must be skipped, not requeued.
        let settings = DocketSettings::memory().with_visibility_timeout(Duration::from_millis(0));
        let backend = MemoryDocketBackend::new(settings);

        let task = DocketTask::new(
            TaskId::from_string("t1"),
            "work".into(),
            serde_json::json!({}),
            0,
            3,
        );
        backend.enqueue(task).unwrap();

        // Set as running with an invalid claimed_at (enqueue stores Pending)
        {
            let mut tasks = backend.tasks.write().unwrap();
            let t = tasks.get_mut(&TaskId::from_string("t1")).unwrap();
            t.status = TaskStatus::Running;
            t.claimed_at = Some("not-a-valid-timestamp".to_string());
        }

        // Should not requeue (can't parse claimed_at)
        let requeued = backend.requeue_stale().unwrap();
        assert_eq!(requeued, 0);

        // Task should still be Running — the sweep left it untouched
        let task = backend
            .get_task(&TaskId::from_string("t1"))
            .unwrap()
            .unwrap();
        assert_eq!(task.status, TaskStatus::Running);
    }
3363
3364    // ========================================
3365    // DocketError::Cancelled into McpError
3366    // ========================================
3367
3368    #[test]
3369    fn docket_error_cancelled_into_mcp_error() {
3370        let err = DocketError::Cancelled;
3371        let mcp: McpError = err.into();
3372        assert!(mcp.message.contains("cancelled"));
3373    }
3374
    #[cfg(feature = "redis")]
    #[test]
    fn test_redis_requeue_stale_marks_failed_at_retry_limit() {
        // Mirrors the memory-backend stale/retry-limit test against a live
        // Redis instance spawned for this test.
        let redis_server = TestRedisServer::start();
        let mut settings = redis_settings_for_test(&redis_server.url);
        settings.visibility_timeout = Duration::from_millis(0);
        settings.max_retries = 1;
        let docket = Docket::new(settings).expect("redis docket");

        let task_id = docket
            .submit("redis_stale_fail", serde_json::json!({}))
            .expect("submit");

        // Claim the task so it is in flight; with a zero visibility timeout
        // it becomes stale immediately.
        let task_types = vec!["redis_stale_fail".to_string()];
        let _claimed = docket
            .backend
            .dequeue(&task_types)
            .expect("dequeue")
            .expect("claimed");

        // At max_retries=1 the stale sweep fails the task instead of
        // requeuing it.
        let requeued = docket.backend.requeue_stale().expect("requeue stale");
        assert_eq!(requeued, 0);

        let task = docket
            .get_task(&task_id)
            .expect("get task")
            .expect("task exists");
        assert_eq!(task.status, TaskStatus::Failed);
        assert_eq!(task.error.as_deref(), Some("Exceeded visibility timeout"));
    }
3405}