Skip to main content

celers_canvas/
canvas_element.rs

1use crate::{Branch, CanvasError, Chain, Group, Map, Signature, Switch};
2use celers_core::Broker;
3use serde::{Deserialize, Serialize};
4use uuid::Uuid;
5
6/// A canvas element that can be either a simple signature or a nested workflow
7#[derive(Debug, Clone, Serialize, Deserialize)]
8#[serde(tag = "element_type")]
9pub enum CanvasElement {
10    /// A simple task signature
11    Signature(Signature),
12
13    /// A chain of tasks
14    Chain(Chain),
15
16    /// A group of parallel tasks
17    Group(Group),
18
19    /// A chord (group + callback)
20    Chord {
21        /// Header group
22        header: Group,
23        /// Callback signature
24        body: Signature,
25    },
26
27    /// A map operation
28    Map {
29        /// Task to apply
30        task: Signature,
31        /// Argument sets
32        argsets: Vec<Vec<serde_json::Value>>,
33    },
34
35    /// A conditional branch
36    Branch(Branch),
37
38    /// A switch statement
39    Switch(Switch),
40}
41
42impl CanvasElement {
43    /// Create a signature element
44    pub fn signature(sig: Signature) -> Self {
45        Self::Signature(sig)
46    }
47
48    /// Create a task element (shorthand for signature)
49    pub fn task(name: impl Into<String>, args: Vec<serde_json::Value>) -> Self {
50        Self::Signature(Signature::new(name.into()).with_args(args))
51    }
52
53    /// Create a chain element
54    pub fn chain(chain: Chain) -> Self {
55        Self::Chain(chain)
56    }
57
58    /// Create a group element
59    pub fn group(group: Group) -> Self {
60        Self::Group(group)
61    }
62
63    /// Create a chord element
64    pub fn chord(header: Group, body: Signature) -> Self {
65        Self::Chord { header, body }
66    }
67
68    /// Create a map element
69    pub fn map(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
70        Self::Map { task, argsets }
71    }
72
73    /// Create a branch element
74    pub fn branch(branch: Branch) -> Self {
75        Self::Branch(branch)
76    }
77
78    /// Create a switch element
79    pub fn switch(switch: Switch) -> Self {
80        Self::Switch(switch)
81    }
82
83    /// Check if this is a simple signature
84    pub fn is_signature(&self) -> bool {
85        matches!(self, Self::Signature(_))
86    }
87
88    /// Check if this is a chain
89    pub fn is_chain(&self) -> bool {
90        matches!(self, Self::Chain(_))
91    }
92
93    /// Check if this is a group
94    pub fn is_group(&self) -> bool {
95        matches!(self, Self::Group(_))
96    }
97
98    /// Check if this is a chord
99    pub fn is_chord(&self) -> bool {
100        matches!(self, Self::Chord { .. })
101    }
102
103    /// Get the element type as a string
104    pub fn element_type(&self) -> &'static str {
105        match self {
106            Self::Signature(_) => "signature",
107            Self::Chain(_) => "chain",
108            Self::Group(_) => "group",
109            Self::Chord { .. } => "chord",
110            Self::Map { .. } => "map",
111            Self::Branch(_) => "branch",
112            Self::Switch(_) => "switch",
113        }
114    }
115}
116
117impl std::fmt::Display for CanvasElement {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        match self {
120            Self::Signature(sig) => write!(f, "Signature[{}]", sig.task),
121            Self::Chain(chain) => write!(f, "{}", chain),
122            Self::Group(group) => write!(f, "{}", group),
123            Self::Chord { header, body } => {
124                write!(f, "Chord[header={}, body={}]", header, body.task)
125            }
126            Self::Map { task, argsets } => {
127                write!(f, "Map[task={}, {} argsets]", task.task, argsets.len())
128            }
129            Self::Branch(branch) => write!(f, "{}", branch),
130            Self::Switch(switch) => write!(f, "{}", switch),
131        }
132    }
133}
134
135impl From<Signature> for CanvasElement {
136    fn from(sig: Signature) -> Self {
137        Self::Signature(sig)
138    }
139}
140
141impl From<Chain> for CanvasElement {
142    fn from(chain: Chain) -> Self {
143        Self::Chain(chain)
144    }
145}
146
147impl From<Group> for CanvasElement {
148    fn from(group: Group) -> Self {
149        Self::Group(group)
150    }
151}
152
153impl From<Branch> for CanvasElement {
154    fn from(branch: Branch) -> Self {
155        Self::Branch(branch)
156    }
157}
158
159impl From<Switch> for CanvasElement {
160    fn from(switch: Switch) -> Self {
161        Self::Switch(switch)
162    }
163}
164
165/// A nested chain that can contain any canvas element
166///
167/// Unlike the basic Chain that only contains Signatures, NestedChain
168/// can contain Groups, Chords, or other Chains as steps.
169///
170/// # Example
171/// ```
172/// use celers_canvas::{NestedChain, CanvasElement, Group, Signature};
173///
174/// let workflow = NestedChain::new()
175///     .then_element(CanvasElement::task("step1".to_string(), vec![]))
176///     .then_element(CanvasElement::group(
177///         Group::new()
178///             .add("parallel_a", vec![])
179///             .add("parallel_b", vec![])
180///     ))
181///     .then_element(CanvasElement::task("step2".to_string(), vec![]));
182/// ```
183#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct NestedChain {
185    /// Elements in the chain
186    pub elements: Vec<CanvasElement>,
187}
188
189impl NestedChain {
190    /// Create a new empty nested chain
191    pub fn new() -> Self {
192        Self {
193            elements: Vec::new(),
194        }
195    }
196
197    /// Add an element to the chain
198    pub fn then_element(mut self, element: CanvasElement) -> Self {
199        self.elements.push(element);
200        self
201    }
202
203    /// Add a signature to the chain
204    pub fn then_signature(mut self, sig: Signature) -> Self {
205        self.elements.push(CanvasElement::Signature(sig));
206        self
207    }
208
209    /// Add a simple task to the chain
210    pub fn then(mut self, task: &str, args: Vec<serde_json::Value>) -> Self {
211        self.elements.push(CanvasElement::task(task, args));
212        self
213    }
214
215    /// Add a group to the chain (parallel execution point)
216    pub fn then_group(mut self, group: Group) -> Self {
217        self.elements.push(CanvasElement::Group(group));
218        self
219    }
220
221    /// Add a chord to the chain
222    pub fn then_chord(mut self, header: Group, body: Signature) -> Self {
223        self.elements.push(CanvasElement::Chord { header, body });
224        self
225    }
226
227    /// Add a branch to the chain
228    pub fn then_branch(mut self, branch: Branch) -> Self {
229        self.elements.push(CanvasElement::Branch(branch));
230        self
231    }
232
233    /// Add another chain as a nested element
234    pub fn then_chain(mut self, chain: Chain) -> Self {
235        self.elements.push(CanvasElement::Chain(chain));
236        self
237    }
238
239    /// Check if the chain is empty
240    pub fn is_empty(&self) -> bool {
241        self.elements.is_empty()
242    }
243
244    /// Get the number of elements
245    pub fn len(&self) -> usize {
246        self.elements.len()
247    }
248
249    /// Flatten the nested chain into a sequence of signatures where possible
250    ///
251    /// This is useful for simpler execution when nested workflows aren't needed.
252    /// Note: This will return None if the chain contains elements that can't be
253    /// flattened to signatures (groups, chords, etc.)
254    pub fn flatten_signatures(&self) -> Option<Vec<Signature>> {
255        let mut result = Vec::new();
256
257        for element in &self.elements {
258            match element {
259                CanvasElement::Signature(sig) => result.push(sig.clone()),
260                CanvasElement::Chain(chain) => {
261                    result.extend(chain.tasks.clone());
262                }
263                _ => return None, // Can't flatten non-signature elements
264            }
265        }
266
267        Some(result)
268    }
269
270    /// Execute the nested chain sequentially
271    ///
272    /// Each element is executed in order. For complex elements (Groups, Chords),
273    /// they are executed and we wait for them to start before continuing.
274    /// Note: This executes elements sequentially but doesn't wait for completion,
275    /// following Celery's async execution model.
276    pub async fn apply<B: Broker>(&self, broker: &B) -> Result<Uuid, CanvasError> {
277        if self.elements.is_empty() {
278            return Err(CanvasError::Invalid(
279                "NestedChain cannot be empty".to_string(),
280            ));
281        }
282
283        // Execute each element in sequence
284        let mut last_id = None;
285        for element in &self.elements {
286            match element {
287                CanvasElement::Signature(sig) => {
288                    // Convert to Chain for sequential execution
289                    let chain = Chain {
290                        tasks: vec![sig.clone()],
291                    };
292                    last_id = Some(chain.apply(broker).await?);
293                }
294                CanvasElement::Chain(chain) => {
295                    last_id = Some(chain.clone().apply(broker).await?);
296                }
297                CanvasElement::Group(group) => {
298                    last_id = Some(group.clone().apply(broker).await?);
299                }
300                CanvasElement::Chord { header, body } => {
301                    #[cfg(feature = "backend-redis")]
302                    {
303                        // Note: Chord requires backend, but we can't pass it here
304                        // For now, fall back to just executing the group
305                        last_id = Some(header.clone().apply(broker).await?);
306                        // Callback would need to be manually triggered
307                        let _ = body; // Silence unused warning
308                    }
309                    #[cfg(not(feature = "backend-redis"))]
310                    {
311                        last_id = Some(header.clone().apply(broker).await?);
312                        let _ = body; // Silence unused warning
313                    }
314                }
315                CanvasElement::Map { task, argsets } => {
316                    let map = Map::new(task.clone(), argsets.clone());
317                    last_id = Some(map.apply(broker).await?);
318                }
319                CanvasElement::Branch(_branch) => {
320                    // Branches require runtime evaluation, skip for now
321                    return Err(CanvasError::Invalid(
322                        "Branch elements not supported in NestedChain.apply()".to_string(),
323                    ));
324                }
325                CanvasElement::Switch(_switch) => {
326                    // Switch requires runtime evaluation, skip for now
327                    return Err(CanvasError::Invalid(
328                        "Switch elements not supported in NestedChain.apply()".to_string(),
329                    ));
330                }
331            }
332        }
333
334        last_id.ok_or_else(|| CanvasError::Invalid("No elements executed".to_string()))
335    }
336}
337
338impl Default for NestedChain {
339    fn default() -> Self {
340        Self::new()
341    }
342}
343
344impl std::fmt::Display for NestedChain {
345    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346        let element_strs: Vec<String> = self.elements.iter().map(|e| format!("{}", e)).collect();
347        write!(f, "NestedChain[{}]", element_strs.join(" -> "))
348    }
349}
350
351/// A nested group that can contain any canvas element
352///
353/// Unlike the basic Group that only contains Signatures, NestedGroup
354/// can contain Chains, other Groups, or Chords as parallel tasks.
355///
356/// # Example
357/// ```
358/// use celers_canvas::{NestedGroup, CanvasElement, Chain, Signature};
359///
360/// let workflow = NestedGroup::new()
361///     .add_element(CanvasElement::chain(
362///         Chain::new().then("step1", vec![]).then("step2", vec![])
363///     ))
364///     .add_element(CanvasElement::chain(
365///         Chain::new().then("step3", vec![]).then("step4", vec![])
366///     ));
367/// ```
368#[derive(Debug, Clone, Serialize, Deserialize)]
369pub struct NestedGroup {
370    /// Elements in the group (executed in parallel)
371    pub elements: Vec<CanvasElement>,
372}
373
374impl NestedGroup {
375    /// Create a new empty nested group
376    pub fn new() -> Self {
377        Self {
378            elements: Vec::new(),
379        }
380    }
381
382    /// Add an element to the group
383    pub fn add_element(mut self, element: CanvasElement) -> Self {
384        self.elements.push(element);
385        self
386    }
387
388    /// Add a signature to the group
389    pub fn add_signature(mut self, sig: Signature) -> Self {
390        self.elements.push(CanvasElement::Signature(sig));
391        self
392    }
393
394    /// Add a simple task to the group
395    pub fn add(mut self, task: &str, args: Vec<serde_json::Value>) -> Self {
396        self.elements.push(CanvasElement::task(task, args));
397        self
398    }
399
400    /// Add a chain to the group
401    pub fn add_chain(mut self, chain: Chain) -> Self {
402        self.elements.push(CanvasElement::Chain(chain));
403        self
404    }
405
406    /// Check if the group is empty
407    pub fn is_empty(&self) -> bool {
408        self.elements.is_empty()
409    }
410
411    /// Get the number of elements
412    pub fn len(&self) -> usize {
413        self.elements.len()
414    }
415
416    /// Flatten to signatures if possible
417    pub fn flatten_signatures(&self) -> Option<Vec<Signature>> {
418        let mut result = Vec::new();
419
420        for element in &self.elements {
421            match element {
422                CanvasElement::Signature(sig) => result.push(sig.clone()),
423                _ => return None,
424            }
425        }
426
427        Some(result)
428    }
429
430    /// Execute all elements in parallel
431    ///
432    /// All elements in the group are started concurrently.
433    /// Returns a group ID that can be used to track the parallel execution.
434    pub async fn apply<B: Broker>(&self, broker: &B) -> Result<Uuid, CanvasError> {
435        if self.elements.is_empty() {
436            return Err(CanvasError::Invalid(
437                "NestedGroup cannot be empty".to_string(),
438            ));
439        }
440
441        // Generate a group ID for tracking
442        let group_id = Uuid::new_v4();
443
444        // Execute all elements in parallel
445        for element in &self.elements {
446            match element {
447                CanvasElement::Signature(sig) => {
448                    let chain = Chain {
449                        tasks: vec![sig.clone()],
450                    };
451                    chain.apply(broker).await?;
452                }
453                CanvasElement::Chain(chain) => {
454                    chain.clone().apply(broker).await?;
455                }
456                CanvasElement::Group(group) => {
457                    group.clone().apply(broker).await?;
458                }
459                CanvasElement::Chord { header, body } => {
460                    #[cfg(feature = "backend-redis")]
461                    {
462                        // Note: Chord requires backend, but we can't pass it here
463                        // For now, fall back to just executing the group
464                        header.clone().apply(broker).await?;
465                        let _ = body; // Silence unused warning
466                    }
467                    #[cfg(not(feature = "backend-redis"))]
468                    {
469                        header.clone().apply(broker).await?;
470                        let _ = body; // Silence unused warning
471                    }
472                }
473                CanvasElement::Map { task, argsets } => {
474                    let map = Map::new(task.clone(), argsets.clone());
475                    map.apply(broker).await?;
476                }
477                CanvasElement::Branch(_branch) => {
478                    // Branches require runtime evaluation, skip for now
479                    return Err(CanvasError::Invalid(
480                        "Branch elements not supported in NestedGroup.apply()".to_string(),
481                    ));
482                }
483                CanvasElement::Switch(_switch) => {
484                    // Switch requires runtime evaluation, skip for now
485                    return Err(CanvasError::Invalid(
486                        "Switch elements not supported in NestedGroup.apply()".to_string(),
487                    ));
488                }
489            }
490        }
491
492        Ok(group_id)
493    }
494}
495
496impl Default for NestedGroup {
497    fn default() -> Self {
498        Self::new()
499    }
500}
501
502impl std::fmt::Display for NestedGroup {
503    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
504        let element_strs: Vec<String> = self.elements.iter().map(|e| format!("{}", e)).collect();
505        write!(f, "NestedGroup[{}]", element_strs.join(" | "))
506    }
507}