1use std::collections::{HashMap, VecDeque};
60use std::future::Future;
61use std::pin::Pin;
62use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
63use std::sync::{Arc, RwLock};
64use std::time::Duration;
65
66use asupersync::Cx;
67use fastmcp_core::McpError;
68use fastmcp_core::logging::{debug, info, targets, warn};
69use fastmcp_protocol::{TaskId, TaskInfo, TaskResult, TaskStatus};
70use serde::{Deserialize, Serialize};
71
/// Configuration shared by a Docket queue and its backend.
///
/// Start from [`DocketSettings::memory`] or [`DocketSettings::redis`] and
/// refine the result with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct DocketSettings {
    /// Storage backend that holds the queue.
    pub backend: DocketBackendType,
    /// Prefix placed in front of every key the Redis backend creates.
    pub queue_prefix: String,
    /// How long a claimed task may stay running before `requeue_stale`
    /// treats the claim as abandoned.
    pub visibility_timeout: Duration,
    /// Default per-task timeout.
    // NOTE(review): not read by any code visible in this part of the file.
    pub default_task_timeout: Duration,
    /// Retry budget applied to tasks submitted without an explicit override.
    pub max_retries: u32,
    /// Base delay before a retry; the Redis backend doubles it per attempt.
    pub retry_delay: Duration,
    /// Polling interval.
    // NOTE(review): presumably used by the worker loop; confirm against it.
    pub poll_interval: Duration,
}

impl Default for DocketSettings {
    /// In-memory backend with the stock timeouts and retry budget.
    fn default() -> Self {
        let backend = DocketBackendType::Memory;
        let queue_prefix = String::from("fastmcp:docket");
        Self {
            backend,
            queue_prefix,
            visibility_timeout: Duration::from_secs(30),
            default_task_timeout: Duration::from_secs(300),
            max_retries: 3,
            retry_delay: Duration::from_secs(1),
            poll_interval: Duration::from_millis(100),
        }
    }
}

impl DocketSettings {
    /// Settings for the in-memory backend (identical to `Default`).
    #[must_use]
    pub fn memory() -> Self {
        Self {
            backend: DocketBackendType::Memory,
            ..Self::default()
        }
    }

    /// Settings for the Redis backend at `url`, using a pool of 10
    /// connections and a 5 second connect timeout.
    #[must_use]
    pub fn redis(url: impl Into<String>) -> Self {
        let redis_settings = RedisSettings {
            url: url.into(),
            pool_size: 10,
            connect_timeout: Duration::from_secs(5),
        };
        Self {
            backend: DocketBackendType::Redis(redis_settings),
            ..Self::default()
        }
    }

    /// Replaces the key prefix.
    #[must_use]
    pub fn with_queue_prefix(self, prefix: impl Into<String>) -> Self {
        Self {
            queue_prefix: prefix.into(),
            ..self
        }
    }

    /// Replaces the visibility timeout.
    #[must_use]
    pub fn with_visibility_timeout(self, timeout: Duration) -> Self {
        Self {
            visibility_timeout: timeout,
            ..self
        }
    }

    /// Replaces the default retry budget.
    #[must_use]
    pub fn with_max_retries(self, retries: u32) -> Self {
        Self {
            max_retries: retries,
            ..self
        }
    }

    /// Replaces the poll interval.
    #[must_use]
    pub fn with_poll_interval(self, interval: Duration) -> Self {
        Self {
            poll_interval: interval,
            ..self
        }
    }
}

/// Selects which backend a Docket uses.
#[derive(Debug, Clone)]
pub enum DocketBackendType {
    /// Process-local, non-persistent queue.
    Memory,
    /// Redis-backed queue configured by the contained settings.
    Redis(RedisSettings),
}

/// Connection parameters for the Redis backend.
#[derive(Debug, Clone)]
pub struct RedisSettings {
    /// Redis connection URL.
    pub url: String,
    /// Number of connections opened up front (the backend opens at least one).
    pub pool_size: usize,
    /// Timeout intended for establishing each connection.
    pub connect_timeout: Duration,
}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct DocketTask {
185 pub id: TaskId,
187 pub task_type: String,
189 pub params: serde_json::Value,
191 pub priority: i32,
193 pub retry_count: u32,
195 pub max_retries: u32,
197 pub created_at: String,
199 pub claimed_at: Option<String>,
201 pub status: TaskStatus,
203 pub error: Option<String>,
205 pub result: Option<serde_json::Value>,
207}
208
209impl DocketTask {
210 fn new(
212 id: TaskId,
213 task_type: String,
214 params: serde_json::Value,
215 priority: i32,
216 max_retries: u32,
217 ) -> Self {
218 Self {
219 id,
220 task_type,
221 params,
222 priority,
223 retry_count: 0,
224 max_retries,
225 created_at: chrono::Utc::now().to_rfc3339(),
226 claimed_at: None,
227 status: TaskStatus::Pending,
228 error: None,
229 result: None,
230 }
231 }
232
233 #[must_use]
235 pub fn to_task_info(&self) -> TaskInfo {
236 TaskInfo {
237 id: self.id.clone(),
238 task_type: self.task_type.clone(),
239 status: self.status,
240 progress: None,
241 message: None,
242 created_at: self.created_at.clone(),
243 started_at: self.claimed_at.clone(),
244 completed_at: if self.status.is_terminal() {
245 Some(chrono::Utc::now().to_rfc3339())
246 } else {
247 None
248 },
249 error: self.error.clone(),
250 }
251 }
252
253 #[must_use]
255 pub fn to_task_result(&self) -> Option<TaskResult> {
256 if !self.status.is_terminal() {
257 return None;
258 }
259 Some(TaskResult {
260 id: self.id.clone(),
261 success: self.status == TaskStatus::Completed,
262 data: self.result.clone(),
263 error: self.error.clone(),
264 })
265 }
266}
267
/// Per-submission overrides accepted by `Docket::submit_with_options`.
#[derive(Debug, Clone, Default)]
pub struct SubmitOptions {
    /// Scheduling priority; larger values are dequeued first. Defaults to 0.
    pub priority: i32,
    /// Retry budget for this task; `None` falls back to the docket settings.
    pub max_retries: Option<u32>,
    /// Requested delay before the task becomes runnable.
    // NOTE(review): no code visible in this part of the file reads this field.
    pub delay: Option<Duration>,
}

impl SubmitOptions {
    /// Options with priority 0, no retry override and no delay.
    #[must_use]
    pub fn new() -> Self {
        Self {
            priority: 0,
            max_retries: None,
            delay: None,
        }
    }

    /// Sets the scheduling priority.
    #[must_use]
    pub fn with_priority(self, priority: i32) -> Self {
        Self { priority, ..self }
    }

    /// Overrides the retry budget.
    #[must_use]
    pub fn with_max_retries(self, retries: u32) -> Self {
        Self {
            max_retries: Some(retries),
            ..self
        }
    }

    /// Sets the requested start delay.
    #[must_use]
    pub fn with_delay(self, delay: Duration) -> Self {
        Self {
            delay: Some(delay),
            ..self
        }
    }
}
307
/// Result alias used throughout the Docket API.
pub type DocketResult<T> = Result<T, DocketError>;

/// Errors reported by Docket and its backends.
#[derive(Debug)]
pub enum DocketError {
    /// No task exists with the given id.
    NotFound(String),
    /// The backend connection failed.
    Connection(String),
    /// A task could not be encoded or decoded.
    Serialization(String),
    /// A task handler reported a failure.
    Handler(String),
    /// Any other backend failure (poisoned lock, Redis error, invalid state).
    Backend(String),
    /// The operation was cancelled.
    Cancelled,
}

impl std::fmt::Display for DocketError {
    /// Writes a short category label followed by the detail message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Cancelled => f.write_str("Operation cancelled"),
            Self::NotFound(msg) => write!(f, "Task not found: {msg}"),
            Self::Connection(msg) => write!(f, "Connection error: {msg}"),
            Self::Serialization(msg) => write!(f, "Serialization error: {msg}"),
            Self::Handler(msg) => write!(f, "Handler error: {msg}"),
            Self::Backend(msg) => write!(f, "Backend error: {msg}"),
        }
    }
}

impl std::error::Error for DocketError {}
346
347impl From<DocketError> for McpError {
348 fn from(err: DocketError) -> Self {
349 McpError::internal_error(err.to_string())
350 }
351}
352
/// Storage interface behind a Docket queue.
///
/// Implementations must be shareable across threads (`Send + Sync`). The
/// per-method notes below describe how the backends in this file behave.
pub trait DocketBackend: Send + Sync {
    /// Stores `task` and makes it available for dequeuing.
    fn enqueue(&self, task: DocketTask) -> DocketResult<()>;

    /// Claims the next pending task whose type is listed in `task_types`,
    /// marking it running. Returns `Ok(None)` when nothing is available.
    fn dequeue(&self, task_types: &[String]) -> DocketResult<Option<DocketTask>>;

    /// Marks the task completed and records `result`.
    fn ack(&self, task_id: &TaskId, result: serde_json::Value) -> DocketResult<()>;

    /// Records a failed attempt with `error`; the task is queued again until
    /// its retry budget is spent, after which it is marked failed.
    fn nack(&self, task_id: &TaskId, error: &str) -> DocketResult<()>;

    /// Looks up a task by id; `Ok(None)` when it does not exist.
    fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>>;

    /// Returns at most `limit` tasks, optionally restricted to `status`.
    fn list_tasks(&self, status: Option<TaskStatus>, limit: usize)
    -> DocketResult<Vec<DocketTask>>;

    /// Marks the task cancelled, optionally recording `reason`.
    fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()>;

    /// Counts tasks by status.
    fn stats(&self) -> DocketResult<QueueStats>;

    /// Reclaims running tasks whose claim is older than the visibility
    /// timeout and returns how many were queued again.
    fn requeue_stale(&self) -> DocketResult<usize>;
}
390
/// Number of tasks in each lifecycle state, as reported by
/// `DocketBackend::stats`.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Tasks waiting to be claimed.
    pub pending: usize,
    /// Tasks currently claimed (status `Running`).
    pub in_progress: usize,
    /// Tasks that finished successfully.
    pub completed: usize,
    /// Tasks that failed.
    pub failed: usize,
    /// Tasks that were cancelled.
    pub cancelled: usize,
}
405
406pub struct MemoryDocketBackend {
415 tasks: RwLock<HashMap<TaskId, DocketTask>>,
417 pending: RwLock<VecDeque<TaskId>>,
419 settings: DocketSettings,
421}
422
423impl MemoryDocketBackend {
424 #[must_use]
426 pub fn new(settings: DocketSettings) -> Self {
427 Self {
428 tasks: RwLock::new(HashMap::new()),
429 pending: RwLock::new(VecDeque::new()),
430 settings,
431 }
432 }
433}
434
435impl DocketBackend for MemoryDocketBackend {
436 fn enqueue(&self, task: DocketTask) -> DocketResult<()> {
437 let task_id = task.id.clone();
438 let priority = task.priority;
439
440 {
441 let mut tasks = self
442 .tasks
443 .write()
444 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
445 tasks.insert(task_id.clone(), task);
446 }
447
448 {
449 let mut pending = self
450 .pending
451 .write()
452 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
453
454 let tasks = self
456 .tasks
457 .read()
458 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
459
460 let pos = pending
461 .iter()
462 .position(|id| tasks.get(id).is_none_or(|t| t.priority < priority))
463 .unwrap_or(pending.len());
464
465 pending.insert(pos, task_id);
466 }
467
468 Ok(())
469 }
470
471 fn dequeue(&self, task_types: &[String]) -> DocketResult<Option<DocketTask>> {
472 let mut pending = self
473 .pending
474 .write()
475 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
476 let mut tasks = self
477 .tasks
478 .write()
479 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
480
481 let pos = pending.iter().position(|id| {
483 tasks.get(id).is_some_and(|t| {
484 t.status == TaskStatus::Pending && task_types.contains(&t.task_type)
485 })
486 });
487
488 if let Some(pos) = pos {
489 let task_id = pending.remove(pos).expect("position valid");
490 if let Some(task) = tasks.get_mut(&task_id) {
491 task.status = TaskStatus::Running;
492 task.claimed_at = Some(chrono::Utc::now().to_rfc3339());
493 return Ok(Some(task.clone()));
494 }
495 }
496
497 Ok(None)
498 }
499
500 fn ack(&self, task_id: &TaskId, result: serde_json::Value) -> DocketResult<()> {
501 let mut tasks = self
502 .tasks
503 .write()
504 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
505
506 let task = tasks
507 .get_mut(task_id)
508 .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
509
510 task.status = TaskStatus::Completed;
511 task.result = Some(result);
512
513 Ok(())
514 }
515
516 fn nack(&self, task_id: &TaskId, error: &str) -> DocketResult<()> {
517 let mut tasks = self
518 .tasks
519 .write()
520 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
521 let mut pending = self
522 .pending
523 .write()
524 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
525
526 let task = tasks
527 .get_mut(task_id)
528 .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
529
530 task.retry_count += 1;
531 task.error = Some(error.to_string());
532
533 if task.retry_count >= task.max_retries {
534 task.status = TaskStatus::Failed;
536 } else {
537 task.status = TaskStatus::Pending;
539 task.claimed_at = None;
540 pending.push_back(task_id.clone());
541 }
542
543 Ok(())
544 }
545
546 fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
547 let tasks = self
548 .tasks
549 .read()
550 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
551 Ok(tasks.get(task_id).cloned())
552 }
553
554 fn list_tasks(
555 &self,
556 status: Option<TaskStatus>,
557 limit: usize,
558 ) -> DocketResult<Vec<DocketTask>> {
559 let tasks = self
560 .tasks
561 .read()
562 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
563
564 let iter = tasks
565 .values()
566 .filter(|t| status.is_none_or(|s| t.status == s));
567
568 Ok(iter.take(limit).cloned().collect())
569 }
570
571 fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()> {
572 let mut tasks = self
573 .tasks
574 .write()
575 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
576 let mut pending = self
577 .pending
578 .write()
579 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
580
581 let task = tasks
582 .get_mut(task_id)
583 .ok_or_else(|| DocketError::NotFound(task_id.to_string()))?;
584
585 if task.status.is_terminal() {
586 return Err(DocketError::Backend(format!(
587 "Cannot cancel task in terminal state: {:?}",
588 task.status
589 )));
590 }
591
592 task.status = TaskStatus::Cancelled;
593 task.error = reason.map(String::from);
594
595 pending.retain(|id| id != task_id);
597
598 Ok(())
599 }
600
601 fn stats(&self) -> DocketResult<QueueStats> {
602 let tasks = self
603 .tasks
604 .read()
605 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
606
607 let mut stats = QueueStats::default();
608 for task in tasks.values() {
609 match task.status {
610 TaskStatus::Pending => stats.pending += 1,
611 TaskStatus::Running => stats.in_progress += 1,
612 TaskStatus::Completed => stats.completed += 1,
613 TaskStatus::Failed => stats.failed += 1,
614 TaskStatus::Cancelled => stats.cancelled += 1,
615 }
616 }
617
618 Ok(stats)
619 }
620
621 fn requeue_stale(&self) -> DocketResult<usize> {
622 let mut tasks = self
623 .tasks
624 .write()
625 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
626 let mut pending = self
627 .pending
628 .write()
629 .map_err(|e| DocketError::Backend(format!("Lock poisoned: {e}")))?;
630
631 let now = chrono::Utc::now();
632 let timeout = chrono::Duration::from_std(self.settings.visibility_timeout)
633 .unwrap_or_else(|_| chrono::Duration::seconds(30));
634
635 let mut requeued = 0;
636
637 for task in tasks.values_mut() {
638 if task.status != TaskStatus::Running {
639 continue;
640 }
641
642 if let Some(ref claimed_at) = task.claimed_at {
643 if let Ok(claimed) = chrono::DateTime::parse_from_rfc3339(claimed_at) {
644 if now - claimed.with_timezone(&chrono::Utc) > timeout {
645 task.status = TaskStatus::Pending;
647 task.claimed_at = None;
648 task.retry_count += 1;
649
650 if task.retry_count >= task.max_retries {
651 task.status = TaskStatus::Failed;
652 task.error = Some("Exceeded visibility timeout".to_string());
653 } else {
654 pending.push_back(task.id.clone());
655 requeued += 1;
656 }
657 }
658 }
659 }
660 }
661
662 Ok(requeued)
663 }
664}
665
/// Redis-backed implementation of the task queue (requires the `redis`
/// feature).
#[cfg(feature = "redis")]
pub struct RedisDocketBackend {
    /// Client the pooled connections were opened from.
    client: redis::Client,
    /// Fixed set of connections, each guarded by its own mutex.
    pool: Vec<std::sync::Mutex<redis::Connection>>,
    /// Round-robin cursor used to pick the next pooled connection.
    next_conn: std::sync::atomic::AtomicUsize,
    /// Redis connection parameters this backend was built with.
    settings: RedisSettings,
    /// Queue-level settings (key prefix, visibility timeout, retry delay).
    docket_settings: DocketSettings,
}
682
#[cfg(feature = "redis")]
impl RedisDocketBackend {
    /// Connects to Redis and opens the connection pool eagerly.
    ///
    /// At least one connection is opened even when `pool_size` is 0. Each
    /// connection attempt is bounded by `redis_settings.connect_timeout`; a
    /// zero timeout means "no explicit limit" and uses the plain blocking
    /// connect.
    ///
    /// # Errors
    /// `DocketError::Backend` if the URL is invalid or any connection cannot
    /// be established.
    pub fn new(
        redis_settings: RedisSettings,
        docket_settings: DocketSettings,
    ) -> DocketResult<Self> {
        let client = redis::Client::open(redis_settings.url.as_str())
            .map_err(|e| DocketError::Backend(format!("Redis client init failed: {e}")))?;

        let connect_timeout = redis_settings.connect_timeout;
        let mut pool = Vec::new();
        let pool_size = redis_settings.pool_size.max(1);
        for _ in 0..pool_size {
            // A zero duration is rejected by the socket layer, so only pass a
            // timeout through when one is actually configured.
            let attempt = if connect_timeout.is_zero() {
                client.get_connection()
            } else {
                client.get_connection_with_timeout(connect_timeout)
            };
            let conn = attempt
                .map_err(|e| DocketError::Backend(format!("Redis connect failed: {e}")))?;
            pool.push(std::sync::Mutex::new(conn));
        }

        Ok(Self {
            client,
            pool,
            next_conn: std::sync::atomic::AtomicUsize::new(0),
            settings: redis_settings,
            docket_settings,
        })
    }

    /// Hash: task id → serialized task JSON.
    fn key_tasks(&self) -> String {
        format!("{}:tasks", self.docket_settings.queue_prefix)
    }

    /// Sorted set: running task ids scored by claim time in milliseconds.
    fn key_running(&self) -> String {
        format!("{}:running", self.docket_settings.queue_prefix)
    }

    /// Set: every task type that has been enqueued.
    fn key_types(&self) -> String {
        format!("{}:types", self.docket_settings.queue_prefix)
    }

    /// Hash: task id → queue member string for tasks sitting in a queue.
    fn key_queue_member(&self) -> String {
        format!("{}:queue_member", self.docket_settings.queue_prefix)
    }

    /// Hash: task id → task type for tasks sitting in a queue.
    fn key_queue_type(&self) -> String {
        format!("{}:queue_type", self.docket_settings.queue_prefix)
    }

    /// Sorted set: members ready to be claimed for `task_type`.
    fn key_pending(&self, task_type: &str) -> String {
        format!("{}:pending:{task_type}", self.docket_settings.queue_prefix)
    }

    /// Sorted set: members of `task_type` waiting for their retry time,
    /// scored by the millisecond timestamp at which they become due.
    fn key_delayed(&self, task_type: &str) -> String {
        format!("{}:delayed:{task_type}", self.docket_settings.queue_prefix)
    }

    /// Current UTC time as Unix milliseconds.
    fn now_ms() -> i64 {
        chrono::Utc::now().timestamp_millis()
    }

    /// Current UTC time as an RFC 3339 string.
    fn now_rfc3339() -> String {
        chrono::Utc::now().to_rfc3339()
    }

    /// Builds the sorted-set member `"{prio}:{created_ms}:{task_id}"`.
    ///
    /// Pending members are stored with score 0, so Redis orders them
    /// lexicographically. The priority is inverted (`i32::MAX - priority`)
    /// and zero-padded so higher priorities sort first, then older creation
    /// times. If `created_at` cannot be parsed the current time is used.
    fn encode_member(task: &DocketTask) -> String {
        let created_ms = chrono::DateTime::parse_from_rfc3339(&task.created_at)
            .map(|dt| dt.timestamp_millis())
            .unwrap_or_else(|_| chrono::Utc::now().timestamp_millis());
        // Range is 0..=u32::MAX, which always fits the 10-digit padding.
        let prio_key: i64 = (i32::MAX as i64) - (task.priority as i64);
        format!("{prio_key:010}:{created_ms:013}:{}", task.id.0)
    }

    /// Delay in milliseconds before retry number `retry_count`.
    ///
    /// The configured `retry_delay` doubles with each attempt after the
    /// first; the exponent is capped at 30 and the product saturates.
    fn retry_delay_ms(&self, retry_count: u32) -> i64 {
        let base = self
            .docket_settings
            .retry_delay
            .as_millis()
            .min(i64::MAX as u128) as i64;
        if base <= 0 {
            return 0;
        }
        let exp = retry_count.saturating_sub(1).min(30);
        let factor: i64 = 1i64.checked_shl(exp).unwrap_or(i64::MAX);
        base.saturating_mul(factor)
    }

    /// Runs `f` on the next pooled connection (round-robin).
    ///
    /// A poisoned connection mutex is recovered rather than treated as an
    /// error. Redis failures are mapped to `DocketError::Backend`.
    fn with_conn<T>(
        &self,
        f: impl FnOnce(&mut redis::Connection) -> redis::RedisResult<T>,
    ) -> DocketResult<T> {
        let idx = self
            .next_conn
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            % self.pool.len();
        let mut guard = self.pool[idx]
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        f(&mut guard).map_err(|e| DocketError::Backend(format!("Redis error: {e}")))
    }
}
785
786#[cfg(feature = "redis")]
787impl DocketBackend for RedisDocketBackend {
788 fn enqueue(&self, task: DocketTask) -> DocketResult<()> {
789 let task_id = task.id.0.clone();
790 let task_type = task.task_type.clone();
791 let member = Self::encode_member(&task);
792 let json = serde_json::to_string(&task)
793 .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;
794
795 let tasks_key = self.key_tasks();
796 let types_key = self.key_types();
797 let member_key = self.key_queue_member();
798 let type_key = self.key_queue_type();
799 let pending_key = self.key_pending(&task_type);
800
801 self.with_conn(|conn| {
802 redis::pipe()
803 .atomic()
804 .cmd("HSET")
805 .arg(&tasks_key)
806 .arg(&task_id)
807 .arg(&json)
808 .ignore()
809 .cmd("SADD")
810 .arg(&types_key)
811 .arg(&task_type)
812 .ignore()
813 .cmd("HSET")
814 .arg(&member_key)
815 .arg(&task_id)
816 .arg(&member)
817 .ignore()
818 .cmd("HSET")
819 .arg(&type_key)
820 .arg(&task_id)
821 .arg(&task_type)
822 .ignore()
823 .cmd("ZADD")
824 .arg(&pending_key)
825 .arg(0)
826 .arg(&member)
827 .ignore()
828 .query::<()>(conn)
829 })?;
830
831 Ok(())
832 }
833
    /// Claims the best pending task among `task_types`.
    ///
    /// Returns `Ok(None)` when `task_types` is empty or nothing is pending.
    /// The selection and the state change run inside one Lua script, so they
    /// happen as a single step on the Redis side.
    fn dequeue(&self, task_types: &[String]) -> DocketResult<Option<DocketTask>> {
        if task_types.is_empty() {
            return Ok(None);
        }

        // Best effort: promote due delayed tasks and reclaim stale claims
        // before looking for work; a failure here must not block dequeuing.
        let _ = self.requeue_stale();

        let mut pending_keys: Vec<String> =
            task_types.iter().map(|t| self.key_pending(t)).collect();
        let tasks_key = self.key_tasks();
        let running_key = self.key_running();
        let member_key = self.key_queue_member();

        // The script reads its last three KEYS as tasks / running / member;
        // the push order below must stay in sync with it.
        pending_keys.push(tasks_key.clone());
        pending_keys.push(running_key.clone());
        pending_keys.push(member_key.clone());

        let now_rfc = Self::now_rfc3339();
        let now_ms = Self::now_ms();

        // Script outline: take the first member of each pending set, keep the
        // lexicographically smallest, remove it from its set, move the task id
        // into the running set scored by `now_ms`, and rewrite the stored JSON
        // with status "running" and the claim timestamp.
        const LUA: &str = r#"
local tasks_key = KEYS[#KEYS-2]
local running_key = KEYS[#KEYS-1]
local member_key = KEYS[#KEYS]

local now_rfc = ARGV[1]
local now_ms = tonumber(ARGV[2])

local best = nil
local best_key = nil
for i=1,(#KEYS-3) do
  local k = KEYS[i]
  local r = redis.call('ZRANGE', k, 0, 0)
  if r and r[1] then
    local m = r[1]
    if (not best) or (m < best) then
      best = m
      best_key = k
    end
  end
end

if not best then
  return nil
end

redis.call('ZREM', best_key, best)

-- Extract task_id from member "{prio}:{created}:{task_id}"
local _, _, task_id = string.find(best, "^[^:]+:[^:]+:(.+)$")
if not task_id then
  return nil
end

redis.call('HDEL', member_key, task_id)
redis.call('ZADD', running_key, now_ms, task_id)

local tjson = redis.call('HGET', tasks_key, task_id)
if not tjson then
  return nil
end

local t = cjson.decode(tjson)
t["status"] = "running"
t["claimed_at"] = now_rfc
local out = cjson.encode(t)
redis.call('HSET', tasks_key, task_id, out)
return out
"#;

        let task_json: Option<String> = self.with_conn(|conn| {
            let script = redis::Script::new(LUA);
            let mut inv = script.prepare_invoke();
            for k in &pending_keys {
                inv.key(k);
            }
            inv.arg(now_rfc).arg(now_ms).invoke(conn)
        })?;

        // A nil reply from the script means nothing could be claimed.
        let Some(task_json) = task_json else {
            return Ok(None);
        };

        let task: DocketTask = serde_json::from_str(&task_json)
            .map_err(|e| DocketError::Backend(format!("Task deserialize failed: {e}")))?;
        Ok(Some(task))
    }
924
925 fn ack(&self, task_id: &TaskId, result: serde_json::Value) -> DocketResult<()> {
926 let task_id_str = task_id.0.clone();
927 let tasks_key = self.key_tasks();
928 let running_key = self.key_running();
929 let member_key = self.key_queue_member();
930 let type_key = self.key_queue_type();
931
932 let mut task = self
933 .get_task(task_id)?
934 .ok_or_else(|| DocketError::NotFound(task_id_str.clone()))?;
935 task.status = TaskStatus::Completed;
936 task.result = Some(result);
937
938 let json = serde_json::to_string(&task)
939 .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;
940
941 self.with_conn(|conn| {
942 redis::pipe()
943 .atomic()
944 .cmd("HSET")
945 .arg(&tasks_key)
946 .arg(&task_id_str)
947 .arg(&json)
948 .ignore()
949 .cmd("ZREM")
950 .arg(&running_key)
951 .arg(&task_id_str)
952 .ignore()
953 .cmd("HDEL")
954 .arg(&member_key)
955 .arg(&task_id_str)
956 .ignore()
957 .cmd("HDEL")
958 .arg(&type_key)
959 .arg(&task_id_str)
960 .ignore()
961 .query::<()>(conn)
962 })?;
963
964 Ok(())
965 }
966
    /// Records a failed attempt for the task.
    ///
    /// The retry counter is incremented and `error` stored. If the retry
    /// budget is spent the task is written back as failed and its queue
    /// lookups are removed; otherwise it is written back as pending and
    /// placed in the delayed set of its type, due after the back-off computed
    /// by `retry_delay_ms`.
    ///
    /// # Errors
    /// `DocketError::NotFound` for an unknown id; `DocketError::Backend` on
    /// serialization or Redis failure.
    fn nack(&self, task_id: &TaskId, error: &str) -> DocketResult<()> {
        let task_id_str = task_id.0.clone();
        let tasks_key = self.key_tasks();
        let running_key = self.key_running();
        let member_key = self.key_queue_member();
        let type_key = self.key_queue_type();

        let mut task = self
            .get_task(task_id)?
            .ok_or_else(|| DocketError::NotFound(task_id_str.clone()))?;

        task.retry_count += 1;
        task.error = Some(error.to_string());

        // The task is no longer running in either outcome.
        self.with_conn(|conn| {
            redis::cmd("ZREM")
                .arg(&running_key)
                .arg(&task_id_str)
                .query::<()>(conn)
        })?;

        // Retry budget exhausted: persist the terminal state and stop.
        if task.retry_count >= task.max_retries {
            task.status = TaskStatus::Failed;
            let json = serde_json::to_string(&task)
                .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;
            self.with_conn(|conn| {
                redis::pipe()
                    .atomic()
                    .cmd("HSET")
                    .arg(&tasks_key)
                    .arg(&task_id_str)
                    .arg(&json)
                    .ignore()
                    .cmd("HDEL")
                    .arg(&member_key)
                    .arg(&task_id_str)
                    .ignore()
                    .cmd("HDEL")
                    .arg(&type_key)
                    .arg(&task_id_str)
                    .ignore()
                    .query::<()>(conn)
            })?;
            return Ok(());
        }

        task.status = TaskStatus::Pending;
        task.claimed_at = None;

        let member = Self::encode_member(&task);
        let task_type = task.task_type.clone();
        let delayed_key = self.key_delayed(&task_type);
        // Timestamp (ms) at which the retry becomes eligible for promotion.
        let available_ms = Self::now_ms().saturating_add(self.retry_delay_ms(task.retry_count));

        let json = serde_json::to_string(&task)
            .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;

        // Write the task back, restore its queue lookups and schedule the
        // retry in the delayed set, all in one atomic pipeline.
        self.with_conn(|conn| {
            redis::pipe()
                .atomic()
                .cmd("HSET")
                .arg(&tasks_key)
                .arg(&task_id_str)
                .arg(&json)
                .ignore()
                .cmd("HSET")
                .arg(&member_key)
                .arg(&task_id_str)
                .arg(&member)
                .ignore()
                .cmd("HSET")
                .arg(&type_key)
                .arg(&task_id_str)
                .arg(&task_type)
                .ignore()
                .cmd("ZADD")
                .arg(&delayed_key)
                .arg(available_ms)
                .arg(&member)
                .ignore()
                .query::<()>(conn)
        })?;

        Ok(())
    }
1054
1055 fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
1056 let tasks_key = self.key_tasks();
1057 let task_id_str = task_id.0.clone();
1058 let json: Option<String> = self.with_conn(|conn| {
1059 redis::cmd("HGET")
1060 .arg(&tasks_key)
1061 .arg(&task_id_str)
1062 .query(conn)
1063 })?;
1064 let Some(json) = json else {
1065 return Ok(None);
1066 };
1067 let task: DocketTask = serde_json::from_str(&json)
1068 .map_err(|e| DocketError::Backend(format!("Task deserialize failed: {e}")))?;
1069 Ok(Some(task))
1070 }
1071
1072 fn list_tasks(
1073 &self,
1074 status: Option<TaskStatus>,
1075 limit: usize,
1076 ) -> DocketResult<Vec<DocketTask>> {
1077 let tasks_key = self.key_tasks();
1078 let values: Vec<String> =
1079 self.with_conn(|conn| redis::cmd("HVALS").arg(&tasks_key).query(conn))?;
1080
1081 let mut tasks = Vec::new();
1082 for json in values {
1083 if let Ok(task) = serde_json::from_str::<DocketTask>(&json) {
1084 if status.is_none_or(|s| task.status == s) {
1085 tasks.push(task);
1086 }
1087 }
1088 }
1089
1090 tasks.sort_by(|a, b| {
1091 a.created_at
1092 .cmp(&b.created_at)
1093 .then_with(|| a.id.0.cmp(&b.id.0))
1094 });
1095 tasks.truncate(limit);
1096 Ok(tasks)
1097 }
1098
1099 fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()> {
1100 let task_id_str = task_id.0.clone();
1101 let tasks_key = self.key_tasks();
1102 let running_key = self.key_running();
1103 let member_key = self.key_queue_member();
1104 let type_key = self.key_queue_type();
1105
1106 let Some(mut task) = self.get_task(task_id)? else {
1107 return Err(DocketError::NotFound(task_id_str));
1108 };
1109
1110 task.status = TaskStatus::Cancelled;
1111 task.error = Some(reason.unwrap_or("Cancelled").to_string());
1112 task.result = Some(serde_json::json!({"cancelled": true}));
1113
1114 let json = serde_json::to_string(&task)
1115 .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;
1116
1117 let member: Option<String> = self.with_conn(|conn| {
1118 redis::cmd("HGET")
1119 .arg(&member_key)
1120 .arg(&task_id_str)
1121 .query(conn)
1122 })?;
1123 let task_type: Option<String> = self.with_conn(|conn| {
1124 redis::cmd("HGET")
1125 .arg(&type_key)
1126 .arg(&task_id_str)
1127 .query(conn)
1128 })?;
1129
1130 self.with_conn(|conn| {
1131 redis::pipe()
1132 .atomic()
1133 .cmd("HSET")
1134 .arg(&tasks_key)
1135 .arg(&task_id_str)
1136 .arg(&json)
1137 .ignore()
1138 .cmd("ZREM")
1139 .arg(&running_key)
1140 .arg(&task_id_str)
1141 .ignore()
1142 .cmd("HDEL")
1143 .arg(&member_key)
1144 .arg(&task_id_str)
1145 .ignore()
1146 .cmd("HDEL")
1147 .arg(&type_key)
1148 .arg(&task_id_str)
1149 .ignore()
1150 .query::<()>(conn)
1151 })?;
1152
1153 if let (Some(member), Some(task_type)) = (member, task_type) {
1154 let pending_key = self.key_pending(&task_type);
1155 let delayed_key = self.key_delayed(&task_type);
1156 let _ = self.with_conn(|conn| {
1157 redis::pipe()
1158 .atomic()
1159 .cmd("ZREM")
1160 .arg(&pending_key)
1161 .arg(&member)
1162 .ignore()
1163 .cmd("ZREM")
1164 .arg(&delayed_key)
1165 .arg(&member)
1166 .ignore()
1167 .query::<()>(conn)
1168 });
1169 }
1170
1171 Ok(())
1172 }
1173
1174 fn stats(&self) -> DocketResult<QueueStats> {
1175 let tasks = self.list_tasks(None, usize::MAX)?;
1176 let mut stats = QueueStats::default();
1177 for t in tasks {
1178 match t.status {
1179 TaskStatus::Pending => stats.pending += 1,
1180 TaskStatus::Running => stats.in_progress += 1,
1181 TaskStatus::Completed => stats.completed += 1,
1182 TaskStatus::Failed => stats.failed += 1,
1183 TaskStatus::Cancelled => stats.cancelled += 1,
1184 }
1185 }
1186 Ok(stats)
1187 }
1188
    /// Promotes due delayed tasks and reclaims stale running tasks.
    ///
    /// Phase 1: for every known task type, members of the delayed set whose
    /// score is at or before now are moved to the pending set.
    /// Phase 2: running tasks claimed at or before `now - visibility_timeout`
    /// are charged one retry; they become failed once the budget is spent,
    /// otherwise they go to the delayed set with a back-off.
    ///
    /// Returns the number of stale tasks scheduled for retry in phase 2.
    ///
    /// # Errors
    /// `DocketError::Backend` on serialization or Redis failure.
    fn requeue_stale(&self) -> DocketResult<usize> {
        let now_ms = Self::now_ms();
        let visibility_ms: i64 = self
            .docket_settings
            .visibility_timeout
            .as_millis()
            .min(i64::MAX as u128) as i64;
        // Claims with a timestamp at or before this instant are stale.
        let cutoff = now_ms.saturating_sub(visibility_ms);

        let tasks_key = self.key_tasks();
        let running_key = self.key_running();
        let types_key = self.key_types();
        let member_key = self.key_queue_member();
        let type_key = self.key_queue_type();

        // Phase 1: promote delayed members that have become due.
        let types: Vec<String> =
            self.with_conn(|conn| redis::cmd("SMEMBERS").arg(&types_key).query(conn))?;
        for t in &types {
            let delayed_key = self.key_delayed(t);
            let pending_key = self.key_pending(t);
            let due_members: Vec<String> = self.with_conn(|conn| {
                redis::cmd("ZRANGEBYSCORE")
                    .arg(&delayed_key)
                    .arg("-inf")
                    .arg(now_ms)
                    .query(conn)
            })?;
            if due_members.is_empty() {
                continue;
            }
            self.with_conn(|conn| {
                let mut pipe = redis::pipe();
                pipe.atomic();
                for m in &due_members {
                    pipe.cmd("ZREM").arg(&delayed_key).arg(m).ignore();
                    pipe.cmd("ZADD").arg(&pending_key).arg(0).arg(m).ignore();
                }
                pipe.query::<()>(conn)
            })?;
        }

        // Phase 2: find running tasks whose claim has expired.
        let stale: Vec<String> = self.with_conn(|conn| {
            redis::cmd("ZRANGEBYSCORE")
                .arg(&running_key)
                .arg("-inf")
                .arg(cutoff)
                .query(conn)
        })?;

        let mut requeued = 0usize;
        for task_id in stale {
            // ZREM reports how many entries it removed; 0 means the entry was
            // already gone (acked, nacked or reclaimed elsewhere), so skip it.
            let removed: i64 = self.with_conn(|conn| {
                redis::cmd("ZREM")
                    .arg(&running_key)
                    .arg(&task_id)
                    .query(conn)
            })?;
            if removed == 0 {
                continue;
            }

            let id = TaskId::from_string(task_id.clone());
            let Some(mut task) = self.get_task(&id)? else {
                continue;
            };

            task.retry_count += 1;
            task.claimed_at = None;
            task.error = Some("Exceeded visibility timeout".to_string());

            // Retry budget exhausted: persist the terminal state.
            if task.retry_count >= task.max_retries {
                task.status = TaskStatus::Failed;
                let json = serde_json::to_string(&task)
                    .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;
                self.with_conn(|conn| {
                    redis::pipe()
                        .atomic()
                        .cmd("HSET")
                        .arg(&tasks_key)
                        .arg(&task_id)
                        .arg(&json)
                        .ignore()
                        .cmd("HDEL")
                        .arg(&member_key)
                        .arg(&task_id)
                        .ignore()
                        .cmd("HDEL")
                        .arg(&type_key)
                        .arg(&task_id)
                        .ignore()
                        .query::<()>(conn)
                })?;
                continue;
            }

            task.status = TaskStatus::Pending;
            let member = Self::encode_member(&task);
            let task_type = task.task_type.clone();
            let delayed_key = self.key_delayed(&task_type);
            // Timestamp (ms) at which the retry becomes eligible for promotion.
            let available_ms = now_ms.saturating_add(self.retry_delay_ms(task.retry_count));

            let json = serde_json::to_string(&task)
                .map_err(|e| DocketError::Backend(format!("Task serialize failed: {e}")))?;
            self.with_conn(|conn| {
                redis::pipe()
                    .atomic()
                    .cmd("HSET")
                    .arg(&tasks_key)
                    .arg(&task_id)
                    .arg(&json)
                    .ignore()
                    .cmd("HSET")
                    .arg(&member_key)
                    .arg(&task_id)
                    .arg(&member)
                    .ignore()
                    .cmd("HSET")
                    .arg(&type_key)
                    .arg(&task_id)
                    .arg(&task_type)
                    .ignore()
                    .cmd("ZADD")
                    .arg(&delayed_key)
                    .arg(available_ms)
                    .arg(&member)
                    .ignore()
                    .query::<()>(conn)
            })?;
            requeued += 1;
        }

        Ok(requeued)
    }
1325}
1326
/// Front door of the task queue: submits tasks, inspects them, and creates
/// workers that share the same backend.
pub struct Docket {
    /// Storage backend shared with the workers built from this docket.
    backend: Arc<dyn DocketBackend>,
    /// Settings this docket was created with.
    settings: DocketSettings,
    /// Monotonic counter used to generate task ids.
    task_counter: AtomicU64,
}
1339
1340impl Docket {
1341 pub fn new(settings: DocketSettings) -> DocketResult<Self> {
1343 let backend: Arc<dyn DocketBackend> = match &settings.backend {
1344 DocketBackendType::Memory => Arc::new(MemoryDocketBackend::new(settings.clone())),
1345 #[cfg(feature = "redis")]
1346 DocketBackendType::Redis(redis_settings) => Arc::new(RedisDocketBackend::new(
1347 redis_settings.clone(),
1348 settings.clone(),
1349 )?),
1350 #[cfg(not(feature = "redis"))]
1351 DocketBackendType::Redis(_) => {
1352 return Err(DocketError::Backend(
1353 "Redis backend requires 'redis' feature".to_string(),
1354 ));
1355 }
1356 };
1357
1358 Ok(Self {
1359 backend,
1360 settings,
1361 task_counter: AtomicU64::new(0),
1362 })
1363 }
1364
1365 #[must_use]
1367 pub fn memory() -> Self {
1368 Self::new(DocketSettings::memory()).expect("memory backend always succeeds")
1369 }
1370
1371 pub fn submit(
1373 &self,
1374 task_type: impl Into<String>,
1375 params: serde_json::Value,
1376 ) -> DocketResult<TaskId> {
1377 self.submit_with_options(task_type, params, SubmitOptions::default())
1378 }
1379
1380 pub fn submit_with_options(
1382 &self,
1383 task_type: impl Into<String>,
1384 params: serde_json::Value,
1385 options: SubmitOptions,
1386 ) -> DocketResult<TaskId> {
1387 let counter = self.task_counter.fetch_add(1, Ordering::SeqCst);
1388 let task_id = TaskId::from_string(format!("docket-{counter:08x}"));
1389
1390 let max_retries = options.max_retries.unwrap_or(self.settings.max_retries);
1391 let task = DocketTask::new(
1392 task_id.clone(),
1393 task_type.into(),
1394 params,
1395 options.priority,
1396 max_retries,
1397 );
1398
1399 self.backend.enqueue(task)?;
1400
1401 info!(
1402 target: targets::SERVER,
1403 "Docket: submitted task {} (type: {})",
1404 task_id,
1405 task_id
1406 );
1407
1408 Ok(task_id)
1409 }
1410
1411 pub fn get_task(&self, task_id: &TaskId) -> DocketResult<Option<DocketTask>> {
1413 self.backend.get_task(task_id)
1414 }
1415
1416 pub fn list_tasks(
1418 &self,
1419 status: Option<TaskStatus>,
1420 limit: usize,
1421 ) -> DocketResult<Vec<DocketTask>> {
1422 self.backend.list_tasks(status, limit)
1423 }
1424
1425 pub fn cancel(&self, task_id: &TaskId, reason: Option<&str>) -> DocketResult<()> {
1427 self.backend.cancel(task_id, reason)
1428 }
1429
1430 pub fn stats(&self) -> DocketResult<QueueStats> {
1432 self.backend.stats()
1433 }
1434
1435 #[must_use]
1437 pub fn worker(&self) -> WorkerBuilder {
1438 WorkerBuilder::new(Arc::clone(&self.backend), self.settings.clone())
1439 }
1440
1441 #[must_use]
1443 pub fn settings(&self) -> &DocketSettings {
1444 &self.settings
1445 }
1446
1447 #[must_use]
1449 pub fn into_shared(self) -> SharedDocket {
1450 Arc::new(self)
1451 }
1452}
1453
1454impl std::fmt::Debug for Docket {
1455 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1456 f.debug_struct("Docket")
1457 .field("settings", &self.settings)
1458 .field("task_counter", &self.task_counter.load(Ordering::SeqCst))
1459 .finish_non_exhaustive()
1460 }
1461}
1462
/// A [`Docket`] behind an [`Arc`], as produced by [`Docket::into_shared`].
pub type SharedDocket = Arc<Docket>;
1465
/// Type-erased task handler.
///
/// A handler takes ownership of a [`DocketTask`] and returns a boxed, pinned,
/// `Send` future that resolves to the task's JSON result or a docket error.
/// The handler itself must be `Send + Sync`.
pub type TaskHandlerFn = Box<
    dyn Fn(DocketTask) -> Pin<Box<dyn Future<Output = DocketResult<serde_json::Value>> + Send>>
        + Send
        + Sync,
>;
1476
/// Builder that collects task handlers before producing a [`Worker`].
pub struct WorkerBuilder {
    /// Backend handed on to the built worker.
    backend: Arc<dyn DocketBackend>,
    /// Settings handed on to the built worker.
    settings: DocketSettings,
    /// Registered handlers, keyed by task type.
    handlers: HashMap<String, TaskHandlerFn>,
}
1483
1484impl WorkerBuilder {
1485 fn new(backend: Arc<dyn DocketBackend>, settings: DocketSettings) -> Self {
1486 Self {
1487 backend,
1488 settings,
1489 handlers: HashMap::new(),
1490 }
1491 }
1492
1493 pub fn subscribe<F, Fut>(mut self, task_type: impl Into<String>, handler: F) -> Self
1495 where
1496 F: Fn(DocketTask) -> Fut + Send + Sync + 'static,
1497 Fut: Future<Output = DocketResult<serde_json::Value>> + Send + 'static,
1498 {
1499 let task_type = task_type.into();
1500 let boxed: TaskHandlerFn = Box::new(move |task| Box::pin(handler(task)));
1501 self.handlers.insert(task_type, boxed);
1502 self
1503 }
1504
1505 #[must_use]
1507 pub fn build(self) -> Worker {
1508 Worker {
1509 backend: self.backend,
1510 settings: self.settings,
1511 handlers: Arc::new(self.handlers),
1512 running: Arc::new(AtomicBool::new(false)),
1513 }
1514 }
1515}
1516
/// Pulls tasks from a backend and runs the handler registered for each type.
pub struct Worker {
    /// Backend that tasks are dequeued from and acknowledged to.
    backend: Arc<dyn DocketBackend>,
    /// Settings; `poll_interval` is used as the idle sleep in `run`.
    settings: DocketSettings,
    /// Handlers keyed by task type.
    handlers: Arc<HashMap<String, TaskHandlerFn>>,
    /// Set while `run` is looping; cleared by `stop` or when `run` exits.
    running: Arc<AtomicBool>,
}
1524
1525impl Worker {
1526 #[must_use]
1528 pub fn subscribed_types(&self) -> Vec<String> {
1529 self.handlers.keys().cloned().collect()
1530 }
1531
1532 #[must_use]
1534 pub fn is_running(&self) -> bool {
1535 self.running.load(Ordering::SeqCst)
1536 }
1537
1538 pub fn stop(&self) {
1540 self.running.store(false, Ordering::SeqCst);
1541 }
1542
1543 pub async fn process_one(&self, cx: &Cx) -> DocketResult<bool> {
1547 let task_types = self.subscribed_types();
1548
1549 if cx.is_cancel_requested() {
1551 return Err(DocketError::Cancelled);
1552 }
1553
1554 let Some(task) = self.backend.dequeue(&task_types)? else {
1556 return Ok(false);
1557 };
1558
1559 let task_id = task.id.clone();
1560 let task_type = task.task_type.clone();
1561
1562 debug!(
1563 target: targets::SERVER,
1564 "Docket worker: processing task {} (type: {})",
1565 task_id,
1566 task_type
1567 );
1568
1569 let Some(handler) = self.handlers.get(&task_type) else {
1571 self.backend.nack(&task_id, "No handler for task type")?;
1573 return Ok(true);
1574 };
1575
1576 let result = handler(task).await;
1578
1579 match result {
1580 Ok(data) => {
1581 self.backend.ack(&task_id, data)?;
1582 info!(
1583 target: targets::SERVER,
1584 "Docket worker: completed task {}",
1585 task_id
1586 );
1587 }
1588 Err(e) => {
1589 let error_msg = e.to_string();
1590 self.backend.nack(&task_id, &error_msg)?;
1591 warn!(
1592 target: targets::SERVER,
1593 "Docket worker: task {} failed: {}",
1594 task_id,
1595 error_msg
1596 );
1597 }
1598 }
1599
1600 Ok(true)
1601 }
1602
1603 pub async fn run(&self, cx: &Cx) -> DocketResult<()> {
1605 self.running.store(true, Ordering::SeqCst);
1606
1607 info!(
1608 target: targets::SERVER,
1609 "Docket worker starting with subscriptions: {:?}",
1610 self.subscribed_types()
1611 );
1612
1613 while self.running.load(Ordering::SeqCst) {
1614 if cx.is_cancel_requested() {
1616 break;
1617 }
1618
1619 let _ = self.backend.requeue_stale();
1621
1622 match self.process_one(cx).await {
1624 Ok(true) => {
1625 continue;
1627 }
1628 Ok(false) => {
1629 std::thread::sleep(self.settings.poll_interval);
1631 }
1632 Err(DocketError::Cancelled) => {
1633 break;
1634 }
1635 Err(e) => {
1636 warn!(
1637 target: targets::SERVER,
1638 "Docket worker error: {}",
1639 e
1640 );
1641 std::thread::sleep(Duration::from_millis(100));
1643 }
1644 }
1645 }
1646
1647 self.running.store(false, Ordering::SeqCst);
1648 info!(target: targets::SERVER, "Docket worker stopped");
1649
1650 Ok(())
1651 }
1652}
1653
1654impl std::fmt::Debug for Worker {
1655 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1656 f.debug_struct("Worker")
1657 .field("subscribed_types", &self.subscribed_types())
1658 .field("running", &self.is_running())
1659 .finish_non_exhaustive()
1660 }
1661}
1662
1663#[cfg(test)]
1668mod tests {
1669 use super::*;
1670 #[cfg(feature = "redis")]
1671 use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
1672
1673 #[cfg(feature = "redis")]
1674 use std::net::TcpListener;
1675 #[cfg(feature = "redis")]
1676 use std::process::{Child, Command, Stdio};
1677 #[cfg(feature = "redis")]
1678 use std::time::Instant;
1679
    // Process-wide sequence, starting at 1, used by `next_test_token` to make
    // each generated token distinct within this test process.
    #[cfg(feature = "redis")]
    static REDIS_TEST_SEQ: AtomicU64 = AtomicU64::new(1);
1682
1683 #[cfg(feature = "redis")]
1684 fn next_test_token(label: &str) -> String {
1685 let seq = REDIS_TEST_SEQ.fetch_add(1, AtomicOrdering::SeqCst);
1686 format!("{label}-{}-{seq}", std::process::id())
1687 }
1688
    /// A `redis-server` child process started for one test; it is killed and
    /// reaped when the value is dropped.
    #[cfg(feature = "redis")]
    struct TestRedisServer {
        /// Handle to the spawned `redis-server` process.
        child: Child,
        /// `redis://127.0.0.1:{port}/` URL of the spawned server.
        url: String,
    }
1694
1695 #[cfg(feature = "redis")]
1696 impl TestRedisServer {
1697 fn start() -> Self {
1698 let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral redis port");
1699 let port = listener.local_addr().expect("redis test local addr").port();
1700 drop(listener);
1701
1702 let child = Command::new("redis-server")
1703 .arg("--port")
1704 .arg(port.to_string())
1705 .arg("--save")
1706 .arg("")
1707 .arg("--appendonly")
1708 .arg("no")
1709 .arg("--bind")
1710 .arg("127.0.0.1")
1711 .arg("--protected-mode")
1712 .arg("no")
1713 .stdout(Stdio::null())
1714 .stderr(Stdio::null())
1715 .spawn()
1716 .expect("spawn redis-server");
1717
1718 let url = format!("redis://127.0.0.1:{port}/");
1719 let deadline = Instant::now() + Duration::from_secs(5);
1720 loop {
1721 let ready = redis::Client::open(url.as_str())
1722 .ok()
1723 .and_then(|client| client.get_connection().ok())
1724 .and_then(|mut conn| redis::cmd("PING").query::<String>(&mut conn).ok())
1725 .is_some_and(|pong| pong == "PONG");
1726
1727 if ready {
1728 break;
1729 }
1730 assert!(
1731 Instant::now() < deadline,
1732 "redis-server did not become ready"
1733 );
1734 std::thread::sleep(Duration::from_millis(20));
1735 }
1736
1737 Self { child, url }
1738 }
1739 }
1740
1741 #[cfg(feature = "redis")]
1742 impl Drop for TestRedisServer {
1743 fn drop(&mut self) {
1744 let _ = self.child.kill();
1745 let _ = self.child.wait();
1746 }
1747 }
1748
1749 #[cfg(feature = "redis")]
1750 fn redis_settings_for_test(url: &str) -> DocketSettings {
1751 let mut settings = DocketSettings::redis(url);
1752 settings.queue_prefix = format!("fastmcp:docket:test:{}", next_test_token("queue"));
1753 settings.poll_interval = Duration::from_millis(1);
1754 settings.retry_delay = Duration::from_millis(0);
1755 settings
1756 }
1757
1758 #[test]
1759 fn test_docket_settings_default() {
1760 let settings = DocketSettings::default();
1761 assert!(matches!(settings.backend, DocketBackendType::Memory));
1762 assert_eq!(settings.max_retries, 3);
1763 }
1764
1765 #[test]
1766 fn test_docket_settings_redis() {
1767 let settings = DocketSettings::redis("redis://localhost:6379");
1768 assert!(matches!(settings.backend, DocketBackendType::Redis(_)));
1769 }
1770
1771 #[test]
1772 fn test_docket_settings_builder() {
1773 let settings = DocketSettings::memory()
1774 .with_queue_prefix("test:queue")
1775 .with_max_retries(5)
1776 .with_poll_interval(Duration::from_millis(50));
1777
1778 assert_eq!(settings.queue_prefix, "test:queue");
1779 assert_eq!(settings.max_retries, 5);
1780 assert_eq!(settings.poll_interval, Duration::from_millis(50));
1781 }
1782
1783 #[test]
1784 fn test_docket_memory_creation() {
1785 let docket = Docket::memory();
1786 assert!(matches!(docket.settings.backend, DocketBackendType::Memory));
1787 }
1788
1789 #[test]
1790 fn test_docket_submit_task() {
1791 let docket = Docket::memory();
1792
1793 let task_id = docket
1794 .submit("test_task", serde_json::json!({"key": "value"}))
1795 .unwrap();
1796
1797 assert!(task_id.to_string().starts_with("docket-"));
1798
1799 let task = docket.get_task(&task_id).unwrap().unwrap();
1801 assert_eq!(task.task_type, "test_task");
1802 assert_eq!(task.status, TaskStatus::Pending);
1803 }
1804
1805 #[test]
1806 fn test_docket_submit_with_priority() {
1807 let docket = Docket::memory();
1808
1809 let low_id = docket
1810 .submit_with_options(
1811 "task",
1812 serde_json::json!({"priority": "low"}),
1813 SubmitOptions::new().with_priority(1),
1814 )
1815 .unwrap();
1816
1817 let high_id = docket
1818 .submit_with_options(
1819 "task",
1820 serde_json::json!({"priority": "high"}),
1821 SubmitOptions::new().with_priority(10),
1822 )
1823 .unwrap();
1824
1825 let worker = docket
1827 .worker()
1828 .subscribe("task", |t| async move { Ok(t.params) })
1829 .build();
1830
1831 let types = worker.subscribed_types();
1832 let dequeued = docket.backend.dequeue(&types).unwrap().unwrap();
1833 assert_eq!(dequeued.id, high_id);
1834
1835 docket.backend.ack(&high_id, serde_json::json!({})).unwrap();
1837
1838 let dequeued = docket.backend.dequeue(&types).unwrap().unwrap();
1840 assert_eq!(dequeued.id, low_id);
1841 }
1842
1843 #[test]
1844 fn test_docket_cancel_task() {
1845 let docket = Docket::memory();
1846
1847 let task_id = docket.submit("task", serde_json::json!({})).unwrap();
1848
1849 docket.cancel(&task_id, Some("User cancelled")).unwrap();
1850
1851 let task = docket.get_task(&task_id).unwrap().unwrap();
1852 assert_eq!(task.status, TaskStatus::Cancelled);
1853 assert_eq!(task.error, Some("User cancelled".to_string()));
1854 }
1855
1856 #[test]
1857 fn test_docket_stats() {
1858 let docket = Docket::memory();
1859
1860 docket.submit("task1", serde_json::json!({})).unwrap();
1861 docket.submit("task2", serde_json::json!({})).unwrap();
1862 let task3 = docket.submit("task3", serde_json::json!({})).unwrap();
1863 docket.cancel(&task3, None).unwrap();
1864
1865 let stats = docket.stats().unwrap();
1866 assert_eq!(stats.pending, 2);
1867 assert_eq!(stats.cancelled, 1);
1868 }
1869
1870 #[test]
1871 fn test_docket_list_tasks() {
1872 let docket = Docket::memory();
1873
1874 docket.submit("type_a", serde_json::json!({})).unwrap();
1875 docket.submit("type_b", serde_json::json!({})).unwrap();
1876 let cancelled_id = docket.submit("type_a", serde_json::json!({})).unwrap();
1877 docket.cancel(&cancelled_id, None).unwrap();
1878
1879 let all = docket.list_tasks(None, 100).unwrap();
1881 assert_eq!(all.len(), 3);
1882
1883 let pending = docket.list_tasks(Some(TaskStatus::Pending), 100).unwrap();
1885 assert_eq!(pending.len(), 2);
1886
1887 let cancelled = docket.list_tasks(Some(TaskStatus::Cancelled), 100).unwrap();
1889 assert_eq!(cancelled.len(), 1);
1890 }
1891
1892 #[test]
1893 fn test_worker_builder() {
1894 let docket = Docket::memory();
1895
1896 let worker = docket
1897 .worker()
1898 .subscribe("type_a", |_| async { Ok(serde_json::json!({})) })
1899 .subscribe("type_b", |_| async { Ok(serde_json::json!({})) })
1900 .build();
1901
1902 let types = worker.subscribed_types();
1903 assert!(types.contains(&"type_a".to_string()));
1904 assert!(types.contains(&"type_b".to_string()));
1905 }
1906
1907 #[test]
1908 fn test_memory_backend_retry() {
1909 let settings = DocketSettings::memory().with_max_retries(2);
1910 let backend = MemoryDocketBackend::new(settings);
1911
1912 let task = DocketTask::new(
1913 TaskId::from_string("test-1"),
1914 "retry_test".to_string(),
1915 serde_json::json!({}),
1916 0,
1917 2,
1918 );
1919
1920 backend.enqueue(task).unwrap();
1921
1922 let task = backend
1924 .dequeue(&["retry_test".to_string()])
1925 .unwrap()
1926 .unwrap();
1927 backend.nack(&task.id, "error 1").unwrap();
1928
1929 let task = backend
1931 .dequeue(&["retry_test".to_string()])
1932 .unwrap()
1933 .unwrap();
1934 assert_eq!(task.retry_count, 1);
1935 backend.nack(&task.id, "error 2").unwrap();
1936
1937 let task = backend.dequeue(&["retry_test".to_string()]).unwrap();
1939 assert!(task.is_none());
1940
1941 let task = backend
1943 .get_task(&TaskId::from_string("test-1"))
1944 .unwrap()
1945 .unwrap();
1946 assert_eq!(task.status, TaskStatus::Failed);
1947 }
1948
1949 #[test]
1950 fn test_docket_task_to_info() {
1951 let task = DocketTask::new(
1952 TaskId::from_string("test-info"),
1953 "test_type".to_string(),
1954 serde_json::json!({"data": 42}),
1955 5,
1956 3,
1957 );
1958
1959 let info = task.to_task_info();
1960 assert_eq!(info.id.to_string(), "test-info");
1961 assert_eq!(info.task_type, "test_type");
1962 assert_eq!(info.status, TaskStatus::Pending);
1963 assert!(info.started_at.is_none());
1964 }
1965
1966 #[test]
1967 fn test_worker_process_one() {
1968 use fastmcp_core::block_on;
1969
1970 let docket = Docket::memory();
1971
1972 let task_id = docket
1974 .submit("process_test", serde_json::json!({"x": 1}))
1975 .unwrap();
1976
1977 let worker = docket
1979 .worker()
1980 .subscribe("process_test", |task| async move {
1981 let x = task.params.get("x").and_then(|v| v.as_i64()).unwrap_or(0);
1982 Ok(serde_json::json!({"result": x * 2}))
1983 })
1984 .build();
1985
1986 let cx = Cx::for_testing();
1988 let processed = block_on(worker.process_one(&cx)).unwrap();
1989 assert!(processed);
1990
1991 let task = docket.get_task(&task_id).unwrap().unwrap();
1993 assert_eq!(task.status, TaskStatus::Completed);
1994 assert_eq!(task.result, Some(serde_json::json!({"result": 2})));
1995 }
1996
1997 #[test]
1998 fn test_worker_no_task_available() {
1999 use fastmcp_core::block_on;
2000
2001 let docket = Docket::memory();
2002
2003 let worker = docket
2004 .worker()
2005 .subscribe("empty_test", |_| async { Ok(serde_json::json!({})) })
2006 .build();
2007
2008 let cx = Cx::for_testing();
2009 let processed = block_on(worker.process_one(&cx)).unwrap();
2010 assert!(!processed);
2011 }
2012
2013 #[test]
2014 fn test_submit_options() {
2015 let opts = SubmitOptions::new()
2016 .with_priority(10)
2017 .with_max_retries(5)
2018 .with_delay(Duration::from_secs(60));
2019
2020 assert_eq!(opts.priority, 10);
2021 assert_eq!(opts.max_retries, Some(5));
2022 assert_eq!(opts.delay, Some(Duration::from_secs(60)));
2023 }
2024
2025 #[test]
2026 fn test_docket_error_display() {
2027 let errors = vec![
2028 (
2029 DocketError::NotFound("task-1".into()),
2030 "Task not found: task-1",
2031 ),
2032 (
2033 DocketError::Connection("refused".into()),
2034 "Connection error: refused",
2035 ),
2036 (DocketError::Handler("panic".into()), "Handler error: panic"),
2037 (DocketError::Cancelled, "Operation cancelled"),
2038 ];
2039
2040 for (error, expected) in errors {
2041 assert_eq!(error.to_string(), expected);
2042 }
2043 }
2044
2045 #[cfg(feature = "redis")]
2046 #[test]
2047 fn test_redis_worker_process_round_trip() {
2048 use fastmcp_core::block_on;
2049
2050 let redis_server = TestRedisServer::start();
2051 let settings = redis_settings_for_test(&redis_server.url);
2052 let docket = Docket::new(settings).expect("redis docket");
2053
2054 let task_id = docket
2055 .submit("redis_round_trip", serde_json::json!({"value": 21}))
2056 .expect("submit");
2057
2058 let worker = docket
2059 .worker()
2060 .subscribe("redis_round_trip", |task| async move {
2061 let value = task
2062 .params
2063 .get("value")
2064 .and_then(|v| v.as_i64())
2065 .unwrap_or(0);
2066 Ok(serde_json::json!({"doubled": value * 2}))
2067 })
2068 .build();
2069
2070 let cx = Cx::for_testing();
2071 let processed = block_on(worker.process_one(&cx)).expect("process one");
2072 assert!(processed);
2073
2074 let task = docket
2075 .get_task(&task_id)
2076 .expect("get task")
2077 .expect("task exists");
2078 assert_eq!(task.status, TaskStatus::Completed);
2079 assert_eq!(task.result, Some(serde_json::json!({"doubled": 42})));
2080 }
2081
2082 #[cfg(feature = "redis")]
2083 #[test]
2084 fn test_redis_worker_retries_then_marks_failed() {
2085 use fastmcp_core::block_on;
2086
2087 let redis_server = TestRedisServer::start();
2088 let mut settings = redis_settings_for_test(&redis_server.url);
2089 settings.max_retries = 2;
2090 let docket = Docket::new(settings).expect("redis docket");
2091
2092 let task_id = docket
2093 .submit("redis_retry", serde_json::json!({"attempt": 0}))
2094 .expect("submit");
2095
2096 let worker = docket
2097 .worker()
2098 .subscribe("redis_retry", |_task| async move {
2099 Err(DocketError::Handler("boom".to_string()))
2100 })
2101 .build();
2102
2103 let cx = Cx::for_testing();
2104 assert!(block_on(worker.process_one(&cx)).expect("process first"));
2105 assert!(block_on(worker.process_one(&cx)).expect("process second"));
2106
2107 let task = docket
2108 .get_task(&task_id)
2109 .expect("get task")
2110 .expect("task exists");
2111 assert_eq!(task.status, TaskStatus::Failed);
2112 assert_eq!(task.retry_count, 2);
2113 assert!(task.error.unwrap_or_default().contains("boom"));
2114 }
2115
2116 #[cfg(feature = "redis")]
2117 #[test]
2118 fn test_redis_cancel_pending_task() {
2119 let redis_server = TestRedisServer::start();
2120 let settings = redis_settings_for_test(&redis_server.url);
2121 let docket = Docket::new(settings).expect("redis docket");
2122
2123 let task_id = docket
2124 .submit("redis_cancel", serde_json::json!({"x": 1}))
2125 .expect("submit");
2126 docket
2127 .cancel(&task_id, Some("stopped by test"))
2128 .expect("cancel task");
2129
2130 let task = docket
2131 .get_task(&task_id)
2132 .expect("get task")
2133 .expect("task exists");
2134 assert_eq!(task.status, TaskStatus::Cancelled);
2135 assert_eq!(task.error, Some("stopped by test".to_string()));
2136 }
2137
2138 #[cfg(feature = "redis")]
2139 #[test]
2140 fn test_redis_requeue_stale_running_task() {
2141 let redis_server = TestRedisServer::start();
2142 let mut settings = redis_settings_for_test(&redis_server.url);
2143 settings.visibility_timeout = Duration::from_millis(0);
2144 let docket = Docket::new(settings).expect("redis docket");
2145
2146 let task_id = docket
2147 .submit("redis_stale", serde_json::json!({"x": 1}))
2148 .expect("submit");
2149
2150 let task_types = vec!["redis_stale".to_string()];
2151 let claimed = docket
2152 .backend
2153 .dequeue(&task_types)
2154 .expect("dequeue")
2155 .expect("claimed task");
2156 assert_eq!(claimed.id, task_id);
2157 assert_eq!(claimed.status, TaskStatus::Running);
2158
2159 let requeued = docket.backend.requeue_stale().expect("requeue stale");
2160 assert_eq!(requeued, 1);
2161
2162 let reclaimed = docket
2163 .backend
2164 .dequeue(&task_types)
2165 .expect("dequeue after requeue")
2166 .expect("task available again");
2167 assert_eq!(reclaimed.id, task_id);
2168 assert_eq!(reclaimed.retry_count, 1);
2169 }
2170
2171 #[test]
2176 fn docket_settings_memory_equals_default() {
2177 let mem = DocketSettings::memory();
2178 let def = DocketSettings::default();
2179 assert_eq!(mem.queue_prefix, def.queue_prefix);
2180 assert_eq!(mem.max_retries, def.max_retries);
2181 assert_eq!(mem.visibility_timeout, def.visibility_timeout);
2182 assert_eq!(mem.default_task_timeout, def.default_task_timeout);
2183 assert_eq!(mem.retry_delay, def.retry_delay);
2184 assert_eq!(mem.poll_interval, def.poll_interval);
2185 }
2186
2187 #[test]
2188 fn docket_settings_with_visibility_timeout() {
2189 let s = DocketSettings::memory().with_visibility_timeout(Duration::from_secs(120));
2190 assert_eq!(s.visibility_timeout, Duration::from_secs(120));
2191 }
2192
2193 #[test]
2194 fn docket_settings_debug() {
2195 let s = DocketSettings::memory();
2196 let debug = format!("{:?}", s);
2197 assert!(debug.contains("DocketSettings"));
2198 assert!(debug.contains("fastmcp:docket"));
2199 }
2200
2201 #[test]
2202 fn docket_settings_clone() {
2203 let s = DocketSettings::memory().with_max_retries(7);
2204 let c = s.clone();
2205 assert_eq!(c.max_retries, 7);
2206 }
2207
2208 #[test]
2209 fn docket_settings_redis_pool_size() {
2210 let s = DocketSettings::redis("redis://localhost:6379");
2211 match s.backend {
2212 DocketBackendType::Redis(ref r) => {
2213 assert_eq!(r.pool_size, 10);
2214 assert_eq!(r.connect_timeout, Duration::from_secs(5));
2215 assert_eq!(r.url, "redis://localhost:6379");
2216 }
2217 _ => panic!("expected Redis backend"),
2218 }
2219 }
2220
2221 #[test]
2226 fn docket_backend_type_debug() {
2227 let mem = DocketBackendType::Memory;
2228 let debug = format!("{:?}", mem);
2229 assert!(debug.contains("Memory"));
2230
2231 let redis = DocketBackendType::Redis(RedisSettings {
2232 url: "redis://test".to_string(),
2233 pool_size: 5,
2234 connect_timeout: Duration::from_secs(3),
2235 });
2236 let debug = format!("{:?}", redis);
2237 assert!(debug.contains("Redis"));
2238 }
2239
2240 #[test]
2241 fn redis_settings_clone() {
2242 let r = RedisSettings {
2243 url: "redis://host".to_string(),
2244 pool_size: 3,
2245 connect_timeout: Duration::from_secs(2),
2246 };
2247 let c = r.clone();
2248 assert_eq!(c.url, "redis://host");
2249 assert_eq!(c.pool_size, 3);
2250 }
2251
2252 #[test]
2257 fn docket_task_new_fields() {
2258 let task = DocketTask::new(
2259 TaskId::from_string("t-1"),
2260 "my_type".to_string(),
2261 serde_json::json!({"key": "val"}),
2262 5,
2263 3,
2264 );
2265 assert_eq!(task.task_type, "my_type");
2266 assert_eq!(task.priority, 5);
2267 assert_eq!(task.max_retries, 3);
2268 assert_eq!(task.retry_count, 0);
2269 assert_eq!(task.status, TaskStatus::Pending);
2270 assert!(task.claimed_at.is_none());
2271 assert!(task.error.is_none());
2272 assert!(task.result.is_none());
2273 assert!(!task.created_at.is_empty());
2274 }
2275
2276 #[test]
2277 fn docket_task_debug_and_clone() {
2278 let task = DocketTask::new(
2279 TaskId::from_string("t-dbg"),
2280 "dbg_type".to_string(),
2281 serde_json::json!(null),
2282 0,
2283 1,
2284 );
2285 let debug = format!("{:?}", task);
2286 assert!(debug.contains("DocketTask"));
2287 assert!(debug.contains("dbg_type"));
2288
2289 let cloned = task.clone();
2290 assert_eq!(cloned.task_type, "dbg_type");
2291 }
2292
2293 #[test]
2294 fn docket_task_serialize_deserialize_roundtrip() {
2295 let task = DocketTask::new(
2296 TaskId::from_string("t-ser"),
2297 "ser_type".to_string(),
2298 serde_json::json!({"a": 1}),
2299 2,
2300 4,
2301 );
2302 let json = serde_json::to_string(&task).unwrap();
2303 let deserialized: DocketTask = serde_json::from_str(&json).unwrap();
2304 assert_eq!(deserialized.task_type, "ser_type");
2305 assert_eq!(deserialized.priority, 2);
2306 assert_eq!(deserialized.max_retries, 4);
2307 }
2308
2309 #[test]
2310 fn docket_task_to_task_info_pending() {
2311 let task = DocketTask::new(
2312 TaskId::from_string("t-info"),
2313 "info_type".to_string(),
2314 serde_json::json!({}),
2315 0,
2316 3,
2317 );
2318 let info = task.to_task_info();
2319 assert_eq!(info.status, TaskStatus::Pending);
2320 assert!(info.started_at.is_none());
2321 assert!(info.completed_at.is_none()); }
2323
2324 #[test]
2325 fn docket_task_to_task_info_completed() {
2326 let mut task = DocketTask::new(
2327 TaskId::from_string("t-comp"),
2328 "comp_type".to_string(),
2329 serde_json::json!({}),
2330 0,
2331 3,
2332 );
2333 task.status = TaskStatus::Completed;
2334 task.claimed_at = Some("2025-01-01T00:00:00Z".to_string());
2335 let info = task.to_task_info();
2336 assert_eq!(info.status, TaskStatus::Completed);
2337 assert!(info.started_at.is_some());
2338 assert!(info.completed_at.is_some()); }
2340
2341 #[test]
2342 fn docket_task_to_task_result_non_terminal() {
2343 let task = DocketTask::new(
2344 TaskId::from_string("t-res"),
2345 "res_type".to_string(),
2346 serde_json::json!({}),
2347 0,
2348 3,
2349 );
2350 assert!(task.to_task_result().is_none());
2351 }
2352
2353 #[test]
2354 fn docket_task_to_task_result_completed() {
2355 let mut task = DocketTask::new(
2356 TaskId::from_string("t-res2"),
2357 "res_type".to_string(),
2358 serde_json::json!({}),
2359 0,
2360 3,
2361 );
2362 task.status = TaskStatus::Completed;
2363 task.result = Some(serde_json::json!({"data": 42}));
2364 let result = task.to_task_result().unwrap();
2365 assert!(result.success);
2366 assert_eq!(result.data, Some(serde_json::json!({"data": 42})));
2367 assert!(result.error.is_none());
2368 }
2369
2370 #[test]
2371 fn docket_task_to_task_result_failed() {
2372 let mut task = DocketTask::new(
2373 TaskId::from_string("t-fail"),
2374 "fail_type".to_string(),
2375 serde_json::json!({}),
2376 0,
2377 3,
2378 );
2379 task.status = TaskStatus::Failed;
2380 task.error = Some("something broke".to_string());
2381 let result = task.to_task_result().unwrap();
2382 assert!(!result.success);
2383 assert_eq!(result.error, Some("something broke".to_string()));
2384 }
2385
2386 #[test]
2391 fn submit_options_default() {
2392 let opts = SubmitOptions::default();
2393 assert_eq!(opts.priority, 0);
2394 assert!(opts.max_retries.is_none());
2395 assert!(opts.delay.is_none());
2396 }
2397
2398 #[test]
2399 fn submit_options_debug() {
2400 let opts = SubmitOptions::new().with_priority(3);
2401 let debug = format!("{:?}", opts);
2402 assert!(debug.contains("SubmitOptions"));
2403 assert!(debug.contains('3'));
2404 }
2405
2406 #[test]
2411 fn docket_error_serialization_display() {
2412 let err = DocketError::Serialization("bad json".to_string());
2413 assert_eq!(err.to_string(), "Serialization error: bad json");
2414 }
2415
2416 #[test]
2417 fn docket_error_backend_display() {
2418 let err = DocketError::Backend("lock poisoned".to_string());
2419 assert_eq!(err.to_string(), "Backend error: lock poisoned");
2420 }
2421
2422 #[test]
2423 fn docket_error_is_std_error() {
2424 let err = DocketError::NotFound("x".to_string());
2425 let _: &dyn std::error::Error = &err;
2426 }
2427
2428 #[test]
2429 fn docket_error_into_mcp_error() {
2430 let err = DocketError::Handler("timeout".to_string());
2431 let mcp: McpError = err.into();
2432 assert!(mcp.message.contains("Handler error: timeout"));
2433 }
2434
2435 #[test]
2436 fn docket_error_debug() {
2437 let err = DocketError::Cancelled;
2438 let debug = format!("{:?}", err);
2439 assert!(debug.contains("Cancelled"));
2440 }
2441
2442 #[test]
2447 fn queue_stats_default() {
2448 let stats = QueueStats::default();
2449 assert_eq!(stats.pending, 0);
2450 assert_eq!(stats.in_progress, 0);
2451 assert_eq!(stats.completed, 0);
2452 assert_eq!(stats.failed, 0);
2453 assert_eq!(stats.cancelled, 0);
2454 }
2455
2456 #[test]
2457 fn queue_stats_debug_and_clone() {
2458 let stats = QueueStats {
2459 pending: 1,
2460 in_progress: 2,
2461 completed: 3,
2462 failed: 4,
2463 cancelled: 5,
2464 };
2465 let debug = format!("{:?}", stats);
2466 assert!(debug.contains("QueueStats"));
2467
2468 let c = stats.clone();
2469 assert_eq!(c.pending, 1);
2470 assert_eq!(c.completed, 3);
2471 }
2472
2473 #[test]
2478 fn memory_backend_dequeue_empty() {
2479 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2480 let result = backend.dequeue(&["any".to_string()]).unwrap();
2481 assert!(result.is_none());
2482 }
2483
2484 #[test]
2485 fn memory_backend_dequeue_wrong_type() {
2486 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2487 let task = DocketTask::new(
2488 TaskId::from_string("t-1"),
2489 "type_a".to_string(),
2490 serde_json::json!({}),
2491 0,
2492 3,
2493 );
2494 backend.enqueue(task).unwrap();
2495
2496 let result = backend.dequeue(&["type_b".to_string()]).unwrap();
2498 assert!(result.is_none());
2499 }
2500
2501 #[test]
2502 fn memory_backend_dequeue_sets_running() {
2503 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2504 let task = DocketTask::new(
2505 TaskId::from_string("t-run"),
2506 "work".to_string(),
2507 serde_json::json!({}),
2508 0,
2509 3,
2510 );
2511 backend.enqueue(task).unwrap();
2512
2513 let dequeued = backend.dequeue(&["work".to_string()]).unwrap().unwrap();
2514 assert_eq!(dequeued.status, TaskStatus::Running);
2515 assert!(dequeued.claimed_at.is_some());
2516 }
2517
2518 #[test]
2519 fn memory_backend_ack_nonexistent() {
2520 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2521 let result = backend.ack(&TaskId::from_string("nonexistent"), serde_json::json!({}));
2522 assert!(result.is_err());
2523 }
2524
2525 #[test]
2526 fn memory_backend_nack_nonexistent() {
2527 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2528 let result = backend.nack(&TaskId::from_string("nonexistent"), "err");
2529 assert!(result.is_err());
2530 }
2531
2532 #[test]
2533 fn memory_backend_get_task_nonexistent() {
2534 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2535 let result = backend.get_task(&TaskId::from_string("missing")).unwrap();
2536 assert!(result.is_none());
2537 }
2538
2539 #[test]
2540 fn memory_backend_cancel_terminal_task_fails() {
2541 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2542 let task = DocketTask::new(
2543 TaskId::from_string("t-can"),
2544 "cancel_test".to_string(),
2545 serde_json::json!({}),
2546 0,
2547 3,
2548 );
2549 backend.enqueue(task).unwrap();
2550
2551 let id = TaskId::from_string("t-can");
2552 backend.cancel(&id, None).unwrap();
2553
2554 let result = backend.cancel(&id, Some("again"));
2556 assert!(result.is_err());
2557 }
2558
2559 #[test]
2560 fn memory_backend_cancel_removes_from_pending() {
2561 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2562 let task = DocketTask::new(
2563 TaskId::from_string("t-crp"),
2564 "cancel_pend".to_string(),
2565 serde_json::json!({}),
2566 0,
2567 3,
2568 );
2569 backend.enqueue(task).unwrap();
2570
2571 let id = TaskId::from_string("t-crp");
2572 backend.cancel(&id, Some("bye")).unwrap();
2573
2574 let result = backend.dequeue(&["cancel_pend".to_string()]).unwrap();
2576 assert!(result.is_none());
2577 }
2578
2579 #[test]
2580 fn memory_backend_list_tasks_with_limit() {
2581 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2582 for i in 0..5 {
2583 let task = DocketTask::new(
2584 TaskId::from_string(format!("t-{i}")),
2585 "limit_test".to_string(),
2586 serde_json::json!({}),
2587 0,
2588 3,
2589 );
2590 backend.enqueue(task).unwrap();
2591 }
2592
2593 let tasks = backend.list_tasks(None, 3).unwrap();
2594 assert_eq!(tasks.len(), 3);
2595 }
2596
2597 #[test]
2598 fn memory_backend_list_tasks_filter_by_status() {
2599 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2600 for i in 0..3 {
2601 let task = DocketTask::new(
2602 TaskId::from_string(format!("t-{i}")),
2603 "filter_test".to_string(),
2604 serde_json::json!({}),
2605 0,
2606 3,
2607 );
2608 backend.enqueue(task).unwrap();
2609 }
2610 backend.cancel(&TaskId::from_string("t-1"), None).unwrap();
2612
2613 let pending = backend.list_tasks(Some(TaskStatus::Pending), 100).unwrap();
2614 assert_eq!(pending.len(), 2);
2615
2616 let cancelled = backend
2617 .list_tasks(Some(TaskStatus::Cancelled), 100)
2618 .unwrap();
2619 assert_eq!(cancelled.len(), 1);
2620 }
2621
2622 #[test]
2623 fn memory_backend_stats_all_statuses() {
2624 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2625
2626 for i in 0..2 {
2628 let task = DocketTask::new(
2629 TaskId::from_string(format!("p-{i}")),
2630 "stat_test".to_string(),
2631 serde_json::json!({}),
2632 0,
2633 3,
2634 );
2635 backend.enqueue(task).unwrap();
2636 }
2637
2638 let _running = backend.dequeue(&["stat_test".to_string()]).unwrap();
2640
2641 let task = DocketTask::new(
2643 TaskId::from_string("c-0"),
2644 "stat_test".to_string(),
2645 serde_json::json!({}),
2646 0,
2647 3,
2648 );
2649 backend.enqueue(task).unwrap();
2650 let deq = backend
2651 .dequeue(&["stat_test".to_string()])
2652 .unwrap()
2653 .unwrap();
2654 backend.ack(&deq.id, serde_json::json!({})).unwrap();
2655
2656 let stats = backend.stats().unwrap();
2657 assert!(stats.pending >= 1);
2658 assert!(stats.in_progress >= 1 || stats.completed >= 1);
2659 }
2660
2661 #[test]
2662 fn memory_backend_requeue_stale_no_stale() {
2663 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2664 let task = DocketTask::new(
2665 TaskId::from_string("t-fresh"),
2666 "stale_test".to_string(),
2667 serde_json::json!({}),
2668 0,
2669 3,
2670 );
2671 backend.enqueue(task).unwrap();
2672
2673 let _deq = backend
2675 .dequeue(&["stale_test".to_string()])
2676 .unwrap()
2677 .unwrap();
2678 let requeued = backend.requeue_stale().unwrap();
2679 assert_eq!(requeued, 0);
2680 }
2681
2682 #[test]
2683 fn memory_backend_ack_marks_completed() {
2684 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2685 let task = DocketTask::new(
2686 TaskId::from_string("t-ack"),
2687 "ack_test".to_string(),
2688 serde_json::json!({}),
2689 0,
2690 3,
2691 );
2692 backend.enqueue(task).unwrap();
2693
2694 let deq = backend.dequeue(&["ack_test".to_string()]).unwrap().unwrap();
2695 backend
2696 .ack(&deq.id, serde_json::json!({"done": true}))
2697 .unwrap();
2698
2699 let task = backend
2700 .get_task(&TaskId::from_string("t-ack"))
2701 .unwrap()
2702 .unwrap();
2703 assert_eq!(task.status, TaskStatus::Completed);
2704 assert_eq!(task.result, Some(serde_json::json!({"done": true})));
2705 }
2706
2707 #[test]
2708 fn memory_backend_priority_ordering() {
2709 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2710
2711 let low = DocketTask::new(
2713 TaskId::from_string("low"),
2714 "prio".to_string(),
2715 serde_json::json!({}),
2716 1,
2717 3,
2718 );
2719 let high = DocketTask::new(
2720 TaskId::from_string("high"),
2721 "prio".to_string(),
2722 serde_json::json!({}),
2723 10,
2724 3,
2725 );
2726 backend.enqueue(low).unwrap();
2727 backend.enqueue(high).unwrap();
2728
2729 let first = backend.dequeue(&["prio".to_string()]).unwrap().unwrap();
2731 assert_eq!(first.id.to_string(), "high");
2732 }
2733
2734 #[test]
2739 fn docket_debug() {
2740 let docket = Docket::memory();
2741 let debug = format!("{:?}", docket);
2742 assert!(debug.contains("Docket"));
2743 assert!(debug.contains("settings"));
2744 }
2745
2746 #[test]
2747 fn docket_settings_accessor() {
2748 let docket = Docket::memory();
2749 let settings = docket.settings();
2750 assert!(matches!(settings.backend, DocketBackendType::Memory));
2751 }
2752
2753 #[test]
2754 fn docket_into_shared() {
2755 let docket = Docket::memory();
2756 let shared: SharedDocket = docket.into_shared();
2757 let _clone = Arc::clone(&shared);
2759 assert!(matches!(
2760 shared.settings().backend,
2761 DocketBackendType::Memory
2762 ));
2763 }
2764
2765 #[test]
2766 fn docket_get_task_nonexistent() {
2767 let docket = Docket::memory();
2768 let result = docket
2769 .get_task(&TaskId::from_string("no-such-task"))
2770 .unwrap();
2771 assert!(result.is_none());
2772 }
2773
2774 #[test]
2775 fn docket_cancel_nonexistent_is_error() {
2776 let docket = Docket::memory();
2777 let result = docket.cancel(&TaskId::from_string("no-such"), None);
2778 assert!(result.is_err());
2779 }
2780
2781 #[test]
2782 fn docket_task_ids_are_sequential() {
2783 let docket = Docket::memory();
2784 let id1 = docket.submit("seq", serde_json::json!({})).unwrap();
2785 let id2 = docket.submit("seq", serde_json::json!({})).unwrap();
2786 assert_ne!(id1.to_string(), id2.to_string());
2788 assert!(id1.to_string().starts_with("docket-"));
2789 assert!(id2.to_string().starts_with("docket-"));
2790 }
2791
2792 #[test]
2793 fn docket_submit_with_max_retries_override() {
2794 let docket = Docket::memory();
2795 let id = docket
2796 .submit_with_options(
2797 "retry_over",
2798 serde_json::json!({}),
2799 SubmitOptions::new().with_max_retries(10),
2800 )
2801 .unwrap();
2802
2803 let task = docket.get_task(&id).unwrap().unwrap();
2804 assert_eq!(task.max_retries, 10);
2805 }
2806
2807 #[cfg(not(feature = "redis"))]
2808 #[test]
2809 fn docket_new_redis_without_feature_fails() {
2810 let settings = DocketSettings::redis("redis://localhost:6379");
2811 let result = Docket::new(settings);
2812 assert!(result.is_err());
2813 }
2814
2815 #[test]
2820 fn worker_is_running_initially_false() {
2821 let docket = Docket::memory();
2822 let worker = docket
2823 .worker()
2824 .subscribe("test", |_| async { Ok(serde_json::json!({})) })
2825 .build();
2826 assert!(!worker.is_running());
2827 }
2828
2829 #[test]
2830 fn worker_stop() {
2831 let docket = Docket::memory();
2832 let worker = docket
2833 .worker()
2834 .subscribe("test", |_| async { Ok(serde_json::json!({})) })
2835 .build();
2836
2837 worker.running.store(true, Ordering::SeqCst);
2839 assert!(worker.is_running());
2840
2841 worker.stop();
2842 assert!(!worker.is_running());
2843 }
2844
2845 #[test]
2846 fn worker_debug() {
2847 let docket = Docket::memory();
2848 let worker = docket
2849 .worker()
2850 .subscribe("type_x", |_| async { Ok(serde_json::json!({})) })
2851 .build();
2852 let debug = format!("{:?}", worker);
2853 assert!(debug.contains("Worker"));
2854 assert!(debug.contains("type_x"));
2855 }
2856
2857 #[test]
2858 fn worker_process_one_handler_error_nacks() {
2859 use fastmcp_core::block_on;
2860
2861 let settings = DocketSettings::memory().with_max_retries(2);
2862 let docket = Docket::new(settings).unwrap();
2863
2864 let id = docket.submit("fail_type", serde_json::json!({})).unwrap();
2865
2866 let worker = docket
2867 .worker()
2868 .subscribe("fail_type", |_| async {
2869 Err(DocketError::Handler("boom".to_string()))
2870 })
2871 .build();
2872
2873 let cx = Cx::for_testing();
2874 let processed = block_on(worker.process_one(&cx)).unwrap();
2876 assert!(processed);
2877
2878 let task = docket.get_task(&id).unwrap().unwrap();
2880 assert_eq!(task.retry_count, 1);
2881 assert_eq!(task.status, TaskStatus::Pending);
2882 assert!(task.error.as_deref().unwrap().contains("boom"));
2883
2884 let processed = block_on(worker.process_one(&cx)).unwrap();
2886 assert!(processed);
2887
2888 let task = docket.get_task(&id).unwrap().unwrap();
2889 assert_eq!(task.status, TaskStatus::Failed);
2890 assert_eq!(task.retry_count, 2);
2891 }
2892
2893 #[test]
2898 fn docket_settings_with_poll_interval() {
2899 let s = DocketSettings::memory().with_poll_interval(Duration::from_millis(500));
2900 assert_eq!(s.poll_interval, Duration::from_millis(500));
2901 }
2902
2903 #[test]
2904 fn docket_settings_with_max_retries() {
2905 let s = DocketSettings::memory().with_max_retries(10);
2906 assert_eq!(s.max_retries, 10);
2907 }
2908
2909 #[test]
2914 fn submit_options_clone() {
2915 let opts = SubmitOptions::new().with_priority(5).with_max_retries(3);
2916 let cloned = opts.clone();
2917 assert_eq!(cloned.priority, 5);
2918 assert_eq!(cloned.max_retries, Some(3));
2919 }
2920
2921 #[test]
2926 fn docket_task_to_task_info_running_has_started_at() {
2927 let mut task = DocketTask::new(
2928 TaskId::from_string("t"),
2929 "type".into(),
2930 serde_json::json!({}),
2931 0,
2932 3,
2933 );
2934 task.status = TaskStatus::Running;
2935 task.claimed_at = Some("2026-01-01T00:00:00Z".to_string());
2936
2937 let info = task.to_task_info();
2938 assert_eq!(info.status, TaskStatus::Running);
2939 assert_eq!(info.started_at, Some("2026-01-01T00:00:00Z".to_string()));
2940 assert!(info.completed_at.is_none());
2942 }
2943
2944 #[test]
2945 fn docket_task_to_task_info_cancelled_has_completed_at() {
2946 let mut task = DocketTask::new(
2947 TaskId::from_string("t"),
2948 "type".into(),
2949 serde_json::json!({}),
2950 0,
2951 3,
2952 );
2953 task.status = TaskStatus::Cancelled;
2954 task.error = Some("user cancelled".to_string());
2955
2956 let info = task.to_task_info();
2957 assert_eq!(info.status, TaskStatus::Cancelled);
2958 assert!(info.completed_at.is_some()); assert_eq!(info.error, Some("user cancelled".to_string()));
2960 }
2961
2962 #[test]
2963 fn docket_task_to_task_result_cancelled() {
2964 let mut task = DocketTask::new(
2965 TaskId::from_string("t"),
2966 "type".into(),
2967 serde_json::json!({}),
2968 0,
2969 3,
2970 );
2971 task.status = TaskStatus::Cancelled;
2972 task.error = Some("cancelled by user".to_string());
2973
2974 let result = task.to_task_result().expect("terminal");
2975 assert!(!result.success);
2976 assert_eq!(result.error, Some("cancelled by user".to_string()));
2977 }
2978
2979 #[test]
2984 fn memory_backend_cancel_running_task() {
2985 let backend = MemoryDocketBackend::new(DocketSettings::memory());
2986 let task = DocketTask::new(
2987 TaskId::from_string("t1"),
2988 "work".into(),
2989 serde_json::json!({}),
2990 0,
2991 3,
2992 );
2993 backend.enqueue(task).unwrap();
2994
2995 let _claimed = backend.dequeue(&["work".to_string()]).unwrap().unwrap();
2997
2998 backend
3000 .cancel(&TaskId::from_string("t1"), Some("force cancel"))
3001 .unwrap();
3002
3003 let task = backend
3004 .get_task(&TaskId::from_string("t1"))
3005 .unwrap()
3006 .unwrap();
3007 assert_eq!(task.status, TaskStatus::Cancelled);
3008 assert_eq!(task.error, Some("force cancel".to_string()));
3009 }
3010
3011 #[test]
3016 fn memory_backend_requeue_stale_with_stale_task() {
3017 let settings = DocketSettings::memory().with_visibility_timeout(Duration::from_millis(0));
3018 let backend = MemoryDocketBackend::new(settings);
3019
3020 let mut task = DocketTask::new(
3021 TaskId::from_string("t1"),
3022 "work".into(),
3023 serde_json::json!({}),
3024 0,
3025 3,
3026 );
3027 task.status = TaskStatus::Running;
3028 task.claimed_at = Some("2000-01-01T00:00:00Z".to_string()); backend.enqueue(task).unwrap();
3030
3031 {
3033 let mut tasks = backend.tasks.write().unwrap();
3034 let t = tasks.get_mut(&TaskId::from_string("t1")).unwrap();
3035 t.status = TaskStatus::Running;
3036 t.claimed_at = Some("2000-01-01T00:00:00Z".to_string());
3037 }
3038
3039 let requeued = backend.requeue_stale().unwrap();
3040 assert_eq!(requeued, 1);
3041
3042 let task = backend
3043 .get_task(&TaskId::from_string("t1"))
3044 .unwrap()
3045 .unwrap();
3046 assert_eq!(task.status, TaskStatus::Pending);
3047 assert!(task.claimed_at.is_none());
3048 assert_eq!(task.retry_count, 1);
3049 }
3050
3051 #[test]
3052 fn memory_backend_requeue_stale_at_retry_limit_marks_failed() {
3053 let settings = DocketSettings::memory()
3054 .with_visibility_timeout(Duration::from_millis(0))
3055 .with_max_retries(1);
3056 let backend = MemoryDocketBackend::new(settings);
3057
3058 let task = DocketTask::new(
3059 TaskId::from_string("t1"),
3060 "work".into(),
3061 serde_json::json!({}),
3062 0,
3063 1, );
3065 backend.enqueue(task).unwrap();
3066
3067 {
3069 let mut tasks = backend.tasks.write().unwrap();
3070 let t = tasks.get_mut(&TaskId::from_string("t1")).unwrap();
3071 t.status = TaskStatus::Running;
3072 t.claimed_at = Some("2000-01-01T00:00:00Z".to_string());
3073 }
3074
3075 let requeued = backend.requeue_stale().unwrap();
3076 assert_eq!(requeued, 0); let task = backend
3079 .get_task(&TaskId::from_string("t1"))
3080 .unwrap()
3081 .unwrap();
3082 assert_eq!(task.status, TaskStatus::Failed);
3083 assert_eq!(task.error.as_deref(), Some("Exceeded visibility timeout"));
3084 }
3085
3086 #[test]
3091 fn memory_backend_nack_under_limit_requeues() {
3092 let settings = DocketSettings::memory().with_max_retries(3);
3093 let backend = MemoryDocketBackend::new(settings);
3094
3095 let task = DocketTask::new(
3096 TaskId::from_string("t1"),
3097 "work".into(),
3098 serde_json::json!({}),
3099 0,
3100 3,
3101 );
3102 backend.enqueue(task).unwrap();
3103
3104 let _claimed = backend.dequeue(&["work".to_string()]).unwrap().unwrap();
3106
3107 backend.nack(&TaskId::from_string("t1"), "fail1").unwrap();
3109
3110 let task = backend
3111 .get_task(&TaskId::from_string("t1"))
3112 .unwrap()
3113 .unwrap();
3114 assert_eq!(task.status, TaskStatus::Pending);
3115 assert_eq!(task.retry_count, 1);
3116 }
3117
3118 #[test]
3119 fn memory_backend_nack_at_limit_marks_failed() {
3120 let settings = DocketSettings::memory().with_max_retries(1);
3121 let backend = MemoryDocketBackend::new(settings);
3122
3123 let task = DocketTask::new(
3124 TaskId::from_string("t1"),
3125 "work".into(),
3126 serde_json::json!({}),
3127 0,
3128 1,
3129 );
3130 backend.enqueue(task).unwrap();
3131
3132 let _claimed = backend.dequeue(&["work".to_string()]).unwrap().unwrap();
3134
3135 backend.nack(&TaskId::from_string("t1"), "fatal").unwrap();
3137
3138 let task = backend
3139 .get_task(&TaskId::from_string("t1"))
3140 .unwrap()
3141 .unwrap();
3142 assert_eq!(task.status, TaskStatus::Failed);
3143 assert_eq!(task.retry_count, 1);
3144 }
3145
3146 #[test]
3151 fn memory_backend_dequeue_multiple_types() {
3152 let backend = MemoryDocketBackend::new(DocketSettings::memory());
3153
3154 let task_a = DocketTask::new(
3155 TaskId::from_string("a"),
3156 "type_a".into(),
3157 serde_json::json!({}),
3158 0,
3159 3,
3160 );
3161 let task_b = DocketTask::new(
3162 TaskId::from_string("b"),
3163 "type_b".into(),
3164 serde_json::json!({}),
3165 0,
3166 3,
3167 );
3168 backend.enqueue(task_a).unwrap();
3169 backend.enqueue(task_b).unwrap();
3170
3171 let claimed = backend
3173 .dequeue(&["type_a".to_string(), "type_b".to_string()])
3174 .unwrap()
3175 .unwrap();
3176 assert_eq!(claimed.task_type, "type_a");
3177 }
3178
3179 #[test]
3184 fn docket_memory_constructor() {
3185 let d = Docket::memory();
3186 assert!(matches!(d.settings().backend, DocketBackendType::Memory));
3187 }
3188
3189 #[test]
3194 fn worker_subscribed_types_contains_all() {
3195 let docket = Docket::memory();
3196 let worker = docket
3197 .worker()
3198 .subscribe("x", |_| async { Ok(serde_json::json!(1)) })
3199 .subscribe("y", |_| async { Ok(serde_json::json!(2)) })
3200 .subscribe("z", |_| async { Ok(serde_json::json!(3)) })
3201 .build();
3202
3203 let types = worker.subscribed_types();
3204 assert_eq!(types.len(), 3);
3205 assert!(types.contains(&"x".to_string()));
3206 assert!(types.contains(&"y".to_string()));
3207 assert!(types.contains(&"z".to_string()));
3208 }
3209
3210 #[test]
3215 fn worker_process_one_cancelled_cx_returns_error() {
3216 use fastmcp_core::block_on;
3217
3218 let docket = Docket::memory();
3219 docket.submit("cancel_test", serde_json::json!({})).unwrap();
3220
3221 let worker = docket
3222 .worker()
3223 .subscribe("cancel_test", |_| async { Ok(serde_json::json!({})) })
3224 .build();
3225
3226 let cx = Cx::for_testing();
3227 cx.set_cancel_requested(true);
3228
3229 let result = block_on(worker.process_one(&cx));
3230 assert!(result.is_err());
3231 }
3232
3233 #[test]
3238 fn cancel_with_none_reason_sets_no_error() {
3239 let docket = Docket::memory();
3240 let id = docket.submit("t", serde_json::json!({})).unwrap();
3241 docket.cancel(&id, None).unwrap();
3242
3243 let task = docket.get_task(&id).unwrap().unwrap();
3244 assert_eq!(task.status, TaskStatus::Cancelled);
3245 assert!(task.error.is_none());
3246 }
3247
3248 #[test]
3253 fn docket_task_to_task_info_failed_has_completed_at() {
3254 let mut task = DocketTask::new(
3255 TaskId::from_string("t"),
3256 "type".into(),
3257 serde_json::json!({}),
3258 0,
3259 3,
3260 );
3261 task.status = TaskStatus::Failed;
3262 task.error = Some("crash".to_string());
3263
3264 let info = task.to_task_info();
3265 assert_eq!(info.status, TaskStatus::Failed);
3266 assert!(info.completed_at.is_some()); assert_eq!(info.error, Some("crash".to_string()));
3268 }
3269
3270 #[test]
3275 fn memory_backend_cancel_nonexistent_returns_error() {
3276 let backend = MemoryDocketBackend::new(DocketSettings::memory());
3277 let result = backend.cancel(&TaskId::from_string("no-such"), None);
3278 assert!(result.is_err());
3279 }
3280
3281 #[test]
3286 fn submit_with_options_uses_default_max_retries() {
3287 let settings = DocketSettings::memory().with_max_retries(7);
3288 let docket = Docket::new(settings).unwrap();
3289
3290 let id = docket
3292 .submit_with_options("t", serde_json::json!({}), SubmitOptions::default())
3293 .unwrap();
3294
3295 let task = docket.get_task(&id).unwrap().unwrap();
3296 assert_eq!(task.max_retries, 7); }
3298
3299 #[test]
3304 fn memory_backend_same_priority_fifo() {
3305 let backend = MemoryDocketBackend::new(DocketSettings::memory());
3306
3307 for i in 0..3 {
3308 let task = DocketTask::new(
3309 TaskId::from_string(format!("t-{i}")),
3310 "fifo".into(),
3311 serde_json::json!({}),
3312 0, 3,
3314 );
3315 backend.enqueue(task).unwrap();
3316 }
3317
3318 let first = backend.dequeue(&["fifo".to_string()]).unwrap().unwrap();
3319 assert_eq!(first.id.to_string(), "t-0");
3320 let second = backend.dequeue(&["fifo".to_string()]).unwrap().unwrap();
3321 assert_eq!(second.id.to_string(), "t-1");
3322 let third = backend.dequeue(&["fifo".to_string()]).unwrap().unwrap();
3323 assert_eq!(third.id.to_string(), "t-2");
3324 }
3325
3326 #[test]
3331 fn memory_backend_requeue_stale_skips_invalid_claimed_at() {
3332 let settings = DocketSettings::memory().with_visibility_timeout(Duration::from_millis(0));
3333 let backend = MemoryDocketBackend::new(settings);
3334
3335 let task = DocketTask::new(
3336 TaskId::from_string("t1"),
3337 "work".into(),
3338 serde_json::json!({}),
3339 0,
3340 3,
3341 );
3342 backend.enqueue(task).unwrap();
3343
3344 {
3346 let mut tasks = backend.tasks.write().unwrap();
3347 let t = tasks.get_mut(&TaskId::from_string("t1")).unwrap();
3348 t.status = TaskStatus::Running;
3349 t.claimed_at = Some("not-a-valid-timestamp".to_string());
3350 }
3351
3352 let requeued = backend.requeue_stale().unwrap();
3354 assert_eq!(requeued, 0);
3355
3356 let task = backend
3358 .get_task(&TaskId::from_string("t1"))
3359 .unwrap()
3360 .unwrap();
3361 assert_eq!(task.status, TaskStatus::Running);
3362 }
3363
3364 #[test]
3369 fn docket_error_cancelled_into_mcp_error() {
3370 let err = DocketError::Cancelled;
3371 let mcp: McpError = err.into();
3372 assert!(mcp.message.contains("cancelled"));
3373 }
3374
3375 #[cfg(feature = "redis")]
3376 #[test]
3377 fn test_redis_requeue_stale_marks_failed_at_retry_limit() {
3378 let redis_server = TestRedisServer::start();
3379 let mut settings = redis_settings_for_test(&redis_server.url);
3380 settings.visibility_timeout = Duration::from_millis(0);
3381 settings.max_retries = 1;
3382 let docket = Docket::new(settings).expect("redis docket");
3383
3384 let task_id = docket
3385 .submit("redis_stale_fail", serde_json::json!({}))
3386 .expect("submit");
3387
3388 let task_types = vec!["redis_stale_fail".to_string()];
3389 let _claimed = docket
3390 .backend
3391 .dequeue(&task_types)
3392 .expect("dequeue")
3393 .expect("claimed");
3394
3395 let requeued = docket.backend.requeue_stale().expect("requeue stale");
3396 assert_eq!(requeued, 0);
3397
3398 let task = docket
3399 .get_task(&task_id)
3400 .expect("get task")
3401 .expect("task exists");
3402 assert_eq!(task.status, TaskStatus::Failed);
3403 assert_eq!(task.error.as_deref(), Some("Exceeded visibility timeout"));
3404 }
3405}