1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use uuid::Uuid;
4
5#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
7pub struct Signature {
8 pub task: String,
10
11 #[serde(default)]
13 pub args: Vec<serde_json::Value>,
14
15 #[serde(default)]
17 pub kwargs: HashMap<String, serde_json::Value>,
18
19 #[serde(default)]
21 pub options: TaskOptions,
22
23 #[serde(default)]
25 pub immutable: bool,
26}
27
28impl Signature {
29 pub fn new(task: String) -> Self {
30 Self {
31 task,
32 args: Vec::new(),
33 kwargs: HashMap::new(),
34 options: TaskOptions::default(),
35 immutable: false,
36 }
37 }
38
39 pub fn with_args(mut self, args: Vec<serde_json::Value>) -> Self {
40 self.args = args;
41 self
42 }
43
44 pub fn with_kwargs(mut self, kwargs: HashMap<String, serde_json::Value>) -> Self {
45 self.kwargs = kwargs;
46 self
47 }
48
49 pub fn with_priority(mut self, priority: u8) -> Self {
50 self.options.priority = Some(priority);
51 self
52 }
53
54 pub fn with_queue(mut self, queue: String) -> Self {
55 self.options.queue = Some(queue);
56 self
57 }
58
59 pub fn with_task_id(mut self, task_id: Uuid) -> Self {
60 self.options.task_id = Some(task_id);
61 self
62 }
63
64 pub fn with_link(mut self, link: Signature) -> Self {
65 self.options.link = Some(Box::new(link));
66 self
67 }
68
69 pub fn with_link_error(mut self, link_error: Signature) -> Self {
70 self.options.link_error = Some(Box::new(link_error));
71 self
72 }
73
74 pub fn add_link(mut self, link: Signature) -> Self {
76 self.options.links.push(link);
77 self
78 }
79
80 pub fn add_link_error(mut self, link_error: Signature) -> Self {
82 self.options.link_errors.push(link_error);
83 self
84 }
85
86 pub fn with_on_retry(mut self, callback: Signature) -> Self {
88 self.options.on_retry = Some(Box::new(callback));
89 self
90 }
91
92 pub fn with_soft_time_limit(mut self, seconds: u64) -> Self {
94 self.options.soft_time_limit = Some(seconds);
95 self
96 }
97
98 pub fn with_time_limit(mut self, seconds: u64) -> Self {
100 self.options.time_limit = Some(seconds);
101 self
102 }
103
104 pub fn with_retry_delay(mut self, seconds: u64) -> Self {
106 self.options.retry_delay = Some(seconds);
107 self
108 }
109
110 pub fn with_retry_backoff(mut self, factor: f64) -> Self {
112 self.options.retry_backoff = Some(factor);
113 self
114 }
115
116 pub fn with_retry_backoff_max(mut self, seconds: u64) -> Self {
118 self.options.retry_backoff_max = Some(seconds);
119 self
120 }
121
122 pub fn with_retry_jitter(mut self, jitter: bool) -> Self {
124 self.options.retry_jitter = Some(jitter);
125 self
126 }
127
128 pub fn immutable(mut self) -> Self {
129 self.immutable = true;
130 self
131 }
132
133 pub fn has_args(&self) -> bool {
135 !self.args.is_empty()
136 }
137
138 pub fn has_kwargs(&self) -> bool {
140 !self.kwargs.is_empty()
141 }
142
143 pub fn is_immutable(&self) -> bool {
145 self.immutable
146 }
147
148 pub fn has_kwarg(&self, key: &str) -> bool {
150 self.kwargs.contains_key(key)
151 }
152
153 pub fn get_kwarg(&self, key: &str) -> Option<&serde_json::Value> {
155 self.kwargs.get(key)
156 }
157
158 pub fn add_kwarg(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
160 self.kwargs.insert(key.into(), value);
161 self
162 }
163
164 pub fn add_arg(mut self, arg: serde_json::Value) -> Self {
166 self.args.push(arg);
167 self
168 }
169
170 pub fn clone_signature(&self) -> Self {
172 self.clone()
173 }
174
175 pub fn si(self) -> Self {
191 self.immutable()
192 }
193
194 pub fn partial(mut self, args: Vec<serde_json::Value>) -> Self {
212 self.args = args;
213 self
214 }
215
216 pub fn complete(mut self, additional_args: Vec<serde_json::Value>) -> Self {
221 if self.immutable {
222 return self;
223 }
224 self.args.extend(additional_args);
225 self
226 }
227
228 pub fn merge(mut self, other: Signature) -> Self {
233 for (key, value) in other.kwargs {
235 self.kwargs.insert(key, value);
236 }
237
238 if self.options.priority.is_none() {
240 self.options.priority = other.options.priority;
241 }
242 if self.options.queue.is_none() {
243 self.options.queue = other.options.queue;
244 }
245 if self.options.task_id.is_none() {
246 self.options.task_id = other.options.task_id;
247 }
248 if self.options.link.is_none() {
249 self.options.link = other.options.link;
250 }
251 if self.options.link_error.is_none() {
252 self.options.link_error = other.options.link_error;
253 }
254
255 self
256 }
257
258 pub fn replace_args(mut self, args: Vec<serde_json::Value>) -> Option<Self> {
263 if self.immutable {
264 return None;
265 }
266 self.args = args;
267 Some(self)
268 }
269
270 pub fn with_expires(mut self, expires: u64) -> Self {
272 self.options.expires = Some(expires);
273 self
274 }
275
276 pub fn with_countdown(mut self, countdown: u64) -> Self {
278 self.options.countdown = Some(countdown);
279 self
280 }
281
282 pub fn with_retries(mut self, max_retries: u32) -> Self {
284 self.options.max_retries = Some(max_retries);
285 self
286 }
287
288 pub fn with_routing_key(mut self, routing_key: String) -> Self {
290 self.options.routing_key = Some(routing_key);
291 self
292 }
293
294 pub fn with_callback_arg_mode(mut self, mode: CallbackArgMode) -> Self {
306 self.options.callback_arg_mode = mode;
307 self
308 }
309
310 pub fn with_callback_kwarg_key(mut self, key: impl Into<String>) -> Self {
315 self.options.callback_kwarg_key = Some(key.into());
316 self
317 }
318
319 pub fn with_result_as_kwarg(mut self, key: impl Into<String>) -> Self {
323 self.options.callback_arg_mode = CallbackArgMode::Kwarg;
324 self.options.callback_kwarg_key = Some(key.into());
325 self
326 }
327
328 pub fn to_json(&self) -> Result<String, serde_json::Error> {
330 serde_json::to_string(self)
331 }
332
333 pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
335 serde_json::from_str(json)
336 }
337
338 pub fn to_json_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
340 serde_json::to_vec(self)
341 }
342
343 pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
345 serde_json::from_slice(bytes)
346 }
347
348 pub fn clear_args(mut self) -> Option<Self> {
363 if self.immutable {
364 return None;
365 }
366 self.args.clear();
367 Some(self)
368 }
369
370 pub fn clear_kwargs(mut self) -> Self {
385 self.kwargs.clear();
386 self
387 }
388
389 pub fn remove_kwarg(mut self, key: &str) -> Self {
404 self.kwargs.remove(key);
405 self
406 }
407
408 pub fn args_count(&self) -> usize {
420 self.args.len()
421 }
422
423 pub fn kwargs_count(&self) -> usize {
436 self.kwargs.len()
437 }
438
439 pub fn kwarg_keys(&self) -> Vec<&str> {
455 self.kwargs.keys().map(|k| k.as_str()).collect()
456 }
457
458 pub fn has_retry_config(&self) -> bool {
471 self.options.max_retries.is_some()
472 || self.options.retry_delay.is_some()
473 || self.options.retry_backoff.is_some()
474 }
475
476 pub fn has_time_limit_config(&self) -> bool {
489 self.options.time_limit.is_some() || self.options.soft_time_limit.is_some()
490 }
491
492 pub fn clone_without_args(&self) -> Self {
508 Self {
509 task: self.task.clone(),
510 args: Vec::new(),
511 kwargs: HashMap::new(),
512 options: self.options.clone(),
513 immutable: self.immutable,
514 }
515 }
516
517 pub fn estimated_size(&self) -> usize {
532 self.to_json().map(|s| s.len()).unwrap_or(0)
533 }
534}
535
536impl std::fmt::Display for Signature {
537 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538 write!(f, "Signature[task={}]", self.task)?;
539 if !self.args.is_empty() {
540 write!(f, " args={}", self.args.len())?;
541 }
542 if !self.kwargs.is_empty() {
543 write!(f, " kwargs={}", self.kwargs.len())?;
544 }
545 if self.immutable {
546 write!(f, " (immutable)")?;
547 }
548 Ok(())
549 }
550}
551
552#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
554pub enum CallbackArgMode {
555 #[default]
557 Prepend,
558
559 Append,
561
562 Kwarg,
564
565 None,
567}
568
569impl CallbackArgMode {
570 pub fn kwarg() -> Self {
572 Self::Kwarg
573 }
574
575 pub fn passes_result(&self) -> bool {
577 !matches!(self, Self::None)
578 }
579}
580
581impl std::fmt::Display for CallbackArgMode {
582 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
583 match self {
584 Self::Prepend => write!(f, "prepend"),
585 Self::Append => write!(f, "append"),
586 Self::Kwarg => write!(f, "kwarg"),
587 Self::None => write!(f, "none"),
588 }
589 }
590}
591
592fn is_default_callback_arg_mode(mode: &CallbackArgMode) -> bool {
594 *mode == CallbackArgMode::Prepend
595}
596
597#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
599pub struct TaskOptions {
600 pub priority: Option<u8>,
602
603 pub queue: Option<String>,
605
606 pub task_id: Option<Uuid>,
608
609 pub link: Option<Box<Signature>>,
611
612 pub link_error: Option<Box<Signature>>,
614
615 #[serde(default, skip_serializing_if = "Vec::is_empty")]
617 pub links: Vec<Signature>,
618
619 #[serde(default, skip_serializing_if = "Vec::is_empty")]
621 pub link_errors: Vec<Signature>,
622
623 #[serde(skip_serializing_if = "Option::is_none")]
625 pub on_retry: Option<Box<Signature>>,
626
627 #[serde(default, skip_serializing_if = "is_default_callback_arg_mode")]
629 pub callback_arg_mode: CallbackArgMode,
630
631 #[serde(skip_serializing_if = "Option::is_none")]
633 pub callback_kwarg_key: Option<String>,
634
635 #[serde(skip_serializing_if = "Option::is_none")]
637 pub expires: Option<u64>,
638
639 #[serde(skip_serializing_if = "Option::is_none")]
641 pub countdown: Option<u64>,
642
643 #[serde(skip_serializing_if = "Option::is_none")]
645 pub max_retries: Option<u32>,
646
647 #[serde(skip_serializing_if = "Option::is_none")]
649 pub routing_key: Option<String>,
650
651 #[serde(skip_serializing_if = "Option::is_none")]
653 pub soft_time_limit: Option<u64>,
654
655 #[serde(skip_serializing_if = "Option::is_none")]
657 pub time_limit: Option<u64>,
658
659 #[serde(skip_serializing_if = "Option::is_none")]
661 pub retry_delay: Option<u64>,
662
663 #[serde(skip_serializing_if = "Option::is_none")]
665 pub retry_backoff: Option<f64>,
666
667 #[serde(skip_serializing_if = "Option::is_none")]
669 pub retry_backoff_max: Option<u64>,
670
671 #[serde(skip_serializing_if = "Option::is_none")]
673 pub retry_jitter: Option<bool>,
674}
675
676impl TaskOptions {
677 pub fn has_priority(&self) -> bool {
679 self.priority.is_some()
680 }
681
682 pub fn has_queue(&self) -> bool {
684 self.queue.is_some()
685 }
686
687 pub fn has_task_id(&self) -> bool {
689 self.task_id.is_some()
690 }
691
692 pub fn has_link(&self) -> bool {
694 self.link.is_some() || !self.links.is_empty()
695 }
696
697 pub fn has_link_error(&self) -> bool {
699 self.link_error.is_some() || !self.link_errors.is_empty()
700 }
701
702 pub fn has_on_retry(&self) -> bool {
704 self.on_retry.is_some()
705 }
706
707 pub fn has_expires(&self) -> bool {
709 self.expires.is_some()
710 }
711
712 pub fn has_countdown(&self) -> bool {
714 self.countdown.is_some()
715 }
716
717 pub fn has_max_retries(&self) -> bool {
719 self.max_retries.is_some()
720 }
721
722 pub fn has_routing_key(&self) -> bool {
724 self.routing_key.is_some()
725 }
726
727 pub fn has_soft_time_limit(&self) -> bool {
729 self.soft_time_limit.is_some()
730 }
731
732 pub fn has_time_limit(&self) -> bool {
734 self.time_limit.is_some()
735 }
736
737 pub fn all_links(&self) -> Vec<&Signature> {
739 let mut result = Vec::new();
740 if let Some(ref link) = self.link {
741 result.push(link.as_ref());
742 }
743 for link in &self.links {
744 result.push(link);
745 }
746 result
747 }
748
749 pub fn all_link_errors(&self) -> Vec<&Signature> {
751 let mut result = Vec::new();
752 if let Some(ref link_error) = self.link_error {
753 result.push(link_error.as_ref());
754 }
755 for link_error in &self.link_errors {
756 result.push(link_error);
757 }
758 result
759 }
760
761 pub fn calculate_retry_delay(&self, retry_count: u32) -> u64 {
763 let base_delay = self.retry_delay.unwrap_or(1);
764 let backoff = self.retry_backoff.unwrap_or(2.0);
765 let max_delay = self.retry_backoff_max.unwrap_or(3600);
766
767 let delay = (base_delay as f64 * backoff.powi(retry_count as i32)) as u64;
768 delay.min(max_delay)
769 }
770
771 pub fn callback_arg_mode(&self) -> CallbackArgMode {
773 self.callback_arg_mode
774 }
775
776 pub fn callback_kwarg_key(&self) -> &str {
778 self.callback_kwarg_key.as_deref().unwrap_or("result")
779 }
780
781 pub fn prepare_callback(
793 &self,
794 mut callback: Signature,
795 result: serde_json::Value,
796 ) -> Signature {
797 if callback.immutable {
799 return callback;
800 }
801
802 match self.callback_arg_mode {
803 CallbackArgMode::Prepend => {
804 callback.args.insert(0, result);
805 }
806 CallbackArgMode::Append => {
807 callback.args.push(result);
808 }
809 CallbackArgMode::Kwarg => {
810 let key = self.callback_kwarg_key().to_string();
811 callback.kwargs.insert(key, result);
812 }
813 CallbackArgMode::None => {
814 }
816 }
817
818 callback
819 }
820}
821
822impl std::fmt::Display for TaskOptions {
823 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
824 let mut parts = Vec::new();
825 if let Some(priority) = self.priority {
826 parts.push(format!("priority={}", priority));
827 }
828 if let Some(ref queue) = self.queue {
829 parts.push(format!("queue={}", queue));
830 }
831 if let Some(task_id) = self.task_id {
832 parts.push(format!("task_id={}", &task_id.to_string()[..8]));
833 }
834 if self.link.is_some() {
835 parts.push("link=yes".to_string());
836 }
837 if self.link_error.is_some() {
838 parts.push("link_error=yes".to_string());
839 }
840 if let Some(expires) = self.expires {
841 parts.push(format!("expires={}s", expires));
842 }
843 if let Some(countdown) = self.countdown {
844 parts.push(format!("countdown={}s", countdown));
845 }
846 if let Some(max_retries) = self.max_retries {
847 parts.push(format!("retries={}", max_retries));
848 }
849 if let Some(ref routing_key) = self.routing_key {
850 parts.push(format!("routing={}", routing_key));
851 }
852 if parts.is_empty() {
853 write!(f, "TaskOptions[default]")
854 } else {
855 write!(f, "TaskOptions[{}]", parts.join(", "))
856 }
857 }
858}