Skip to main content

kojin_core/
signature.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::message::TaskMessage;
6
7/// A type-erased task invocation descriptor.
8///
9/// Signatures describe *what* to run without running it immediately.
10/// They can be composed into workflows via [`Canvas`](crate::canvas::Canvas).
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Signature {
13    /// Registered task name.
14    pub task_name: String,
15    /// Target queue.
16    pub queue: String,
17    /// Serialized task payload.
18    pub payload: serde_json::Value,
19    /// Maximum retries.
20    pub max_retries: u32,
21    /// Optional ETA.
22    pub eta: Option<DateTime<Utc>>,
23    /// Arbitrary headers.
24    pub headers: HashMap<String, String>,
25}
26
27impl Signature {
28    /// Create a new signature.
29    pub fn new(
30        task_name: impl Into<String>,
31        queue: impl Into<String>,
32        payload: serde_json::Value,
33    ) -> Self {
34        Self {
35            task_name: task_name.into(),
36            queue: queue.into(),
37            payload,
38            max_retries: 3,
39            eta: None,
40            headers: HashMap::new(),
41        }
42    }
43
44    /// Set max retries.
45    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
46        self.max_retries = max_retries;
47        self
48    }
49
50    /// Set ETA.
51    pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
52        self.eta = Some(eta);
53        self
54    }
55
56    /// Add a header.
57    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
58        self.headers.insert(key.into(), value.into());
59        self
60    }
61
62    /// Convert this signature into a [`TaskMessage`] ready for enqueuing.
63    pub fn into_message(self) -> TaskMessage {
64        let mut msg = TaskMessage::new(self.task_name, self.queue, self.payload)
65            .with_max_retries(self.max_retries);
66        msg.eta = self.eta;
67        msg.headers = self.headers;
68        msg
69    }
70}
71
72impl std::ops::BitOr for Signature {
73    type Output = crate::canvas::Canvas;
74
75    fn bitor(self, rhs: Self) -> Self::Output {
76        crate::canvas::Canvas::Chain(vec![
77            crate::canvas::Canvas::Single(self),
78            crate::canvas::Canvas::Single(rhs),
79        ])
80    }
81}
82
83impl From<Signature> for crate::canvas::Canvas {
84    fn from(sig: Signature) -> Self {
85        crate::canvas::Canvas::Single(sig)
86    }
87}
88
89#[cfg(test)]
90mod tests {
91    use super::*;
92
93    #[test]
94    fn signature_to_message() {
95        let sig = Signature::new("add", "math", serde_json::json!({"a": 1, "b": 2}))
96            .with_max_retries(5)
97            .with_header("trace", "t1");
98
99        let msg = sig.into_message();
100        assert_eq!(msg.task_name, "add");
101        assert_eq!(msg.queue, "math");
102        assert_eq!(msg.max_retries, 5);
103        assert_eq!(msg.headers.get("trace"), Some(&"t1".to_string()));
104    }
105
106    #[test]
107    fn pipe_operator_creates_chain() {
108        let s1 = Signature::new("a", "q", serde_json::json!({}));
109        let s2 = Signature::new("b", "q", serde_json::json!({}));
110        let canvas = s1 | s2;
111        assert!(matches!(canvas, crate::canvas::Canvas::Chain(_)));
112    }
113}