Skip to main content

celers_canvas/
signature.rs

1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use uuid::Uuid;
4
5/// Signature (a task definition with arguments)
6#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
7pub struct Signature {
8    /// Task name
9    pub task: String,
10
11    /// Positional arguments
12    #[serde(default)]
13    pub args: Vec<serde_json::Value>,
14
15    /// Keyword arguments
16    #[serde(default)]
17    pub kwargs: HashMap<String, serde_json::Value>,
18
19    /// Task options
20    #[serde(default)]
21    pub options: TaskOptions,
22
23    /// Immutability flag (whether args can be replaced)
24    #[serde(default)]
25    pub immutable: bool,
26}
27
28impl Signature {
29    pub fn new(task: String) -> Self {
30        Self {
31            task,
32            args: Vec::new(),
33            kwargs: HashMap::new(),
34            options: TaskOptions::default(),
35            immutable: false,
36        }
37    }
38
39    pub fn with_args(mut self, args: Vec<serde_json::Value>) -> Self {
40        self.args = args;
41        self
42    }
43
44    pub fn with_kwargs(mut self, kwargs: HashMap<String, serde_json::Value>) -> Self {
45        self.kwargs = kwargs;
46        self
47    }
48
49    pub fn with_priority(mut self, priority: u8) -> Self {
50        self.options.priority = Some(priority);
51        self
52    }
53
54    pub fn with_queue(mut self, queue: String) -> Self {
55        self.options.queue = Some(queue);
56        self
57    }
58
59    pub fn with_task_id(mut self, task_id: Uuid) -> Self {
60        self.options.task_id = Some(task_id);
61        self
62    }
63
64    pub fn with_link(mut self, link: Signature) -> Self {
65        self.options.link = Some(Box::new(link));
66        self
67    }
68
69    pub fn with_link_error(mut self, link_error: Signature) -> Self {
70        self.options.link_error = Some(Box::new(link_error));
71        self
72    }
73
74    /// Add a callback to the success callback chain
75    pub fn add_link(mut self, link: Signature) -> Self {
76        self.options.links.push(link);
77        self
78    }
79
80    /// Add a callback to the error callback chain
81    pub fn add_link_error(mut self, link_error: Signature) -> Self {
82        self.options.link_errors.push(link_error);
83        self
84    }
85
86    /// Set the on_retry callback
87    pub fn with_on_retry(mut self, callback: Signature) -> Self {
88        self.options.on_retry = Some(Box::new(callback));
89        self
90    }
91
92    /// Set soft time limit (warning before kill)
93    pub fn with_soft_time_limit(mut self, seconds: u64) -> Self {
94        self.options.soft_time_limit = Some(seconds);
95        self
96    }
97
98    /// Set hard time limit (force kill)
99    pub fn with_time_limit(mut self, seconds: u64) -> Self {
100        self.options.time_limit = Some(seconds);
101        self
102    }
103
104    /// Set retry delay in seconds
105    pub fn with_retry_delay(mut self, seconds: u64) -> Self {
106        self.options.retry_delay = Some(seconds);
107        self
108    }
109
110    /// Set retry backoff factor (exponential multiplier)
111    pub fn with_retry_backoff(mut self, factor: f64) -> Self {
112        self.options.retry_backoff = Some(factor);
113        self
114    }
115
116    /// Set maximum retry delay
117    pub fn with_retry_backoff_max(mut self, seconds: u64) -> Self {
118        self.options.retry_backoff_max = Some(seconds);
119        self
120    }
121
122    /// Enable/disable retry jitter
123    pub fn with_retry_jitter(mut self, jitter: bool) -> Self {
124        self.options.retry_jitter = Some(jitter);
125        self
126    }
127
128    pub fn immutable(mut self) -> Self {
129        self.immutable = true;
130        self
131    }
132
133    /// Check if task has arguments
134    pub fn has_args(&self) -> bool {
135        !self.args.is_empty()
136    }
137
138    /// Check if task has keyword arguments
139    pub fn has_kwargs(&self) -> bool {
140        !self.kwargs.is_empty()
141    }
142
143    /// Check if task is immutable (args cannot be replaced)
144    pub fn is_immutable(&self) -> bool {
145        self.immutable
146    }
147
148    /// Check if task has a specific kwarg
149    pub fn has_kwarg(&self, key: &str) -> bool {
150        self.kwargs.contains_key(key)
151    }
152
153    /// Get a kwarg value
154    pub fn get_kwarg(&self, key: &str) -> Option<&serde_json::Value> {
155        self.kwargs.get(key)
156    }
157
158    /// Add a single kwarg
159    pub fn add_kwarg(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
160        self.kwargs.insert(key.into(), value);
161        self
162    }
163
164    /// Add a single argument
165    pub fn add_arg(mut self, arg: serde_json::Value) -> Self {
166        self.args.push(arg);
167        self
168    }
169
170    /// Clone the signature
171    pub fn clone_signature(&self) -> Self {
172        self.clone()
173    }
174
175    /// Create an immutable signature (shorthand for `.immutable()`)
176    ///
177    /// This is equivalent to Python Celery's `.si()` method.
178    /// Immutable signatures cannot have their arguments replaced when used in workflows.
179    ///
180    /// # Example
181    /// ```
182    /// use celers_canvas::Signature;
183    ///
184    /// let sig = Signature::new("process".to_string())
185    ///     .with_args(vec![serde_json::json!(1)])
186    ///     .si();
187    ///
188    /// assert!(sig.is_immutable());
189    /// ```
190    pub fn si(self) -> Self {
191        self.immutable()
192    }
193
194    /// Create a partial signature with some arguments pre-filled
195    ///
196    /// The partial signature can have additional arguments added later.
197    /// This is useful for creating task templates with some fixed arguments.
198    ///
199    /// # Example
200    /// ```
201    /// use celers_canvas::Signature;
202    ///
203    /// // Create a partial with first argument fixed
204    /// let partial = Signature::new("add".to_string())
205    ///     .partial(vec![serde_json::json!(10)]);
206    ///
207    /// // Complete with remaining arguments
208    /// let complete = partial.complete(vec![serde_json::json!(5)]);
209    /// assert_eq!(complete.args.len(), 2);
210    /// ```
211    pub fn partial(mut self, args: Vec<serde_json::Value>) -> Self {
212        self.args = args;
213        self
214    }
215
216    /// Complete a partial signature with additional arguments
217    ///
218    /// Appends the provided arguments to the existing arguments.
219    /// If the signature is immutable, returns the signature unchanged.
220    pub fn complete(mut self, additional_args: Vec<serde_json::Value>) -> Self {
221        if self.immutable {
222            return self;
223        }
224        self.args.extend(additional_args);
225        self
226    }
227
228    /// Merge another signature into this one
229    ///
230    /// This combines kwargs from both signatures (the other's kwargs take precedence)
231    /// and inherits options from the other signature if not already set.
232    pub fn merge(mut self, other: Signature) -> Self {
233        // Merge kwargs (other takes precedence)
234        for (key, value) in other.kwargs {
235            self.kwargs.insert(key, value);
236        }
237
238        // Inherit options if not already set
239        if self.options.priority.is_none() {
240            self.options.priority = other.options.priority;
241        }
242        if self.options.queue.is_none() {
243            self.options.queue = other.options.queue;
244        }
245        if self.options.task_id.is_none() {
246            self.options.task_id = other.options.task_id;
247        }
248        if self.options.link.is_none() {
249            self.options.link = other.options.link;
250        }
251        if self.options.link_error.is_none() {
252            self.options.link_error = other.options.link_error;
253        }
254
255        self
256    }
257
258    /// Replace arguments in signature (respects immutability)
259    ///
260    /// If the signature is immutable, returns None.
261    /// Otherwise, returns a new signature with replaced arguments.
262    pub fn replace_args(mut self, args: Vec<serde_json::Value>) -> Option<Self> {
263        if self.immutable {
264            return None;
265        }
266        self.args = args;
267        Some(self)
268    }
269
270    /// Set expiration time in seconds
271    pub fn with_expires(mut self, expires: u64) -> Self {
272        self.options.expires = Some(expires);
273        self
274    }
275
276    /// Set countdown (delay before execution) in seconds
277    pub fn with_countdown(mut self, countdown: u64) -> Self {
278        self.options.countdown = Some(countdown);
279        self
280    }
281
282    /// Set retry policy
283    pub fn with_retries(mut self, max_retries: u32) -> Self {
284        self.options.max_retries = Some(max_retries);
285        self
286    }
287
288    /// Set task routing key
289    pub fn with_routing_key(mut self, routing_key: String) -> Self {
290        self.options.routing_key = Some(routing_key);
291        self
292    }
293
294    /// Set callback argument passing mode
295    ///
296    /// Controls how task result is passed to linked callbacks.
297    ///
298    /// # Example
299    /// ```
300    /// use celers_canvas::{Signature, CallbackArgMode};
301    ///
302    /// let sig = Signature::new("task".to_string())
303    ///     .with_callback_arg_mode(CallbackArgMode::Append);
304    /// ```
305    pub fn with_callback_arg_mode(mut self, mode: CallbackArgMode) -> Self {
306        self.options.callback_arg_mode = mode;
307        self
308    }
309
310    /// Set callback kwarg key (used when CallbackArgMode::Kwarg)
311    ///
312    /// Specifies the keyword argument name for passing the result.
313    /// Defaults to "result" if not set.
314    pub fn with_callback_kwarg_key(mut self, key: impl Into<String>) -> Self {
315        self.options.callback_kwarg_key = Some(key.into());
316        self
317    }
318
319    /// Configure callback to receive result as keyword argument
320    ///
321    /// Shorthand for setting CallbackArgMode::Kwarg with a key.
322    pub fn with_result_as_kwarg(mut self, key: impl Into<String>) -> Self {
323        self.options.callback_arg_mode = CallbackArgMode::Kwarg;
324        self.options.callback_kwarg_key = Some(key.into());
325        self
326    }
327
328    /// Serialize signature to JSON string
329    pub fn to_json(&self) -> Result<String, serde_json::Error> {
330        serde_json::to_string(self)
331    }
332
333    /// Deserialize signature from JSON string
334    pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
335        serde_json::from_str(json)
336    }
337
338    /// Serialize signature to JSON bytes
339    pub fn to_json_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
340        serde_json::to_vec(self)
341    }
342
343    /// Deserialize signature from JSON bytes
344    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
345        serde_json::from_slice(bytes)
346    }
347
348    /// Clear all arguments from the signature
349    ///
350    /// Returns None if the signature is immutable.
351    ///
352    /// # Example
353    /// ```
354    /// use celers_canvas::Signature;
355    ///
356    /// let sig = Signature::new("task".to_string())
357    ///     .with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
358    ///
359    /// let cleared = sig.clear_args().unwrap();
360    /// assert!(cleared.args.is_empty());
361    /// ```
362    pub fn clear_args(mut self) -> Option<Self> {
363        if self.immutable {
364            return None;
365        }
366        self.args.clear();
367        Some(self)
368    }
369
370    /// Clear all keyword arguments from the signature
371    ///
372    /// # Example
373    /// ```
374    /// use celers_canvas::Signature;
375    /// use std::collections::HashMap;
376    ///
377    /// let mut kwargs = HashMap::new();
378    /// kwargs.insert("key".to_string(), serde_json::json!("value"));
379    ///
380    /// let sig = Signature::new("task".to_string()).with_kwargs(kwargs);
381    /// let cleared = sig.clear_kwargs();
382    /// assert!(cleared.kwargs.is_empty());
383    /// ```
384    pub fn clear_kwargs(mut self) -> Self {
385        self.kwargs.clear();
386        self
387    }
388
389    /// Remove a specific keyword argument
390    ///
391    /// # Example
392    /// ```
393    /// use celers_canvas::Signature;
394    ///
395    /// let sig = Signature::new("task".to_string())
396    ///     .add_kwarg("key1", serde_json::json!("value1"))
397    ///     .add_kwarg("key2", serde_json::json!("value2"));
398    ///
399    /// let modified = sig.remove_kwarg("key1");
400    /// assert!(!modified.has_kwarg("key1"));
401    /// assert!(modified.has_kwarg("key2"));
402    /// ```
403    pub fn remove_kwarg(mut self, key: &str) -> Self {
404        self.kwargs.remove(key);
405        self
406    }
407
408    /// Get the number of positional arguments
409    ///
410    /// # Example
411    /// ```
412    /// use celers_canvas::Signature;
413    ///
414    /// let sig = Signature::new("task".to_string())
415    ///     .with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
416    ///
417    /// assert_eq!(sig.args_count(), 2);
418    /// ```
419    pub fn args_count(&self) -> usize {
420        self.args.len()
421    }
422
423    /// Get the number of keyword arguments
424    ///
425    /// # Example
426    /// ```
427    /// use celers_canvas::Signature;
428    ///
429    /// let sig = Signature::new("task".to_string())
430    ///     .add_kwarg("key1", serde_json::json!("value1"))
431    ///     .add_kwarg("key2", serde_json::json!("value2"));
432    ///
433    /// assert_eq!(sig.kwargs_count(), 2);
434    /// ```
435    pub fn kwargs_count(&self) -> usize {
436        self.kwargs.len()
437    }
438
439    /// Get all keyword argument keys
440    ///
441    /// # Example
442    /// ```
443    /// use celers_canvas::Signature;
444    ///
445    /// let sig = Signature::new("task".to_string())
446    ///     .add_kwarg("key1", serde_json::json!("value1"))
447    ///     .add_kwarg("key2", serde_json::json!("value2"));
448    ///
449    /// let keys = sig.kwarg_keys();
450    /// assert_eq!(keys.len(), 2);
451    /// assert!(keys.contains(&"key1"));
452    /// assert!(keys.contains(&"key2"));
453    /// ```
454    pub fn kwarg_keys(&self) -> Vec<&str> {
455        self.kwargs.keys().map(|k| k.as_str()).collect()
456    }
457
458    /// Check if signature has any retry configuration
459    ///
460    /// # Example
461    /// ```
462    /// use celers_canvas::Signature;
463    ///
464    /// let sig1 = Signature::new("task".to_string()).with_retries(3);
465    /// let sig2 = Signature::new("task".to_string());
466    ///
467    /// assert!(sig1.has_retry_config());
468    /// assert!(!sig2.has_retry_config());
469    /// ```
470    pub fn has_retry_config(&self) -> bool {
471        self.options.max_retries.is_some()
472            || self.options.retry_delay.is_some()
473            || self.options.retry_backoff.is_some()
474    }
475
476    /// Check if signature has any time limit configuration
477    ///
478    /// # Example
479    /// ```
480    /// use celers_canvas::Signature;
481    ///
482    /// let sig1 = Signature::new("task".to_string()).with_time_limit(60);
483    /// let sig2 = Signature::new("task".to_string());
484    ///
485    /// assert!(sig1.has_time_limit_config());
486    /// assert!(!sig2.has_time_limit_config());
487    /// ```
488    pub fn has_time_limit_config(&self) -> bool {
489        self.options.time_limit.is_some() || self.options.soft_time_limit.is_some()
490    }
491
492    /// Create a new signature with the same task name but no arguments
493    ///
494    /// # Example
495    /// ```
496    /// use celers_canvas::Signature;
497    ///
498    /// let sig = Signature::new("task".to_string())
499    ///     .with_args(vec![serde_json::json!(1)])
500    ///     .with_priority(5);
501    ///
502    /// let clean = sig.clone_without_args();
503    /// assert_eq!(clean.task, "task");
504    /// assert!(clean.args.is_empty());
505    /// assert_eq!(clean.options.priority, Some(5)); // Options preserved
506    /// ```
507    pub fn clone_without_args(&self) -> Self {
508        Self {
509            task: self.task.clone(),
510            args: Vec::new(),
511            kwargs: HashMap::new(),
512            options: self.options.clone(),
513            immutable: self.immutable,
514        }
515    }
516
517    /// Calculate the estimated serialized size in bytes
518    ///
519    /// This gives a rough estimate of how much space the signature will take when serialized.
520    ///
521    /// # Example
522    /// ```
523    /// use celers_canvas::Signature;
524    ///
525    /// let sig = Signature::new("task".to_string())
526    ///     .with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
527    ///
528    /// let size = sig.estimated_size();
529    /// assert!(size > 0);
530    /// ```
531    pub fn estimated_size(&self) -> usize {
532        self.to_json().map(|s| s.len()).unwrap_or(0)
533    }
534}
535
536impl std::fmt::Display for Signature {
537    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538        write!(f, "Signature[task={}]", self.task)?;
539        if !self.args.is_empty() {
540            write!(f, " args={}", self.args.len())?;
541        }
542        if !self.kwargs.is_empty() {
543            write!(f, " kwargs={}", self.kwargs.len())?;
544        }
545        if self.immutable {
546            write!(f, " (immutable)")?;
547        }
548        Ok(())
549    }
550}
551
552/// Callback argument passing mode
553#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
554pub enum CallbackArgMode {
555    /// Pass result as first positional argument (default)
556    #[default]
557    Prepend,
558
559    /// Pass result as last positional argument
560    Append,
561
562    /// Pass result as a keyword argument with specified key
563    Kwarg,
564
565    /// Don't pass result to callback (callback uses its own args)
566    None,
567}
568
569impl CallbackArgMode {
570    /// Create a kwarg mode (result passed as "result" kwarg)
571    pub fn kwarg() -> Self {
572        Self::Kwarg
573    }
574
575    /// Check if this mode passes result to callback
576    pub fn passes_result(&self) -> bool {
577        !matches!(self, Self::None)
578    }
579}
580
581impl std::fmt::Display for CallbackArgMode {
582    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
583        match self {
584            Self::Prepend => write!(f, "prepend"),
585            Self::Append => write!(f, "append"),
586            Self::Kwarg => write!(f, "kwarg"),
587            Self::None => write!(f, "none"),
588        }
589    }
590}
591
592/// Helper for serde skip_serializing_if
593fn is_default_callback_arg_mode(mode: &CallbackArgMode) -> bool {
594    *mode == CallbackArgMode::Prepend
595}
596
597/// Task options
598#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
599pub struct TaskOptions {
600    /// Task priority (0-9)
601    pub priority: Option<u8>,
602
603    /// Queue name
604    pub queue: Option<String>,
605
606    /// Task ID (for tracking)
607    pub task_id: Option<Uuid>,
608
609    /// Link (callback on success) - single callback for backwards compat
610    pub link: Option<Box<Signature>>,
611
612    /// Link error (callback on failure) - single callback for backwards compat
613    pub link_error: Option<Box<Signature>>,
614
615    /// Multiple success callbacks (executed in order)
616    #[serde(default, skip_serializing_if = "Vec::is_empty")]
617    pub links: Vec<Signature>,
618
619    /// Multiple failure callbacks (executed in order)
620    #[serde(default, skip_serializing_if = "Vec::is_empty")]
621    pub link_errors: Vec<Signature>,
622
623    /// Callback on retry
624    #[serde(skip_serializing_if = "Option::is_none")]
625    pub on_retry: Option<Box<Signature>>,
626
627    /// How to pass result to success callbacks
628    #[serde(default, skip_serializing_if = "is_default_callback_arg_mode")]
629    pub callback_arg_mode: CallbackArgMode,
630
631    /// Key name when using CallbackArgMode::Kwarg
632    #[serde(skip_serializing_if = "Option::is_none")]
633    pub callback_kwarg_key: Option<String>,
634
635    /// Task expiration time in seconds
636    #[serde(skip_serializing_if = "Option::is_none")]
637    pub expires: Option<u64>,
638
639    /// Countdown (delay before execution) in seconds
640    #[serde(skip_serializing_if = "Option::is_none")]
641    pub countdown: Option<u64>,
642
643    /// Maximum number of retries
644    #[serde(skip_serializing_if = "Option::is_none")]
645    pub max_retries: Option<u32>,
646
647    /// Routing key for task distribution
648    #[serde(skip_serializing_if = "Option::is_none")]
649    pub routing_key: Option<String>,
650
651    /// Soft time limit in seconds (warning before kill)
652    #[serde(skip_serializing_if = "Option::is_none")]
653    pub soft_time_limit: Option<u64>,
654
655    /// Hard time limit in seconds (force kill)
656    #[serde(skip_serializing_if = "Option::is_none")]
657    pub time_limit: Option<u64>,
658
659    /// Retry delay in seconds
660    #[serde(skip_serializing_if = "Option::is_none")]
661    pub retry_delay: Option<u64>,
662
663    /// Retry backoff factor (exponential backoff multiplier)
664    #[serde(skip_serializing_if = "Option::is_none")]
665    pub retry_backoff: Option<f64>,
666
667    /// Maximum retry delay in seconds
668    #[serde(skip_serializing_if = "Option::is_none")]
669    pub retry_backoff_max: Option<u64>,
670
671    /// Whether to add jitter to retry delays
672    #[serde(skip_serializing_if = "Option::is_none")]
673    pub retry_jitter: Option<bool>,
674}
675
676impl TaskOptions {
677    /// Check if priority is set
678    pub fn has_priority(&self) -> bool {
679        self.priority.is_some()
680    }
681
682    /// Check if queue is set
683    pub fn has_queue(&self) -> bool {
684        self.queue.is_some()
685    }
686
687    /// Check if task ID is set
688    pub fn has_task_id(&self) -> bool {
689        self.task_id.is_some()
690    }
691
692    /// Check if any link (success callback) is set
693    pub fn has_link(&self) -> bool {
694        self.link.is_some() || !self.links.is_empty()
695    }
696
697    /// Check if any link_error (failure callback) is set
698    pub fn has_link_error(&self) -> bool {
699        self.link_error.is_some() || !self.link_errors.is_empty()
700    }
701
702    /// Check if on_retry callback is set
703    pub fn has_on_retry(&self) -> bool {
704        self.on_retry.is_some()
705    }
706
707    /// Check if expires is set
708    pub fn has_expires(&self) -> bool {
709        self.expires.is_some()
710    }
711
712    /// Check if countdown is set
713    pub fn has_countdown(&self) -> bool {
714        self.countdown.is_some()
715    }
716
717    /// Check if max_retries is set
718    pub fn has_max_retries(&self) -> bool {
719        self.max_retries.is_some()
720    }
721
722    /// Check if routing_key is set
723    pub fn has_routing_key(&self) -> bool {
724        self.routing_key.is_some()
725    }
726
727    /// Check if soft time limit is set
728    pub fn has_soft_time_limit(&self) -> bool {
729        self.soft_time_limit.is_some()
730    }
731
732    /// Check if hard time limit is set
733    pub fn has_time_limit(&self) -> bool {
734        self.time_limit.is_some()
735    }
736
737    /// Get all success callbacks (both single link and multiple links)
738    pub fn all_links(&self) -> Vec<&Signature> {
739        let mut result = Vec::new();
740        if let Some(ref link) = self.link {
741            result.push(link.as_ref());
742        }
743        for link in &self.links {
744            result.push(link);
745        }
746        result
747    }
748
749    /// Get all error callbacks (both single link_error and multiple link_errors)
750    pub fn all_link_errors(&self) -> Vec<&Signature> {
751        let mut result = Vec::new();
752        if let Some(ref link_error) = self.link_error {
753            result.push(link_error.as_ref());
754        }
755        for link_error in &self.link_errors {
756            result.push(link_error);
757        }
758        result
759    }
760
761    /// Calculate retry delay with backoff
762    pub fn calculate_retry_delay(&self, retry_count: u32) -> u64 {
763        let base_delay = self.retry_delay.unwrap_or(1);
764        let backoff = self.retry_backoff.unwrap_or(2.0);
765        let max_delay = self.retry_backoff_max.unwrap_or(3600);
766
767        let delay = (base_delay as f64 * backoff.powi(retry_count as i32)) as u64;
768        delay.min(max_delay)
769    }
770
771    /// Get the callback argument mode
772    pub fn callback_arg_mode(&self) -> CallbackArgMode {
773        self.callback_arg_mode
774    }
775
776    /// Get the callback kwarg key (defaults to "result")
777    pub fn callback_kwarg_key(&self) -> &str {
778        self.callback_kwarg_key.as_deref().unwrap_or("result")
779    }
780
781    /// Prepare a callback signature with result passed according to callback_arg_mode
782    ///
783    /// This modifies the callback signature to include the result value
784    /// according to the configured callback argument passing mode.
785    ///
786    /// # Arguments
787    /// * `callback` - The callback signature to prepare
788    /// * `result` - The result value to pass to the callback
789    ///
790    /// # Returns
791    /// A new signature with the result incorporated
792    pub fn prepare_callback(
793        &self,
794        mut callback: Signature,
795        result: serde_json::Value,
796    ) -> Signature {
797        // Don't modify immutable signatures
798        if callback.immutable {
799            return callback;
800        }
801
802        match self.callback_arg_mode {
803            CallbackArgMode::Prepend => {
804                callback.args.insert(0, result);
805            }
806            CallbackArgMode::Append => {
807                callback.args.push(result);
808            }
809            CallbackArgMode::Kwarg => {
810                let key = self.callback_kwarg_key().to_string();
811                callback.kwargs.insert(key, result);
812            }
813            CallbackArgMode::None => {
814                // Don't modify the callback
815            }
816        }
817
818        callback
819    }
820}
821
822impl std::fmt::Display for TaskOptions {
823    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
824        let mut parts = Vec::new();
825        if let Some(priority) = self.priority {
826            parts.push(format!("priority={}", priority));
827        }
828        if let Some(ref queue) = self.queue {
829            parts.push(format!("queue={}", queue));
830        }
831        if let Some(task_id) = self.task_id {
832            parts.push(format!("task_id={}", &task_id.to_string()[..8]));
833        }
834        if self.link.is_some() {
835            parts.push("link=yes".to_string());
836        }
837        if self.link_error.is_some() {
838            parts.push("link_error=yes".to_string());
839        }
840        if let Some(expires) = self.expires {
841            parts.push(format!("expires={}s", expires));
842        }
843        if let Some(countdown) = self.countdown {
844            parts.push(format!("countdown={}s", countdown));
845        }
846        if let Some(max_retries) = self.max_retries {
847            parts.push(format!("retries={}", max_retries));
848        }
849        if let Some(ref routing_key) = self.routing_key {
850            parts.push(format!("routing={}", routing_key));
851        }
852        if parts.is_empty() {
853            write!(f, "TaskOptions[default]")
854        } else {
855            write!(f, "TaskOptions[{}]", parts.join(", "))
856        }
857    }
858}