Skip to main content

fastmcp_server/
docket.rs

1//! Docket: Distributed task queue for FastMCP.
2//!
3//! Provides distributed task queue capabilities with support for multiple backends:
4//! - **Memory**: In-process queue for testing and development
5//! - **Redis**: Distributed queue for production deployments
6//!
7//! # Architecture
8//!
9//! ```text
10//! ┌────────────────────────────────────────────────────────────┐
11//! │                        Docket                              │
12//! │  ┌─────────────────┐  ┌─────────────────┐                  │
13//! │  │   DocketClient  │  │   Worker Pool   │                  │
14//! │  │   (task submit) │  │   (processing)  │                  │
15//! │  └────────┬────────┘  └────────┬────────┘                  │
16//! │           │                    │                           │
17//! │           └──────────┬─────────┘                           │
18//! │                      ▼                                     │
19//! │           ┌────────────────────┐                           │
20//! │           │   DocketBackend    │                           │
21//! │           └─────────┬──────────┘                           │
22//! │                     │                                      │
23//! │       ┌─────────────┼─────────────┐                        │
24//! │       ▼             ▼             ▼                        │
25//! │  ┌─────────┐  ┌─────────┐  ┌─────────────┐                 │
26//! │  │ Memory  │  │  Redis  │  │   Future    │                 │
27//! │  │ Backend │  │ Backend │  │  Backends   │                 │
28//! │  └─────────┘  └─────────┘  └─────────────┘                 │
29//! └────────────────────────────────────────────────────────────┘
30//! ```
31//!
32//! # Usage
33//!
34//! ```ignore
35//! use fastmcp_server::docket::{Docket, DocketSettings, Worker};
36//!
37//! // Create with memory backend (testing/development)
38//! let docket = Docket::memory();
39//!
40//! // Create with Redis backend (production)
41//! let settings = DocketSettings::redis("redis://localhost:6379");
42//! let docket = Docket::new(settings)?;
43//!
44//! // Submit a task
//! let task_id = docket.submit("process_data", json!({"input": "data"}))?;
46//!
47//! // Create a worker
48//! let worker = docket.worker()
49//!     .subscribe("process_data", |task| async move {
50//!         // Process the task
51//!         Ok(json!({"result": "processed"}))
52//!     })
53//!     .build();
54//!
55//! // Start processing
56//! worker.run().await?;
57//! ```
58
59use std::collections::{HashMap, VecDeque};
60use std::future::Future;
61use std::pin::Pin;
62use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
63use std::sync::{Arc, RwLock};
64use std::time::Duration;
65
66use asupersync::Cx;
67use fastmcp_core::McpError;
68use fastmcp_core::logging::{debug, info, targets, warn};
69use fastmcp_protocol::{TaskId, TaskInfo, TaskResult, TaskStatus};
70use serde::{Deserialize, Serialize};
71
72// ============================================================================
73// Configuration
74// ============================================================================
75
/// Configuration for a Docket instance.
///
/// Start from [`DocketSettings::memory`] or [`DocketSettings::redis`] and
/// refine the result with the chainable `with_*` methods.
#[derive(Debug, Clone)]
pub struct DocketSettings {
    /// Which storage backend the queue uses (memory or redis).
    pub backend: DocketBackendType,
    /// Queue name prefix used for namespacing.
    pub queue_prefix: String,
    /// How long a claimed task may stay unacknowledged before it is
    /// considered stale and eligible for requeueing.
    pub visibility_timeout: Duration,
    /// Default task timeout.
    pub default_task_timeout: Duration,
    /// Retry budget applied to tasks that do not specify their own.
    pub max_retries: u32,
    /// Delay between retries.
    ///
    /// NOTE(review): described as exponential backoff, but the in-memory
    /// backend requeues a failed task immediately; confirm where this value
    /// is meant to be applied.
    pub retry_delay: Duration,
    /// How long a worker waits before polling again when the queue is empty.
    pub poll_interval: Duration,
}

impl Default for DocketSettings {
    /// Memory backend, `fastmcp:docket` prefix, 30 s visibility timeout,
    /// 300 s task timeout, 3 retries, 1 s retry delay, 100 ms poll interval.
    fn default() -> Self {
        let backend = DocketBackendType::Memory;
        let queue_prefix = String::from("fastmcp:docket");

        Self {
            backend,
            queue_prefix,
            visibility_timeout: Duration::from_secs(30),
            default_task_timeout: Duration::from_secs(300),
            max_retries: 3,
            retry_delay: Duration::from_secs(1),
            poll_interval: Duration::from_millis(100),
        }
    }
}

impl DocketSettings {
    /// Returns the default settings, which select the in-memory backend
    /// (testing/development).
    #[must_use]
    pub fn memory() -> Self {
        DocketSettings::default()
    }

    /// Returns default settings switched to the Redis backend at `url`,
    /// with a pool size of 10 and a 5 second connect timeout.
    #[must_use]
    pub fn redis(url: impl Into<String>) -> Self {
        let redis = RedisSettings {
            url: url.into(),
            pool_size: 10,
            connect_timeout: Duration::from_secs(5),
        };

        let mut settings = Self::default();
        settings.backend = DocketBackendType::Redis(redis);
        settings
    }

    /// Replaces the queue prefix.
    #[must_use]
    pub fn with_queue_prefix(self, prefix: impl Into<String>) -> Self {
        Self {
            queue_prefix: prefix.into(),
            ..self
        }
    }

    /// Replaces the visibility timeout.
    #[must_use]
    pub fn with_visibility_timeout(self, timeout: Duration) -> Self {
        Self {
            visibility_timeout: timeout,
            ..self
        }
    }

    /// Replaces the maximum retry count.
    #[must_use]
    pub fn with_max_retries(self, retries: u32) -> Self {
        Self {
            max_retries: retries,
            ..self
        }
    }

    /// Replaces the worker poll interval.
    #[must_use]
    pub fn with_poll_interval(self, interval: Duration) -> Self {
        Self {
            poll_interval: interval,
            ..self
        }
    }
}

/// Selects the storage backend a Docket is built on.
#[derive(Debug, Clone)]
pub enum DocketBackendType {
    /// In-memory backend for testing/development.
    Memory,
    /// Redis backend for production, with its connection settings.
    Redis(RedisSettings),
}

/// Connection parameters for the Redis backend.
#[derive(Debug, Clone)]
pub struct RedisSettings {
    /// Redis connection URL.
    pub url: String,
    /// Connection pool size.
    pub pool_size: usize,
    /// Connection timeout.
    pub connect_timeout: Duration,
}
177
178// ============================================================================
179// Task Types
180// ============================================================================
181
182/// A queued task in the Docket system.
183#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct DocketTask {
185    /// Unique task identifier.
186    pub id: TaskId,
187    /// Task type (determines which handler processes it).
188    pub task_type: String,
189    /// Task parameters.
190    pub params: serde_json::Value,
191    /// Task priority (higher = processed first).
192    pub priority: i32,
193    /// Number of retry attempts so far.
194    pub retry_count: u32,
195    /// Maximum retries allowed.
196    pub max_retries: u32,
197    /// When the task was created.
198    pub created_at: String,
199    /// When the task was claimed by a worker.
200    pub claimed_at: Option<String>,
201    /// Current task status.
202    pub status: TaskStatus,
203    /// Error message if failed.
204    pub error: Option<String>,
205    /// Task result if completed.
206    pub result: Option<serde_json::Value>,
207}
208
209impl DocketTask {
210    /// Creates a new task.
211    fn new(
212        id: TaskId,
213        task_type: String,
214        params: serde_json::Value,
215        priority: i32,
216        max_retries: u32,
217    ) -> Self {
218        Self {
219            id,
220            task_type,
221            params,
222            priority,
223            retry_count: 0,
224            max_retries,
225            created_at: chrono::Utc::now().to_rfc3339(),
226            claimed_at: None,
227            status: TaskStatus::Pending,
228            error: None,
229            result: None,
230        }
231    }
232
233    /// Converts to TaskInfo for protocol responses.
234    #[must_use]
235    pub fn to_task_info(&self) -> TaskInfo {
236        TaskInfo {
237            id: self.id.clone(),
238            task_type: self.task_type.clone(),
239            status: self.status,
240            progress: None,
241            message: None,
242            created_at: self.created_at.clone(),
243            started_at: self.claimed_at.clone(),
244            completed_at: if self.status.is_terminal() {
245                Some(chrono::Utc::now().to_rfc3339())
246            } else {
247                None
248            },
249            error: self.error.clone(),
250        }
251    }
252
253    /// Converts to TaskResult for protocol responses.
254    #[must_use]
255    pub fn to_task_result(&self) -> Option<TaskResult> {
256        if !self.status.is_terminal() {
257            return None;
258        }
259        Some(TaskResult {
260            id: self.id.clone(),
261            success: self.status == TaskStatus::Completed,
262            data: self.result.clone(),
263            error: self.error.clone(),
264        })
265    }
266}
267
/// Per-submission overrides for a task.
#[derive(Debug, Clone, Default)]
pub struct SubmitOptions {
    /// Task priority; higher values are processed first.
    pub priority: i32,
    /// Retry budget for this task; `None` falls back to the Docket default.
    pub max_retries: Option<u32>,
    /// Delay before the task becomes visible.
    ///
    /// NOTE(review): `Docket::submit_with_options` as written in this file
    /// does not read this field; confirm whether delayed delivery is
    /// implemented elsewhere.
    pub delay: Option<Duration>,
}

impl SubmitOptions {
    /// Returns the default options: priority 0, no retry override, no delay.
    #[must_use]
    pub fn new() -> Self {
        Self {
            priority: 0,
            max_retries: None,
            delay: None,
        }
    }

    /// Replaces the task priority.
    #[must_use]
    pub fn with_priority(self, priority: i32) -> Self {
        Self { priority, ..self }
    }

    /// Overrides the retry budget.
    #[must_use]
    pub fn with_max_retries(self, retries: u32) -> Self {
        Self {
            max_retries: Some(retries),
            ..self
        }
    }

    /// Sets the initial delay.
    #[must_use]
    pub fn with_delay(self, delay: Duration) -> Self {
        Self {
            delay: Some(delay),
            ..self
        }
    }
}
307
308// ============================================================================
309// Backend Trait
310// ============================================================================
311
/// Result alias used by every Docket operation.
pub type DocketResult<T> = Result<T, DocketError>;

/// Failures reported by Docket operations.
#[derive(Debug)]
pub enum DocketError {
    /// The referenced task does not exist.
    NotFound(String),
    /// The backend connection failed.
    Connection(String),
    /// A value could not be serialized or deserialized.
    Serialization(String),
    /// A task handler reported an error.
    Handler(String),
    /// Backend-specific failure.
    Backend(String),
    /// The operation was cancelled.
    Cancelled,
}

impl std::fmt::Display for DocketError {
    /// Writes `<category>: <detail>` for the variants that carry a message
    /// and a fixed sentence for [`DocketError::Cancelled`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::NotFound(detail) => ("Task not found", detail),
            Self::Connection(detail) => ("Connection error", detail),
            Self::Serialization(detail) => ("Serialization error", detail),
            Self::Handler(detail) => ("Handler error", detail),
            Self::Backend(detail) => ("Backend error", detail),
            Self::Cancelled => return f.write_str("Operation cancelled"),
        };

        write!(f, "{label}: {detail}")
    }
}

impl std::error::Error for DocketError {}
346
347impl From<DocketError> for McpError {
348    fn from(err: DocketError) -> Self {
349        McpError::internal_error(err.to_string())
350    }
351}
352
353/// Backend trait for Docket storage.
354///
355/// Implementations provide the actual storage and retrieval of tasks.
356/// The trait uses synchronous methods but backends can internally use
357/// async operations wrapped in blocking.
358pub trait DocketBackend: Send + Sync {
359    /// Enqueues a task for processing.
360    fn enqueue(&self, task: DocketTask) -> DocketResult<()>;
361
362    /// Dequeues a task for the given task types.
363    ///
364    /// Returns the highest priority task that matches one of the subscribed types.
365    /// The task is marked as claimed but not removed from the queue until acknowledged.
366    fn dequeue(&self, task_types: &[String]) -> DocketResult<Option<DocketTask>>;
367
368    /// Acknowledges successful task completion.
369    fn ack(&self, task_id: &TaskId, result: serde_json::Value) -> DocketResult<()>;
370
371    /// Negative acknowledgement - task failed, may be retried.
372    fn nack(&self, task_id: &TaskId, error: &str) -> DocketResult<()>;
373
374    /// Gets task by ID.
375    fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>>;
376
377    /// Lists tasks, optionally filtered by status.
378    fn list_tasks(&self, status: Option<TaskStatus>, limit: usize)
379    -> DocketResult<Vec<DocketTask>>;
380
381    /// Cancels a task.
382    fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()>;
383
384    /// Returns queue statistics.
385    fn stats(&self) -> DocketResult<QueueStats>;
386
387    /// Requeues tasks that have exceeded visibility timeout.
388    fn requeue_stale(&self) -> DocketResult<usize>;
389}
390
/// Per-status task counts for a queue; every counter defaults to zero.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Tasks waiting to be claimed.
    pub pending: usize,
    /// Tasks currently claimed by a worker.
    pub in_progress: usize,
    /// Tasks that completed.
    pub completed: usize,
    /// Tasks that failed.
    pub failed: usize,
    /// Tasks that were cancelled.
    pub cancelled: usize,
}
405
406// ============================================================================
407// Memory Backend
408// ============================================================================
409
410/// In-memory Docket backend for testing and development.
411///
412/// Tasks are stored in memory and not persisted across restarts.
413/// This backend is thread-safe and suitable for single-process deployments.
414pub struct MemoryDocketBackend {
415    /// All tasks indexed by ID.
416    tasks: RwLock<HashMap<TaskId, DocketTask>>,
417    /// Pending tasks queue (sorted by priority, then creation time).
418    pending: RwLock<VecDeque<TaskId>>,
419    /// Settings for visibility timeout, retries, etc.
420    settings: DocketSettings,
421}
422
423impl MemoryDocketBackend {
424    /// Creates a new memory backend.
425    #[must_use]
426    pub fn new(settings: DocketSettings) -> Self {
427        Self {
428            tasks: RwLock::new(HashMap::new()),
429            pending: RwLock::new(VecDeque::new()),
430            settings,
431        }
432    }
433}
434
435impl DocketBackend for MemoryDocketBackend {
436    fn enqueue(&self, task: DocketTask) -> DocketResult<()> {
437        let task_id = task.id.clone();
438        let priority = task.priority;
439
440        {
441            let mut tasks = self
442                .tasks
443                .write()
444                .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
445            tasks.insert(task_id.clone(), task);
446        }
447
448        {
449            let mut pending = self
450                .pending
451                .write()
452                .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
453
454            // Insert maintaining priority order (higher priority first)
455            let tasks = self
456                .tasks
457                .read()
458                .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
459
460            let pos = pending
461                .iter()
462                .position(|id| tasks.get(id).is_none_or(|t| t.priority < priority))
463                .unwrap_or(pending.len());
464
465            pending.insert(pos, task_id);
466        }
467
468        Ok(())
469    }
470
471    fn dequeue(&self, task_types: &[String]) -> DocketResult<Option<DocketTask>> {
472        let mut pending = self
473            .pending
474            .write()
475            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
476        let mut tasks = self
477            .tasks
478            .write()
479            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
480
481        // Find first pending task matching subscribed types
482        let pos = pending.iter().position(|id| {
483            tasks.get(id).is_some_and(|t| {
484                t.status == TaskStatus::Pending && task_types.contains(&t.task_type)
485            })
486        });
487
488        if let Some(pos) = pos {
489            let task_id = pending.remove(pos).expect("position valid");
490            if let Some(task) = tasks.get_mut(&task_id) {
491                task.status = TaskStatus::Running;
492                task.claimed_at = Some(chrono::Utc::now().to_rfc3339());
493                return Ok(Some(task.clone()));
494            }
495        }
496
497        Ok(None)
498    }
499
500    fn ack(&self, task_id: &TaskId, result: serde_json::Value) -> DocketResult<()> {
501        let mut tasks = self
502            .tasks
503            .write()
504            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
505
506        let task = tasks
507            .get_mut(task_id)
508            .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
509
510        task.status = TaskStatus::Completed;
511        task.result = Some(result);
512
513        Ok(())
514    }
515
516    fn nack(&self, task_id: &TaskId, error: &str) -> DocketResult<()> {
517        let mut tasks = self
518            .tasks
519            .write()
520            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
521        let mut pending = self
522            .pending
523            .write()
524            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
525
526        let task = tasks
527            .get_mut(task_id)
528            .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
529
530        task.retry_count += 1;
531        task.error = Some(error.to_string());
532
533        if task.retry_count >= task.max_retries {
534            // Max retries exceeded - mark as failed
535            task.status = TaskStatus::Failed;
536        } else {
537            // Requeue for retry
538            task.status = TaskStatus::Pending;
539            task.claimed_at = None;
540            pending.push_back(task_id.clone());
541        }
542
543        Ok(())
544    }
545
546    fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
547        let tasks = self
548            .tasks
549            .read()
550            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
551        Ok(tasks.get(task_id).cloned())
552    }
553
554    fn list_tasks(
555        &self,
556        status: Option<TaskStatus>,
557        limit: usize,
558    ) -> DocketResult<Vec<DocketTask>> {
559        let tasks = self
560            .tasks
561            .read()
562            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
563
564        let iter = tasks
565            .values()
566            .filter(|t| status.is_none_or(|s| t.status == s));
567
568        Ok(iter.take(limit).cloned().collect())
569    }
570
571    fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()> {
572        let mut tasks = self
573            .tasks
574            .write()
575            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
576        let mut pending = self
577            .pending
578            .write()
579            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
580
581        let task = tasks
582            .get_mut(task_id)
583            .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
584
585        if task.status.is_terminal() {
586            return Err(DocketError::Backend(format!(
587                "Cannot cancel task in terminal state: {:?}",
588                task.status
589            )));
590        }
591
592        task.status = TaskStatus::Cancelled;
593        task.error = reason.map(String::from);
594
595        // Remove from pending queue
596        pending.retain(|id| id != task_id);
597
598        Ok(())
599    }
600
601    fn stats(&self) -> DocketResult<QueueStats> {
602        let tasks = self
603            .tasks
604            .read()
605            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
606
607        let mut stats = QueueStats::default();
608        for task in tasks.values() {
609            match task.status {
610                TaskStatus::Pending => stats.pending += 1,
611                TaskStatus::Running => stats.in_progress += 1,
612                TaskStatus::Completed => stats.completed += 1,
613                TaskStatus::Failed => stats.failed += 1,
614                TaskStatus::Cancelled => stats.cancelled += 1,
615            }
616        }
617
618        Ok(stats)
619    }
620
621    fn requeue_stale(&self) -> DocketResult<usize> {
622        let mut tasks = self
623            .tasks
624            .write()
625            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
626        let mut pending = self
627            .pending
628            .write()
629            .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
630
631        let now = chrono::Utc::now();
632        let timeout = chrono::Duration::from_std(self.settings.visibility_timeout)
633            .unwrap_or_else(|_| chrono::Duration::seconds(30));
634
635        let mut requeued = 0;
636
637        for task in tasks.values_mut() {
638            if task.status != TaskStatus::Running {
639                continue;
640            }
641
642            if let Some(ref claimed_at) = task.claimed_at {
643                if let Ok(claimed) = chrono::DateTime::parse_from_rfc3339(claimed_at) {
644                    if now - claimed.with_timezone(&chrono::Utc) > timeout {
645                        // Task has exceeded visibility timeout - requeue
646                        task.status = TaskStatus::Pending;
647                        task.claimed_at = None;
648                        task.retry_count += 1;
649
650                        if task.retry_count >= task.max_retries {
651                            task.status = TaskStatus::Failed;
652                            task.error = Some("Exceeded visibility timeout".to_string());
653                        } else {
654                            pending.push_back(task.id.clone());
655                            requeued += 1;
656                        }
657                    }
658                }
659            }
660        }
661
662        Ok(requeued)
663    }
664}
665
666// ============================================================================
667// Redis Backend (stub - requires redis crate feature)
668// ============================================================================
669
/// Redis Docket backend for production distributed deployments.
///
/// Currently a placeholder: construction and every backend operation
/// return [`DocketError::Backend`] with a "not yet implemented" message.
#[cfg(feature = "redis")]
pub struct RedisDocketBackend {
    // Redis client would go here
    _settings: RedisSettings,
    _docket_settings: DocketSettings,
}

#[cfg(feature = "redis")]
impl RedisDocketBackend {
    /// Creates a new Redis backend.
    ///
    /// # Errors
    ///
    /// Always fails for now; the backend is not implemented.
    pub fn new(
        _redis_settings: RedisSettings,
        _docket_settings: DocketSettings,
    ) -> DocketResult<Self> {
        // TODO: Initialize Redis connection pool
        Self::not_implemented()
    }

    /// Shared "not yet implemented" failure returned by every operation.
    fn not_implemented<T>() -> DocketResult<T> {
        Err(DocketError::Backend(
            "Redis backend not yet implemented".to_string(),
        ))
    }
}

#[cfg(feature = "redis")]
impl DocketBackend for RedisDocketBackend {
    fn enqueue(&self, _task: DocketTask) -> DocketResult<()> {
        Self::not_implemented()
    }

    fn dequeue(&self, _task_types: &[String]) -> DocketResult<Option<DocketTask>> {
        Self::not_implemented()
    }

    fn ack(&self, _task_id: &TaskId, _result: serde_json::Value) -> DocketResult<()> {
        Self::not_implemented()
    }

    fn nack(&self, _task_id: &TaskId, _error: &str) -> DocketResult<()> {
        Self::not_implemented()
    }

    fn get_task(&self, _task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
        Self::not_implemented()
    }

    fn list_tasks(
        &self,
        _status: Option<TaskStatus>,
        _limit: usize,
    ) -> DocketResult<Vec<DocketTask>> {
        Self::not_implemented()
    }

    fn cancel(&self, _task_id: &TaskId, _reason: Option<&str>) -> DocketResult<()> {
        Self::not_implemented()
    }

    fn stats(&self) -> DocketResult<QueueStats> {
        Self::not_implemented()
    }

    fn requeue_stale(&self) -> DocketResult<usize> {
        Self::not_implemented()
    }
}
755
756// ============================================================================
757// Docket Client
758// ============================================================================
759
760/// Docket distributed task queue.
761///
762/// The main entry point for submitting and managing distributed tasks.
763pub struct Docket {
764    backend: Arc<dyn DocketBackend>,
765    settings: DocketSettings,
766    task_counter: AtomicU64,
767}
768
769impl Docket {
770    /// Creates a new Docket with the given settings.
771    pub fn new(settings: DocketSettings) -> DocketResult<Self> {
772        let backend: Arc<dyn DocketBackend> = match &settings.backend {
773            DocketBackendType::Memory => Arc::new(MemoryDocketBackend::new(settings.clone())),
774            #[cfg(feature = "redis")]
775            DocketBackendType::Redis(redis_settings) => Arc::new(RedisDocketBackend::new(
776                redis_settings.clone(),
777                settings.clone(),
778            )?),
779            #[cfg(not(feature = "redis"))]
780            DocketBackendType::Redis(_) => {
781                return Err(DocketError::Backend(
782                    "Redis backend requires 'redis' feature".to_string(),
783                ));
784            }
785        };
786
787        Ok(Self {
788            backend,
789            settings,
790            task_counter: AtomicU64::new(0),
791        })
792    }
793
794    /// Creates a Docket with memory backend (for testing).
795    #[must_use]
796    pub fn memory() -> Self {
797        Self::new(DocketSettings::memory()).expect("memory backend always succeeds")
798    }
799
800    /// Submits a task to the queue.
801    pub fn submit(
802        &self,
803        task_type: impl Into<String>,
804        params: serde_json::Value,
805    ) -> DocketResult<TaskId> {
806        self.submit_with_options(task_type, params, SubmitOptions::default())
807    }
808
809    /// Submits a task with custom options.
810    pub fn submit_with_options(
811        &self,
812        task_type: impl Into<String>,
813        params: serde_json::Value,
814        options: SubmitOptions,
815    ) -> DocketResult<TaskId> {
816        let counter = self.task_counter.fetch_add(1, Ordering::SeqCst);
817        let task_id = TaskId::from_string(format!("docket-{counter:08x}"));
818
819        let max_retries = options.max_retries.unwrap_or(self.settings.max_retries);
820        let task = DocketTask::new(
821            task_id.clone(),
822            task_type.into(),
823            params,
824            options.priority,
825            max_retries,
826        );
827
828        self.backend.enqueue(task)?;
829
830        info!(
831            target: targets::SERVER,
832            "Docket: submitted task {} (type: {})",
833            task_id,
834            task_id
835        );
836
837        Ok(task_id)
838    }
839
840    /// Gets a task by ID.
841    pub fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
842        self.backend.get_task(task_id)
843    }
844
845    /// Lists tasks with optional status filter.
846    pub fn list_tasks(
847        &self,
848        status: Option<TaskStatus>,
849        limit: usize,
850    ) -> DocketResult<Vec<DocketTask>> {
851        self.backend.list_tasks(status, limit)
852    }
853
854    /// Cancels a task.
855    pub fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()> {
856        self.backend.cancel(task_id, reason)
857    }
858
859    /// Returns queue statistics.
860    pub fn stats(&self) -> DocketResult<QueueStats> {
861        self.backend.stats()
862    }
863
864    /// Creates a worker builder.
865    #[must_use]
866    pub fn worker(&self) -> WorkerBuilder {
867        WorkerBuilder::new(Arc::clone(&self.backend), self.settings.clone())
868    }
869
870    /// Returns the settings.
871    #[must_use]
872    pub fn settings(&self) -> &DocketSettings {
873        &self.settings
874    }
875
876    /// Converts to a shared handle.
877    #[must_use]
878    pub fn into_shared(self) -> SharedDocket {
879        Arc::new(self)
880    }
881}
882
883impl std::fmt::Debug for Docket {
884    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
885        f.debug_struct("Docket")
886            .field("settings", &self.settings)
887            .field("task_counter", &self.task_counter.load(Ordering::SeqCst))
888            .finish_non_exhaustive()
889    }
890}
891
892/// Thread-safe handle to a Docket.
893pub type SharedDocket = Arc<Docket>;
894
895// ============================================================================
896// Worker
897// ============================================================================
898
899/// Task handler function type.
900pub type TaskHandlerFn = Box<
901    dyn Fn(DocketTask) -> Pin<Box<dyn Future<Output = DocketResult<serde_json::Value>> + Send>>
902        + Send
903        + Sync,
904>;
905
906/// Builder for creating workers.
907pub struct WorkerBuilder {
908    backend: Arc<dyn DocketBackend>,
909    settings: DocketSettings,
910    handlers: HashMap<String, TaskHandlerFn>,
911}
912
913impl WorkerBuilder {
914    fn new(backend: Arc<dyn DocketBackend>, settings: DocketSettings) -> Self {
915        Self {
916            backend,
917            settings,
918            handlers: HashMap::new(),
919        }
920    }
921
922    /// Subscribes to a task type with a handler.
923    pub fn subscribe<F, Fut>(mut self, task_type: impl Into<String>, handler: F) -> Self
924    where
925        F: Fn(DocketTask) -> Fut + Send + Sync + 'static,
926        Fut: Future<Output = DocketResult<serde_json::Value>> + Send + 'static,
927    {
928        let task_type = task_type.into();
929        let boxed: TaskHandlerFn = Box::new(move |task| Box::pin(handler(task)));
930        self.handlers.insert(task_type, boxed);
931        self
932    }
933
934    /// Builds the worker.
935    #[must_use]
936    pub fn build(self) -> Worker {
937        Worker {
938            backend: self.backend,
939            settings: self.settings,
940            handlers: Arc::new(self.handlers),
941            running: Arc::new(AtomicBool::new(false)),
942        }
943    }
944}
945
/// A worker that processes tasks from the queue.
///
/// Tasks are dequeued from the backend for the subscribed task types and
/// dispatched to the handler registered for each task's type.
pub struct Worker {
    // Queue backend used to dequeue, ack and nack tasks.
    backend: Arc<dyn DocketBackend>,
    // Worker settings; `poll_interval` is the idle wait used by `run`.
    settings: DocketSettings,
    // Handlers keyed by task type name.
    handlers: Arc<HashMap<String, TaskHandlerFn>>,
    // Set to true when `run` starts; cleared by `stop` and when `run` ends.
    running: Arc<AtomicBool>,
}
953
954impl Worker {
955    /// Returns the task types this worker is subscribed to.
956    #[must_use]
957    pub fn subscribed_types(&self) -> Vec<String> {
958        self.handlers.keys().cloned().collect()
959    }
960
961    /// Returns whether the worker is running.
962    #[must_use]
963    pub fn is_running(&self) -> bool {
964        self.running.load(Ordering::SeqCst)
965    }
966
967    /// Stops the worker.
968    pub fn stop(&self) {
969        self.running.store(false, Ordering::SeqCst);
970    }
971
972    /// Processes a single task if available.
973    ///
974    /// Returns true if a task was processed, false if no task was available.
975    pub async fn process_one(&self, cx: &Cx) -> DocketResult<bool> {
976        let task_types = self.subscribed_types();
977
978        // Check for cancellation
979        if cx.is_cancel_requested() {
980            return Err(DocketError::Cancelled);
981        }
982
983        // Try to dequeue a task
984        let Some(task) = self.backend.dequeue(&task_types)? else {
985            return Ok(false);
986        };
987
988        let task_id = task.id.clone();
989        let task_type = task.task_type.clone();
990
991        debug!(
992            target: targets::SERVER,
993            "Docket worker: processing task {} (type: {})",
994            task_id,
995            task_type
996        );
997
998        // Get handler
999        let Some(handler) = self.handlers.get(&task_type) else {
1000            // This shouldn't happen since we only dequeue subscribed types
1001            self.backend.nack(&task_id, "No handler for task type")?;
1002            return Ok(true);
1003        };
1004
1005        // Execute handler
1006        let result = handler(task).await;
1007
1008        match result {
1009            Ok(data) => {
1010                self.backend.ack(&task_id, data)?;
1011                info!(
1012                    target: targets::SERVER,
1013                    "Docket worker: completed task {}",
1014                    task_id
1015                );
1016            }
1017            Err(e) => {
1018                let error_msg = e.to_string();
1019                self.backend.nack(&task_id, &error_msg)?;
1020                warn!(
1021                    target: targets::SERVER,
1022                    "Docket worker: task {} failed: {}",
1023                    task_id,
1024                    error_msg
1025                );
1026            }
1027        }
1028
1029        Ok(true)
1030    }
1031
1032    /// Runs the worker loop until stopped.
1033    pub async fn run(&self, cx: &Cx) -> DocketResult<()> {
1034        self.running.store(true, Ordering::SeqCst);
1035
1036        info!(
1037            target: targets::SERVER,
1038            "Docket worker starting with subscriptions: {:?}",
1039            self.subscribed_types()
1040        );
1041
1042        while self.running.load(Ordering::SeqCst) {
1043            // Check for cancellation
1044            if cx.is_cancel_requested() {
1045                break;
1046            }
1047
1048            // Requeue stale tasks periodically
1049            let _ = self.backend.requeue_stale();
1050
1051            // Process tasks
1052            match self.process_one(cx).await {
1053                Ok(true) => {
1054                    // Processed a task, immediately try for another
1055                    continue;
1056                }
1057                Ok(false) => {
1058                    // No task available, wait before polling again
1059                    std::thread::sleep(self.settings.poll_interval);
1060                }
1061                Err(DocketError::Cancelled) => {
1062                    break;
1063                }
1064                Err(e) => {
1065                    warn!(
1066                        target: targets::SERVER,
1067                        "Docket worker error: {}",
1068                        e
1069                    );
1070                    // Brief pause on error before retrying
1071                    std::thread::sleep(Duration::from_millis(100));
1072                }
1073            }
1074        }
1075
1076        self.running.store(false, Ordering::SeqCst);
1077        info!(target: targets::SERVER, "Docket worker stopped");
1078
1079        Ok(())
1080    }
1081}
1082
1083impl std::fmt::Debug for Worker {
1084    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1085        f.debug_struct("Worker")
1086            .field("subscribed_types", &self.subscribed_types())
1087            .field("running", &self.is_running())
1088            .finish_non_exhaustive()
1089    }
1090}
1091
1092// ============================================================================
1093// Tests
1094// ============================================================================
1095
#[cfg(test)]
mod tests {
    use super::*;

    /// Default settings select the memory backend with three retries.
    #[test]
    fn test_docket_settings_default() {
        let defaults = DocketSettings::default();
        let uses_memory = matches!(defaults.backend, DocketBackendType::Memory);
        assert!(uses_memory);
        assert_eq!(defaults.max_retries, 3);
    }

    /// The Redis constructor selects the Redis backend variant.
    #[test]
    fn test_docket_settings_redis() {
        let redis = DocketSettings::redis("redis://localhost:6379");
        let uses_redis = matches!(redis.backend, DocketBackendType::Redis(_));
        assert!(uses_redis);
    }

    /// Each `with_*` builder method stores the value it was given.
    #[test]
    fn test_docket_settings_builder() {
        let interval = Duration::from_millis(50);

        let built = DocketSettings::memory()
            .with_queue_prefix("test:queue")
            .with_max_retries(5)
            .with_poll_interval(interval);

        assert_eq!(built.queue_prefix, "test:queue");
        assert_eq!(built.max_retries, 5);
        assert_eq!(built.poll_interval, interval);
    }

    /// `Docket::memory` produces a docket configured for the memory backend.
    #[test]
    fn test_docket_memory_creation() {
        let queue = Docket::memory();
        let uses_memory = matches!(queue.settings.backend, DocketBackendType::Memory);
        assert!(uses_memory);
    }

    /// A submitted task gets a `docket-` id and is stored as pending.
    #[test]
    fn test_docket_submit_task() {
        let queue = Docket::memory();
        let payload = serde_json::json!({"key": "value"});

        let id = queue.submit("test_task", payload).unwrap();
        assert!(id.to_string().starts_with("docket-"));

        // Verify task exists
        let stored = queue.get_task(&id).unwrap().unwrap();
        assert_eq!(stored.task_type, "test_task");
        assert_eq!(stored.status, TaskStatus::Pending);
    }

    /// Higher-priority tasks are dequeued ahead of lower-priority ones.
    #[test]
    fn test_docket_submit_with_priority() {
        let queue = Docket::memory();

        let low_opts = SubmitOptions::new().with_priority(1);
        let low_id = queue
            .submit_with_options("task", serde_json::json!({"priority": "low"}), low_opts)
            .unwrap();

        let high_opts = SubmitOptions::new().with_priority(10);
        let high_id = queue
            .submit_with_options("task", serde_json::json!({"priority": "high"}), high_opts)
            .unwrap();

        // The worker is only used for its subscription list here.
        let worker = queue
            .worker()
            .subscribe("task", |task| async move { Ok(task.params) })
            .build();
        let subscribed = worker.subscribed_types();

        // High priority should be dequeued first
        let first = queue.backend.dequeue(&subscribed).unwrap().unwrap();
        assert_eq!(first.id, high_id);

        // Ack it
        queue.backend.ack(&high_id, serde_json::json!({})).unwrap();

        // Now low priority should be available
        let second = queue.backend.dequeue(&subscribed).unwrap().unwrap();
        assert_eq!(second.id, low_id);
    }

    /// Cancelling a task records the cancelled status and the reason.
    #[test]
    fn test_docket_cancel_task() {
        let queue = Docket::memory();
        let id = queue.submit("task", serde_json::json!({})).unwrap();

        queue.cancel(&id, Some("User cancelled")).unwrap();

        let stored = queue.get_task(&id).unwrap().unwrap();
        assert_eq!(stored.status, TaskStatus::Cancelled);
        assert_eq!(stored.error, Some("User cancelled".to_string()));
    }

    /// Stats count pending and cancelled tasks separately.
    #[test]
    fn test_docket_stats() {
        let queue = Docket::memory();

        queue.submit("task1", serde_json::json!({})).unwrap();
        queue.submit("task2", serde_json::json!({})).unwrap();
        let to_cancel = queue.submit("task3", serde_json::json!({})).unwrap();
        queue.cancel(&to_cancel, None).unwrap();

        let counts = queue.stats().unwrap();
        assert_eq!(counts.pending, 2);
        assert_eq!(counts.cancelled, 1);
    }

    /// Listing tasks honours the optional status filter.
    #[test]
    fn test_docket_list_tasks() {
        let queue = Docket::memory();

        queue.submit("type_a", serde_json::json!({})).unwrap();
        queue.submit("type_b", serde_json::json!({})).unwrap();
        let to_cancel = queue.submit("type_a", serde_json::json!({})).unwrap();
        queue.cancel(&to_cancel, None).unwrap();

        // All tasks
        let everything = queue.list_tasks(None, 100).unwrap();
        assert_eq!(everything.len(), 3);

        // Pending only
        let still_pending = queue.list_tasks(Some(TaskStatus::Pending), 100).unwrap();
        assert_eq!(still_pending.len(), 2);

        // Cancelled only
        let only_cancelled = queue.list_tasks(Some(TaskStatus::Cancelled), 100).unwrap();
        assert_eq!(only_cancelled.len(), 1);
    }

    /// Every subscribed task type is reported by the built worker.
    #[test]
    fn test_worker_builder() {
        let queue = Docket::memory();

        let worker = queue
            .worker()
            .subscribe("type_a", |_| async { Ok(serde_json::json!({})) })
            .subscribe("type_b", |_| async { Ok(serde_json::json!({})) })
            .build();

        let subscribed = worker.subscribed_types();
        assert!(subscribed.contains(&"type_a".to_string()));
        assert!(subscribed.contains(&"type_b".to_string()));
    }

    /// A task is requeued after a nack until its retries run out, then failed.
    #[test]
    fn test_memory_backend_retry() {
        let backend = MemoryDocketBackend::new(DocketSettings::memory().with_max_retries(2));
        let wanted = ["retry_test".to_string()];

        let queued = DocketTask::new(
            TaskId::from_string("test-1"),
            "retry_test".to_string(),
            serde_json::json!({}),
            0,
            2,
        );
        backend.enqueue(queued).unwrap();

        // Dequeue and nack (first failure)
        let first_try = backend.dequeue(&wanted).unwrap().unwrap();
        backend.nack(&first_try.id, "error 1").unwrap();

        // Should be requeued
        let second_try = backend.dequeue(&wanted).unwrap().unwrap();
        assert_eq!(second_try.retry_count, 1);
        backend.nack(&second_try.id, "error 2").unwrap();

        // Max retries exceeded - should be failed, not requeued
        let third_try = backend.dequeue(&wanted).unwrap();
        assert!(third_try.is_none());

        // Verify it's marked as failed
        let stored = backend
            .get_task(&TaskId::from_string("test-1"))
            .unwrap()
            .unwrap();
        assert_eq!(stored.status, TaskStatus::Failed);
    }

    /// `to_task_info` carries over id, type and the initial pending state.
    #[test]
    fn test_docket_task_to_info() {
        let summary = DocketTask::new(
            TaskId::from_string("test-info"),
            "test_type".to_string(),
            serde_json::json!({"data": 42}),
            5,
            3,
        )
        .to_task_info();

        assert_eq!(summary.id.to_string(), "test-info");
        assert_eq!(summary.task_type, "test_type");
        assert_eq!(summary.status, TaskStatus::Pending);
        assert!(summary.started_at.is_none());
    }

    /// Processing one task runs its handler and stores the handler's result.
    #[test]
    fn test_worker_process_one() {
        use fastmcp_core::block_on;

        let queue = Docket::memory();

        // Submit a task
        let id = queue
            .submit("process_test", serde_json::json!({"x": 1}))
            .unwrap();

        // Create worker whose handler doubles the `x` parameter
        let worker = queue
            .worker()
            .subscribe("process_test", |task| async move {
                let x = task
                    .params
                    .get("x")
                    .and_then(serde_json::Value::as_i64)
                    .unwrap_or(0);
                let doubled = x * 2;
                Ok(serde_json::json!({"result": doubled}))
            })
            .build();

        // Process
        let cx = Cx::for_testing();
        let handled = block_on(worker.process_one(&cx)).unwrap();
        assert!(handled);

        // Verify completion
        let stored = queue.get_task(&id).unwrap().unwrap();
        assert_eq!(stored.status, TaskStatus::Completed);
        assert_eq!(stored.result, Some(serde_json::json!({"result": 2})));
    }

    /// With an empty queue, `process_one` reports that nothing was processed.
    #[test]
    fn test_worker_no_task_available() {
        use fastmcp_core::block_on;

        let queue = Docket::memory();
        let worker = queue
            .worker()
            .subscribe("empty_test", |_| async { Ok(serde_json::json!({})) })
            .build();

        let cx = Cx::for_testing();
        let handled = block_on(worker.process_one(&cx)).unwrap();
        assert!(!handled);
    }

    /// Submit options record priority, retry limit and delay.
    #[test]
    fn test_submit_options() {
        let delay = Duration::from_secs(60);

        let options = SubmitOptions::new()
            .with_priority(10)
            .with_max_retries(5)
            .with_delay(delay);

        assert_eq!(options.priority, 10);
        assert_eq!(options.max_retries, Some(5));
        assert_eq!(options.delay, Some(delay));
    }

    /// Each error variant renders its expected display message.
    #[test]
    fn test_docket_error_display() {
        let cases = [
            (
                DocketError::NotFound("task-1".into()),
                "Task not found: task-1",
            ),
            (
                DocketError::Connection("refused".into()),
                "Connection error: refused",
            ),
            (DocketError::Handler("panic".into()), "Handler error: panic"),
            (DocketError::Cancelled, "Operation cancelled"),
        ];

        for (error, expected) in cases {
            assert_eq!(error.to_string(), expected);
        }
    }
}