1use std::collections::{HashMap, VecDeque};
60use std::future::Future;
61use std::pin::Pin;
62use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
63use std::sync::{Arc, RwLock};
64use std::time::Duration;
65
66use asupersync::Cx;
67use fastmcp_core::McpError;
68use fastmcp_core::logging::{debug, info, targets, warn};
69use fastmcp_protocol::{TaskId, TaskInfo, TaskResult, TaskStatus};
70use serde::{Deserialize, Serialize};
71
/// Configuration for a [`Docket`] task queue and its workers.
#[derive(Debug, Clone)]
pub struct DocketSettings {
    /// Which storage backend to use (in-memory by default).
    pub backend: DocketBackendType,
    /// Key/namespace prefix for queue storage.
    pub queue_prefix: String,
    /// How long a claimed (`Running`) task may go unacknowledged before
    /// `requeue_stale` treats it as abandoned.
    pub visibility_timeout: Duration,
    /// Default per-task execution timeout.
    pub default_task_timeout: Duration,
    /// Retry budget applied to tasks that do not override it.
    pub max_retries: u32,
    /// Delay between retry attempts.
    pub retry_delay: Duration,
    /// Worker sleep interval when the queue is empty.
    pub poll_interval: Duration,
}
94
95impl Default for DocketSettings {
96 fn default() -> Self {
97 Self {
98 backend: DocketBackendType::Memory,
99 queue_prefix: "fastmcp:docket".to_string(),
100 visibility_timeout: Duration::from_secs(30),
101 default_task_timeout: Duration::from_secs(300),
102 max_retries: 3,
103 retry_delay: Duration::from_secs(1),
104 poll_interval: Duration::from_millis(100),
105 }
106 }
107}
108
109impl DocketSettings {
110 #[must_use]
112 pub fn memory() -> Self {
113 Self::default()
114 }
115
116 #[must_use]
118 pub fn redis(url: impl Into<String>) -> Self {
119 Self {
120 backend: DocketBackendType::Redis(RedisSettings {
121 url: url.into(),
122 pool_size: 10,
123 connect_timeout: Duration::from_secs(5),
124 }),
125 ..Self::default()
126 }
127 }
128
129 #[must_use]
131 pub fn with_queue_prefix(mut self, prefix: impl Into<String>) -> Self {
132 self.queue_prefix = prefix.into();
133 self
134 }
135
136 #[must_use]
138 pub fn with_visibility_timeout(mut self, timeout: Duration) -> Self {
139 self.visibility_timeout = timeout;
140 self
141 }
142
143 #[must_use]
145 pub fn with_max_retries(mut self, retries: u32) -> Self {
146 self.max_retries = retries;
147 self
148 }
149
150 #[must_use]
152 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
153 self.poll_interval = interval;
154 self
155 }
156}
157
/// Which storage backend a [`Docket`] uses.
#[derive(Debug, Clone)]
pub enum DocketBackendType {
    /// In-process queue; contents are lost when the process exits.
    Memory,
    /// Redis-backed queue (requires the `redis` cargo feature).
    Redis(RedisSettings),
}
166
/// Connection settings for the Redis backend.
#[derive(Debug, Clone)]
pub struct RedisSettings {
    /// Redis connection URL (e.g. `redis://localhost:6379`).
    pub url: String,
    /// Connection pool size.
    pub pool_size: usize,
    /// Timeout for establishing a connection.
    pub connect_timeout: Duration,
}
177
/// A unit of queued work together with its full lifecycle state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocketTask {
    /// Unique id minted at submission time.
    pub id: TaskId,
    /// Routing key matched against worker subscriptions.
    pub task_type: String,
    /// Arbitrary JSON payload handed to the handler.
    pub params: serde_json::Value,
    /// Higher values are dequeued first.
    pub priority: i32,
    /// Failures so far (incremented by `nack` and stale requeueing).
    pub retry_count: u32,
    /// Retry budget before the task is marked `Failed`.
    pub max_retries: u32,
    /// RFC 3339 submission timestamp.
    pub created_at: String,
    /// RFC 3339 claim timestamp; `None` while pending.
    pub claimed_at: Option<String>,
    /// Current lifecycle state.
    pub status: TaskStatus,
    /// Most recent failure message or cancellation reason.
    pub error: Option<String>,
    /// Handler output, set once the task completes successfully.
    pub result: Option<serde_json::Value>,
}
208
impl DocketTask {
    /// Build a fresh task in `Pending` state, stamped with the current
    /// UTC time as `created_at`.
    fn new(
        id: TaskId,
        task_type: String,
        params: serde_json::Value,
        priority: i32,
        max_retries: u32,
    ) -> Self {
        Self {
            id,
            task_type,
            params,
            priority,
            retry_count: 0,
            max_retries,
            created_at: chrono::Utc::now().to_rfc3339(),
            claimed_at: None,
            status: TaskStatus::Pending,
            error: None,
            result: None,
        }
    }

    /// Snapshot this task as a protocol-level [`TaskInfo`].
    #[must_use]
    pub fn to_task_info(&self) -> TaskInfo {
        TaskInfo {
            id: self.id.clone(),
            task_type: self.task_type.clone(),
            status: self.status,
            progress: None,
            message: None,
            created_at: self.created_at.clone(),
            started_at: self.claimed_at.clone(),
            // NOTE(review): the task stores no completion timestamp, so for
            // terminal tasks this stamps the time of conversion, not the
            // actual completion time — confirm callers tolerate this.
            completed_at: if self.status.is_terminal() {
                Some(chrono::Utc::now().to_rfc3339())
            } else {
                None
            },
            error: self.error.clone(),
        }
    }

    /// Terminal-state result payload; `None` while the task is still live.
    #[must_use]
    pub fn to_task_result(&self) -> Option<TaskResult> {
        if !self.status.is_terminal() {
            return None;
        }
        Some(TaskResult {
            id: self.id.clone(),
            success: self.status == TaskStatus::Completed,
            data: self.result.clone(),
            error: self.error.clone(),
        })
    }
}
267
/// Per-submission overrides for [`Docket::submit_with_options`].
#[derive(Debug, Clone, Default)]
pub struct SubmitOptions {
    /// Queue priority; higher values are dequeued first (default 0).
    pub priority: i32,
    /// Override of the settings-level retry budget.
    pub max_retries: Option<u32>,
    /// Requested delivery delay.
    /// NOTE(review): no visible backend honors this field yet — verify
    /// before relying on delayed delivery.
    pub delay: Option<Duration>,
}
278
279impl SubmitOptions {
280 #[must_use]
282 pub fn new() -> Self {
283 Self::default()
284 }
285
286 #[must_use]
288 pub fn with_priority(mut self, priority: i32) -> Self {
289 self.priority = priority;
290 self
291 }
292
293 #[must_use]
295 pub fn with_max_retries(mut self, retries: u32) -> Self {
296 self.max_retries = Some(retries);
297 self
298 }
299
300 #[must_use]
302 pub fn with_delay(mut self, delay: Duration) -> Self {
303 self.delay = Some(delay);
304 self
305 }
306}
307
/// Convenience alias for docket operations.
pub type DocketResult<T> = Result<T, DocketError>;

/// Errors surfaced by the docket queue and its backends.
#[derive(Debug)]
pub enum DocketError {
    /// No task with the given id is known to the backend.
    NotFound(String),
    /// Failed to reach the backing store.
    Connection(String),
    /// Task payload/result (de)serialization failed.
    Serialization(String),
    /// A task handler returned an error.
    Handler(String),
    /// Backend-internal failure (lock poisoning, unimplemented backend, ...).
    Backend(String),
    /// The surrounding operation was cancelled.
    Cancelled,
}
331
332impl std::fmt::Display for DocketError {
333 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334 match self {
335 DocketError::NotFound(msg) => write!(f, "Task not found: {msg}"),
336 DocketError::Connection(msg) => write!(f, "Connection error: {msg}"),
337 DocketError::Serialization(msg) => write!(f, "Serialization error: {msg}"),
338 DocketError::Handler(msg) => write!(f, "Handler error: {msg}"),
339 DocketError::Backend(msg) => write!(f, "Backend error: {msg}"),
340 DocketError::Cancelled => write!(f, "Operation cancelled"),
341 }
342 }
343}
344
// Marker impl so `DocketError` composes with `?` and error-trait consumers.
impl std::error::Error for DocketError {}

/// Map docket failures onto the protocol's internal-error representation.
impl From<DocketError> for McpError {
    fn from(err: DocketError) -> Self {
        McpError::internal_error(err.to_string())
    }
}
352
/// Storage/queue abstraction behind [`Docket`]. Implementations must be
/// shareable across worker threads.
pub trait DocketBackend: Send + Sync {
    /// Add `task` to the queue (priority-ordered where supported).
    fn enqueue(&self, task: DocketTask) -> DocketResult<()>;

    /// Claim the next pending task whose type is in `task_types`, marking
    /// it `Running`; returns `Ok(None)` when nothing matches.
    fn dequeue(&self, task_types: &[String]) -> DocketResult<Option<DocketTask>>;

    /// Mark a claimed task `Completed` and store its `result`.
    fn ack(&self, task_id: &TaskId, result: serde_json::Value) -> DocketResult<()>;

    /// Report failure for a claimed task; requeues until the retry budget
    /// is exhausted, after which the task becomes `Failed`.
    fn nack(&self, task_id: &TaskId, error: &str) -> DocketResult<()>;

    /// Fetch a task by id, if known.
    fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>>;

    /// List up to `limit` tasks, optionally filtered by `status`.
    fn list_tasks(&self, status: Option<TaskStatus>, limit: usize)
    -> DocketResult<Vec<DocketTask>>;

    /// Cancel a non-terminal task, recording `reason` as its error.
    fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()>;

    /// Current per-status queue counts.
    fn stats(&self) -> DocketResult<QueueStats>;

    /// Requeue `Running` tasks whose visibility timeout expired; returns
    /// how many were requeued.
    fn requeue_stale(&self) -> DocketResult<usize>;
}
390
/// Per-status task counts reported by [`DocketBackend::stats`].
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Tasks waiting to be claimed.
    pub pending: usize,
    /// Tasks currently claimed by workers.
    pub in_progress: usize,
    /// Successfully finished tasks.
    pub completed: usize,
    /// Tasks that exhausted their retry budget.
    pub failed: usize,
    /// Tasks cancelled before completion.
    pub cancelled: usize,
}
405
/// In-process backend keeping tasks in locked collections; suitable for
/// tests and single-process deployments.
pub struct MemoryDocketBackend {
    /// Every task ever enqueued, keyed by id (terminal tasks included).
    tasks: RwLock<HashMap<TaskId, DocketTask>>,
    /// Ids of pending tasks; `enqueue` keeps this in priority order.
    pending: RwLock<VecDeque<TaskId>>,
    /// Settings (the visibility timeout drives `requeue_stale`).
    settings: DocketSettings,
}
422
423impl MemoryDocketBackend {
424 #[must_use]
426 pub fn new(settings: DocketSettings) -> Self {
427 Self {
428 tasks: RwLock::new(HashMap::new()),
429 pending: RwLock::new(VecDeque::new()),
430 settings,
431 }
432 }
433}
434
435impl DocketBackend for MemoryDocketBackend {
436 fn enqueue(&self, task: DocketTask) -> DocketResult<()> {
437 let task_id = task.id.clone();
438 let priority = task.priority;
439
440 {
441 let mut tasks = self
442 .tasks
443 .write()
444 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
445 tasks.insert(task_id.clone(), task);
446 }
447
448 {
449 let mut pending = self
450 .pending
451 .write()
452 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
453
454 let tasks = self
456 .tasks
457 .read()
458 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
459
460 let pos = pending
461 .iter()
462 .position(|id| tasks.get(id).is_none_or(|t| t.priority < priority))
463 .unwrap_or(pending.len());
464
465 pending.insert(pos, task_id);
466 }
467
468 Ok(())
469 }
470
471 fn dequeue(&self, task_types: &[String]) -> DocketResult<Option<DocketTask>> {
472 let mut pending = self
473 .pending
474 .write()
475 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
476 let mut tasks = self
477 .tasks
478 .write()
479 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
480
481 let pos = pending.iter().position(|id| {
483 tasks.get(id).is_some_and(|t| {
484 t.status == TaskStatus::Pending && task_types.contains(&t.task_type)
485 })
486 });
487
488 if let Some(pos) = pos {
489 let task_id = pending.remove(pos).expect("position valid");
490 if let Some(task) = tasks.get_mut(&task_id) {
491 task.status = TaskStatus::Running;
492 task.claimed_at = Some(chrono::Utc::now().to_rfc3339());
493 return Ok(Some(task.clone()));
494 }
495 }
496
497 Ok(None)
498 }
499
500 fn ack(&self, task_id: &TaskId, result: serde_json::Value) -> DocketResult<()> {
501 let mut tasks = self
502 .tasks
503 .write()
504 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
505
506 let task = tasks
507 .get_mut(task_id)
508 .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
509
510 task.status = TaskStatus::Completed;
511 task.result = Some(result);
512
513 Ok(())
514 }
515
516 fn nack(&self, task_id: &TaskId, error: &str) -> DocketResult<()> {
517 let mut tasks = self
518 .tasks
519 .write()
520 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
521 let mut pending = self
522 .pending
523 .write()
524 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
525
526 let task = tasks
527 .get_mut(task_id)
528 .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
529
530 task.retry_count += 1;
531 task.error = Some(error.to_string());
532
533 if task.retry_count >= task.max_retries {
534 task.status = TaskStatus::Failed;
536 } else {
537 task.status = TaskStatus::Pending;
539 task.claimed_at = None;
540 pending.push_back(task_id.clone());
541 }
542
543 Ok(())
544 }
545
546 fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
547 let tasks = self
548 .tasks
549 .read()
550 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
551 Ok(tasks.get(task_id).cloned())
552 }
553
554 fn list_tasks(
555 &self,
556 status: Option<TaskStatus>,
557 limit: usize,
558 ) -> DocketResult<Vec<DocketTask>> {
559 let tasks = self
560 .tasks
561 .read()
562 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
563
564 let iter = tasks
565 .values()
566 .filter(|t| status.is_none_or(|s| t.status == s));
567
568 Ok(iter.take(limit).cloned().collect())
569 }
570
571 fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()> {
572 let mut tasks = self
573 .tasks
574 .write()
575 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
576 let mut pending = self
577 .pending
578 .write()
579 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
580
581 let task = tasks
582 .get_mut(task_id)
583 .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
584
585 if task.status.is_terminal() {
586 return Err(DocketError::Backend(format!(
587 "Cannot cancel task in terminal state: {:?}",
588 task.status
589 )));
590 }
591
592 task.status = TaskStatus::Cancelled;
593 task.error = reason.map(String::from);
594
595 pending.retain(|id| id != task_id);
597
598 Ok(())
599 }
600
601 fn stats(&self) -> DocketResult<QueueStats> {
602 let tasks = self
603 .tasks
604 .read()
605 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
606
607 let mut stats = QueueStats::default();
608 for task in tasks.values() {
609 match task.status {
610 TaskStatus::Pending => stats.pending += 1,
611 TaskStatus::Running => stats.in_progress += 1,
612 TaskStatus::Completed => stats.completed += 1,
613 TaskStatus::Failed => stats.failed += 1,
614 TaskStatus::Cancelled => stats.cancelled += 1,
615 }
616 }
617
618 Ok(stats)
619 }
620
621 fn requeue_stale(&self) -> DocketResult<usize> {
622 let mut tasks = self
623 .tasks
624 .write()
625 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
626 let mut pending = self
627 .pending
628 .write()
629 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
630
631 let now = chrono::Utc::now();
632 let timeout = chrono::Duration::from_std(self.settings.visibility_timeout)
633 .unwrap_or_else(|_| chrono::Duration::seconds(30));
634
635 let mut requeued = 0;
636
637 for task in tasks.values_mut() {
638 if task.status != TaskStatus::Running {
639 continue;
640 }
641
642 if let Some(ref claimed_at) = task.claimed_at {
643 if let Ok(claimed) = chrono::DateTime::parse_from_rfc3339(claimed_at) {
644 if now - claimed.with_timezone(&chrono::Utc) > timeout {
645 task.status = TaskStatus::Pending;
647 task.claimed_at = None;
648 task.retry_count += 1;
649
650 if task.retry_count >= task.max_retries {
651 task.status = TaskStatus::Failed;
652 task.error = Some("Exceeded visibility timeout".to_string());
653 } else {
654 pending.push_back(task.id.clone());
655 requeued += 1;
656 }
657 }
658 }
659 }
660 }
661
662 Ok(requeued)
663 }
664}
665
/// Redis-backed queue (placeholder — construction currently always fails;
/// see `RedisDocketBackend::new`).
#[cfg(feature = "redis")]
pub struct RedisDocketBackend {
    /// Connection settings (unused until the backend is implemented).
    _settings: RedisSettings,
    /// Queue settings (unused until the backend is implemented).
    _docket_settings: DocketSettings,
}
680
#[cfg(feature = "redis")]
impl RedisDocketBackend {
    /// Attempt to construct the Redis backend.
    ///
    /// # Errors
    /// Always returns `DocketError::Backend` — this is a placeholder, so
    /// no instance can currently be built.
    pub fn new(
        _redis_settings: RedisSettings,
        _docket_settings: DocketSettings,
    ) -> DocketResult<Self> {
        Err(DocketError::Backend(
            "Redis backend not yet implemented".to_string(),
        ))
    }
}
694
/// Trait stubs for the Redis backend: every call fails with the same
/// "not yet implemented" error. Unreachable in practice, since
/// `RedisDocketBackend::new` never yields an instance.
#[cfg(feature = "redis")]
impl DocketBackend for RedisDocketBackend {
    fn enqueue(&self, _task: DocketTask) -> DocketResult<()> {
        Err(DocketError::Backend(String::from("Redis backend not yet implemented")))
    }

    fn dequeue(&self, _task_types: &[String]) -> DocketResult<Option<DocketTask>> {
        Err(DocketError::Backend(String::from("Redis backend not yet implemented")))
    }

    fn ack(&self, _task_id: &TaskId, _result: serde_json::Value) -> DocketResult<()> {
        Err(DocketError::Backend(String::from("Redis backend not yet implemented")))
    }

    fn nack(&self, _task_id: &TaskId, _error: &str) -> DocketResult<()> {
        Err(DocketError::Backend(String::from("Redis backend not yet implemented")))
    }

    fn get_task(&self, _task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
        Err(DocketError::Backend(String::from("Redis backend not yet implemented")))
    }

    fn list_tasks(
        &self,
        _status: Option<TaskStatus>,
        _limit: usize,
    ) -> DocketResult<Vec<DocketTask>> {
        Err(DocketError::Backend(String::from("Redis backend not yet implemented")))
    }

    fn cancel(&self, _task_id: &TaskId, _reason: Option<&str>) -> DocketResult<()> {
        Err(DocketError::Backend(String::from("Redis backend not yet implemented")))
    }

    fn stats(&self) -> DocketResult<QueueStats> {
        Err(DocketError::Backend(String::from("Redis backend not yet implemented")))
    }

    fn requeue_stale(&self) -> DocketResult<usize> {
        Err(DocketError::Backend(String::from("Redis backend not yet implemented")))
    }
}
755
/// Client handle for submitting, inspecting, and cancelling queued tasks.
pub struct Docket {
    /// Selected storage backend, shared with workers.
    backend: Arc<dyn DocketBackend>,
    /// Settings the docket (and its workers) were built with.
    settings: DocketSettings,
    /// Monotonic counter used to mint task ids.
    task_counter: AtomicU64,
}
768
769impl Docket {
770 pub fn new(settings: DocketSettings) -> DocketResult<Self> {
772 let backend: Arc<dyn DocketBackend> = match &settings.backend {
773 DocketBackendType::Memory => Arc::new(MemoryDocketBackend::new(settings.clone())),
774 #[cfg(feature = "redis")]
775 DocketBackendType::Redis(redis_settings) => Arc::new(RedisDocketBackend::new(
776 redis_settings.clone(),
777 settings.clone(),
778 )?),
779 #[cfg(not(feature = "redis"))]
780 DocketBackendType::Redis(_) => {
781 return Err(DocketError::Backend(
782 "Redis backend requires 'redis' feature".to_string(),
783 ));
784 }
785 };
786
787 Ok(Self {
788 backend,
789 settings,
790 task_counter: AtomicU64::new(0),
791 })
792 }
793
794 #[must_use]
796 pub fn memory() -> Self {
797 Self::new(DocketSettings::memory()).expect("memory backend always succeeds")
798 }
799
800 pub fn submit(
802 &self,
803 task_type: impl Into<String>,
804 params: serde_json::Value,
805 ) -> DocketResult<TaskId> {
806 self.submit_with_options(task_type, params, SubmitOptions::default())
807 }
808
809 pub fn submit_with_options(
811 &self,
812 task_type: impl Into<String>,
813 params: serde_json::Value,
814 options: SubmitOptions,
815 ) -> DocketResult<TaskId> {
816 let counter = self.task_counter.fetch_add(1, Ordering::SeqCst);
817 let task_id = TaskId::from_string(format!("docket-{counter:08x}"));
818
819 let max_retries = options.max_retries.unwrap_or(self.settings.max_retries);
820 let task = DocketTask::new(
821 task_id.clone(),
822 task_type.into(),
823 params,
824 options.priority,
825 max_retries,
826 );
827
828 self.backend.enqueue(task)?;
829
830 info!(
831 target: targets::SERVER,
832 "Docket: submitted task {} (type: {})",
833 task_id,
834 task_id
835 );
836
837 Ok(task_id)
838 }
839
840 pub fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
842 self.backend.get_task(task_id)
843 }
844
845 pub fn list_tasks(
847 &self,
848 status: Option<TaskStatus>,
849 limit: usize,
850 ) -> DocketResult<Vec<DocketTask>> {
851 self.backend.list_tasks(status, limit)
852 }
853
854 pub fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()> {
856 self.backend.cancel(task_id, reason)
857 }
858
859 pub fn stats(&self) -> DocketResult<QueueStats> {
861 self.backend.stats()
862 }
863
864 #[must_use]
866 pub fn worker(&self) -> WorkerBuilder {
867 WorkerBuilder::new(Arc::clone(&self.backend), self.settings.clone())
868 }
869
870 #[must_use]
872 pub fn settings(&self) -> &DocketSettings {
873 &self.settings
874 }
875
876 #[must_use]
878 pub fn into_shared(self) -> SharedDocket {
879 Arc::new(self)
880 }
881}
882
883impl std::fmt::Debug for Docket {
884 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
885 f.debug_struct("Docket")
886 .field("settings", &self.settings)
887 .field("task_counter", &self.task_counter.load(Ordering::SeqCst))
888 .finish_non_exhaustive()
889 }
890}
891
/// Shared handle to a [`Docket`]; cheap to clone across threads.
pub type SharedDocket = Arc<Docket>;

/// Type-erased async task handler: receives the claimed task and produces
/// its JSON result (an `Err` triggers `nack` and the retry path).
pub type TaskHandlerFn = Box<
    dyn Fn(DocketTask) -> Pin<Box<dyn Future<Output = DocketResult<serde_json::Value>> + Send>>
        + Send
        + Sync,
>;
905
/// Builder that pairs a backend with task handlers to produce a [`Worker`].
pub struct WorkerBuilder {
    /// Queue backend shared with the owning docket.
    backend: Arc<dyn DocketBackend>,
    /// Settings copied from the docket (poll interval, retries, ...).
    settings: DocketSettings,
    /// Handler per task type; the keys double as the dequeue filter.
    handlers: HashMap<String, TaskHandlerFn>,
}
912
913impl WorkerBuilder {
914 fn new(backend: Arc<dyn DocketBackend>, settings: DocketSettings) -> Self {
915 Self {
916 backend,
917 settings,
918 handlers: HashMap::new(),
919 }
920 }
921
922 pub fn subscribe<F, Fut>(mut self, task_type: impl Into<String>, handler: F) -> Self
924 where
925 F: Fn(DocketTask) -> Fut + Send + Sync + 'static,
926 Fut: Future<Output = DocketResult<serde_json::Value>> + Send + 'static,
927 {
928 let task_type = task_type.into();
929 let boxed: TaskHandlerFn = Box::new(move |task| Box::pin(handler(task)));
930 self.handlers.insert(task_type, boxed);
931 self
932 }
933
934 #[must_use]
936 pub fn build(self) -> Worker {
937 Worker {
938 backend: self.backend,
939 settings: self.settings,
940 handlers: Arc::new(self.handlers),
941 running: Arc::new(AtomicBool::new(false)),
942 }
943 }
944}
945
/// Polling consumer that claims tasks and dispatches them to handlers.
pub struct Worker {
    /// Queue backend shared with the owning docket.
    backend: Arc<dyn DocketBackend>,
    /// Settings (poll interval is used by the run loop).
    settings: DocketSettings,
    /// Registered handlers, shared so the worker is cloneable across tasks.
    handlers: Arc<HashMap<String, TaskHandlerFn>>,
    /// Run-loop flag; `stop()` clears it to request shutdown.
    running: Arc<AtomicBool>,
}
953
impl Worker {
    /// Task types this worker has handlers for (order unspecified).
    #[must_use]
    pub fn subscribed_types(&self) -> Vec<String> {
        self.handlers.keys().cloned().collect()
    }

    /// Whether the run loop is currently active.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    /// Ask the run loop to exit after its current iteration.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Dequeue and handle at most one task.
    ///
    /// Returns `Ok(true)` if a task was claimed — even when its handler
    /// failed or was missing, both of which are reported via `nack` —
    /// and `Ok(false)` when the queue had nothing for our task types.
    ///
    /// # Errors
    /// `DocketError::Cancelled` when `cx` has a pending cancellation;
    /// backend errors are propagated.
    pub async fn process_one(&self, cx: &Cx) -> DocketResult<bool> {
        let task_types = self.subscribed_types();

        // Honor cancellation before touching the queue.
        if cx.is_cancel_requested() {
            return Err(DocketError::Cancelled);
        }

        let Some(task) = self.backend.dequeue(&task_types)? else {
            return Ok(false);
        };

        let task_id = task.id.clone();
        let task_type = task.task_type.clone();

        debug!(
            target: targets::SERVER,
            "Docket worker: processing task {} (type: {})",
            task_id,
            task_type
        );

        // A claimed task with no handler is nacked so it re-enters the
        // retry path rather than staying stuck in `Running`.
        let Some(handler) = self.handlers.get(&task_type) else {
            self.backend.nack(&task_id, "No handler for task type")?;
            return Ok(true);
        };

        let result = handler(task).await;

        match result {
            Ok(data) => {
                self.backend.ack(&task_id, data)?;
                info!(
                    target: targets::SERVER,
                    "Docket worker: completed task {}",
                    task_id
                );
            }
            Err(e) => {
                let error_msg = e.to_string();
                self.backend.nack(&task_id, &error_msg)?;
                warn!(
                    target: targets::SERVER,
                    "Docket worker: task {} failed: {}",
                    task_id,
                    error_msg
                );
            }
        }

        Ok(true)
    }

    /// Poll-loop until `stop()` is called or `cx` is cancelled. Each
    /// iteration first requeues stale claims (best-effort), then tries to
    /// process one task.
    ///
    /// NOTE(review): the idle/error paths call `std::thread::sleep`, which
    /// blocks the calling thread inside an async fn. If the asupersync
    /// executor multiplexes tasks onto shared threads this will stall
    /// sibling tasks — confirm the runtime's execution model or switch to
    /// its async sleep primitive.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`; per-iteration errors are logged
    /// and retried rather than propagated.
    pub async fn run(&self, cx: &Cx) -> DocketResult<()> {
        self.running.store(true, Ordering::SeqCst);

        info!(
            target: targets::SERVER,
            "Docket worker starting with subscriptions: {:?}",
            self.subscribed_types()
        );

        while self.running.load(Ordering::SeqCst) {
            if cx.is_cancel_requested() {
                break;
            }

            // Best-effort: stale-claim recovery failures are ignored here.
            let _ = self.backend.requeue_stale();

            match self.process_one(cx).await {
                Ok(true) => {
                    // Task handled; immediately look for the next one.
                    continue;
                }
                Ok(false) => {
                    std::thread::sleep(self.settings.poll_interval);
                }
                Err(DocketError::Cancelled) => {
                    break;
                }
                Err(e) => {
                    warn!(
                        target: targets::SERVER,
                        "Docket worker error: {}",
                        e
                    );
                    std::thread::sleep(Duration::from_millis(100));
                }
            }
        }

        self.running.store(false, Ordering::SeqCst);
        info!(target: targets::SERVER, "Docket worker stopped");

        Ok(())
    }
}
1082
1083impl std::fmt::Debug for Worker {
1084 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1085 f.debug_struct("Worker")
1086 .field("subscribed_types", &self.subscribed_types())
1087 .field("running", &self.is_running())
1088 .finish_non_exhaustive()
1089 }
1090}
1091
#[cfg(test)]
mod tests {
    use super::*;

    // Default settings select the memory backend with 3 retries.
    #[test]
    fn test_docket_settings_default() {
        let settings = DocketSettings::default();
        assert!(matches!(settings.backend, DocketBackendType::Memory));
        assert_eq!(settings.max_retries, 3);
    }

    // `redis(url)` switches the backend variant.
    #[test]
    fn test_docket_settings_redis() {
        let settings = DocketSettings::redis("redis://localhost:6379");
        assert!(matches!(settings.backend, DocketBackendType::Redis(_)));
    }

    // Builder methods override the corresponding fields.
    #[test]
    fn test_docket_settings_builder() {
        let settings = DocketSettings::memory()
            .with_queue_prefix("test:queue")
            .with_max_retries(5)
            .with_poll_interval(Duration::from_millis(50));

        assert_eq!(settings.queue_prefix, "test:queue");
        assert_eq!(settings.max_retries, 5);
        assert_eq!(settings.poll_interval, Duration::from_millis(50));
    }

    #[test]
    fn test_docket_memory_creation() {
        let docket = Docket::memory();
        assert!(matches!(docket.settings.backend, DocketBackendType::Memory));
    }

    // Submitted tasks get a `docket-` prefixed id and start Pending.
    #[test]
    fn test_docket_submit_task() {
        let docket = Docket::memory();

        let task_id = docket
            .submit("test_task", serde_json::json!({"key": "value"}))
            .unwrap();

        assert!(task_id.to_string().starts_with("docket-"));

        let task = docket.get_task(&task_id).unwrap().unwrap();
        assert_eq!(task.task_type, "test_task");
        assert_eq!(task.status, TaskStatus::Pending);
    }

    // Higher-priority submissions are dequeued first, regardless of
    // submission order.
    #[test]
    fn test_docket_submit_with_priority() {
        let docket = Docket::memory();

        let low_id = docket
            .submit_with_options(
                "task",
                serde_json::json!({"priority": "low"}),
                SubmitOptions::new().with_priority(1),
            )
            .unwrap();

        let high_id = docket
            .submit_with_options(
                "task",
                serde_json::json!({"priority": "high"}),
                SubmitOptions::new().with_priority(10),
            )
            .unwrap();

        let worker = docket
            .worker()
            .subscribe("task", |t| async move { Ok(t.params) })
            .build();

        let types = worker.subscribed_types();
        let dequeued = docket.backend.dequeue(&types).unwrap().unwrap();
        assert_eq!(dequeued.id, high_id);

        docket.backend.ack(&high_id, serde_json::json!({})).unwrap();

        let dequeued = docket.backend.dequeue(&types).unwrap().unwrap();
        assert_eq!(dequeued.id, low_id);
    }

    // Cancelling records the reason in the task's error slot.
    #[test]
    fn test_docket_cancel_task() {
        let docket = Docket::memory();

        let task_id = docket.submit("task", serde_json::json!({})).unwrap();

        docket.cancel(&task_id, Some("User cancelled")).unwrap();

        let task = docket.get_task(&task_id).unwrap().unwrap();
        assert_eq!(task.status, TaskStatus::Cancelled);
        assert_eq!(task.error, Some("User cancelled".to_string()));
    }

    #[test]
    fn test_docket_stats() {
        let docket = Docket::memory();

        docket.submit("task1", serde_json::json!({})).unwrap();
        docket.submit("task2", serde_json::json!({})).unwrap();
        let task3 = docket.submit("task3", serde_json::json!({})).unwrap();
        docket.cancel(&task3, None).unwrap();

        let stats = docket.stats().unwrap();
        assert_eq!(stats.pending, 2);
        assert_eq!(stats.cancelled, 1);
    }

    // `list_tasks` with a status filter returns only matching tasks.
    #[test]
    fn test_docket_list_tasks() {
        let docket = Docket::memory();

        docket.submit("type_a", serde_json::json!({})).unwrap();
        docket.submit("type_b", serde_json::json!({})).unwrap();
        let cancelled_id = docket.submit("type_a", serde_json::json!({})).unwrap();
        docket.cancel(&cancelled_id, None).unwrap();

        let all = docket.list_tasks(None, 100).unwrap();
        assert_eq!(all.len(), 3);

        let pending = docket.list_tasks(Some(TaskStatus::Pending), 100).unwrap();
        assert_eq!(pending.len(), 2);

        let cancelled = docket.list_tasks(Some(TaskStatus::Cancelled), 100).unwrap();
        assert_eq!(cancelled.len(), 1);
    }

    #[test]
    fn test_worker_builder() {
        let docket = Docket::memory();

        let worker = docket
            .worker()
            .subscribe("type_a", |_| async { Ok(serde_json::json!({})) })
            .subscribe("type_b", |_| async { Ok(serde_json::json!({})) })
            .build();

        let types = worker.subscribed_types();
        assert!(types.contains(&"type_a".to_string()));
        assert!(types.contains(&"type_b".to_string()));
    }

    // With max_retries = 2, the second nack exhausts the budget and the
    // task ends up Failed instead of being requeued.
    #[test]
    fn test_memory_backend_retry() {
        let settings = DocketSettings::memory().with_max_retries(2);
        let backend = MemoryDocketBackend::new(settings);

        let task = DocketTask::new(
            TaskId::from_string("test-1"),
            "retry_test".to_string(),
            serde_json::json!({}),
            0,
            2,
        );

        backend.enqueue(task).unwrap();

        let task = backend
            .dequeue(&["retry_test".to_string()])
            .unwrap()
            .unwrap();
        backend.nack(&task.id, "error 1").unwrap();

        let task = backend
            .dequeue(&["retry_test".to_string()])
            .unwrap()
            .unwrap();
        assert_eq!(task.retry_count, 1);
        backend.nack(&task.id, "error 2").unwrap();

        let task = backend.dequeue(&["retry_test".to_string()]).unwrap();
        assert!(task.is_none());

        let task = backend
            .get_task(&TaskId::from_string("test-1"))
            .unwrap()
            .unwrap();
        assert_eq!(task.status, TaskStatus::Failed);
    }

    #[test]
    fn test_docket_task_to_info() {
        let task = DocketTask::new(
            TaskId::from_string("test-info"),
            "test_type".to_string(),
            serde_json::json!({"data": 42}),
            5,
            3,
        );

        let info = task.to_task_info();
        assert_eq!(info.id.to_string(), "test-info");
        assert_eq!(info.task_type, "test_type");
        assert_eq!(info.status, TaskStatus::Pending);
        assert!(info.started_at.is_none());
    }

    // End-to-end: submit, process via handler, observe the stored result.
    #[test]
    fn test_worker_process_one() {
        use fastmcp_core::block_on;

        let docket = Docket::memory();

        let task_id = docket
            .submit("process_test", serde_json::json!({"x": 1}))
            .unwrap();

        let worker = docket
            .worker()
            .subscribe("process_test", |task| async move {
                let x = task.params.get("x").and_then(|v| v.as_i64()).unwrap_or(0);
                Ok(serde_json::json!({"result": x * 2}))
            })
            .build();

        let cx = Cx::for_testing();
        let processed = block_on(worker.process_one(&cx)).unwrap();
        assert!(processed);

        let task = docket.get_task(&task_id).unwrap().unwrap();
        assert_eq!(task.status, TaskStatus::Completed);
        assert_eq!(task.result, Some(serde_json::json!({"result": 2})));
    }

    // Empty queue: process_one returns Ok(false) without error.
    #[test]
    fn test_worker_no_task_available() {
        use fastmcp_core::block_on;

        let docket = Docket::memory();

        let worker = docket
            .worker()
            .subscribe("empty_test", |_| async { Ok(serde_json::json!({})) })
            .build();

        let cx = Cx::for_testing();
        let processed = block_on(worker.process_one(&cx)).unwrap();
        assert!(!processed);
    }

    #[test]
    fn test_submit_options() {
        let opts = SubmitOptions::new()
            .with_priority(10)
            .with_max_retries(5)
            .with_delay(Duration::from_secs(60));

        assert_eq!(opts.priority, 10);
        assert_eq!(opts.max_retries, Some(5));
        assert_eq!(opts.delay, Some(Duration::from_secs(60)));
    }

    // Display strings match the user-facing messages exactly.
    #[test]
    fn test_docket_error_display() {
        let errors = vec![
            (
                DocketError::NotFound("task-1".into()),
                "Task not found: task-1",
            ),
            (
                DocketError::Connection("refused".into()),
                "Connection error: refused",
            ),
            (DocketError::Handler("panic".into()), "Handler error: panic"),
            (DocketError::Cancelled, "Operation cancelled"),
        ];

        for (error, expected) in errors {
            assert_eq!(error.to_string(), expected);
        }
    }
}