1use crate::{Branch, CanvasError, Chain, Group, Map, Signature, Switch};
2use celers_core::Broker;
3use serde::{Deserialize, Serialize};
4use uuid::Uuid;
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
8#[serde(tag = "element_type")]
9pub enum CanvasElement {
10 Signature(Signature),
12
13 Chain(Chain),
15
16 Group(Group),
18
19 Chord {
21 header: Group,
23 body: Signature,
25 },
26
27 Map {
29 task: Signature,
31 argsets: Vec<Vec<serde_json::Value>>,
33 },
34
35 Branch(Branch),
37
38 Switch(Switch),
40}
41
42impl CanvasElement {
43 pub fn signature(sig: Signature) -> Self {
45 Self::Signature(sig)
46 }
47
48 pub fn task(name: impl Into<String>, args: Vec<serde_json::Value>) -> Self {
50 Self::Signature(Signature::new(name.into()).with_args(args))
51 }
52
53 pub fn chain(chain: Chain) -> Self {
55 Self::Chain(chain)
56 }
57
58 pub fn group(group: Group) -> Self {
60 Self::Group(group)
61 }
62
63 pub fn chord(header: Group, body: Signature) -> Self {
65 Self::Chord { header, body }
66 }
67
68 pub fn map(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
70 Self::Map { task, argsets }
71 }
72
73 pub fn branch(branch: Branch) -> Self {
75 Self::Branch(branch)
76 }
77
78 pub fn switch(switch: Switch) -> Self {
80 Self::Switch(switch)
81 }
82
83 pub fn is_signature(&self) -> bool {
85 matches!(self, Self::Signature(_))
86 }
87
88 pub fn is_chain(&self) -> bool {
90 matches!(self, Self::Chain(_))
91 }
92
93 pub fn is_group(&self) -> bool {
95 matches!(self, Self::Group(_))
96 }
97
98 pub fn is_chord(&self) -> bool {
100 matches!(self, Self::Chord { .. })
101 }
102
103 pub fn element_type(&self) -> &'static str {
105 match self {
106 Self::Signature(_) => "signature",
107 Self::Chain(_) => "chain",
108 Self::Group(_) => "group",
109 Self::Chord { .. } => "chord",
110 Self::Map { .. } => "map",
111 Self::Branch(_) => "branch",
112 Self::Switch(_) => "switch",
113 }
114 }
115}
116
117impl std::fmt::Display for CanvasElement {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 match self {
120 Self::Signature(sig) => write!(f, "Signature[{}]", sig.task),
121 Self::Chain(chain) => write!(f, "{}", chain),
122 Self::Group(group) => write!(f, "{}", group),
123 Self::Chord { header, body } => {
124 write!(f, "Chord[header={}, body={}]", header, body.task)
125 }
126 Self::Map { task, argsets } => {
127 write!(f, "Map[task={}, {} argsets]", task.task, argsets.len())
128 }
129 Self::Branch(branch) => write!(f, "{}", branch),
130 Self::Switch(switch) => write!(f, "{}", switch),
131 }
132 }
133}
134
135impl From<Signature> for CanvasElement {
136 fn from(sig: Signature) -> Self {
137 Self::Signature(sig)
138 }
139}
140
141impl From<Chain> for CanvasElement {
142 fn from(chain: Chain) -> Self {
143 Self::Chain(chain)
144 }
145}
146
147impl From<Group> for CanvasElement {
148 fn from(group: Group) -> Self {
149 Self::Group(group)
150 }
151}
152
153impl From<Branch> for CanvasElement {
154 fn from(branch: Branch) -> Self {
155 Self::Branch(branch)
156 }
157}
158
159impl From<Switch> for CanvasElement {
160 fn from(switch: Switch) -> Self {
161 Self::Switch(switch)
162 }
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct NestedChain {
185 pub elements: Vec<CanvasElement>,
187}
188
189impl NestedChain {
190 pub fn new() -> Self {
192 Self {
193 elements: Vec::new(),
194 }
195 }
196
197 pub fn then_element(mut self, element: CanvasElement) -> Self {
199 self.elements.push(element);
200 self
201 }
202
203 pub fn then_signature(mut self, sig: Signature) -> Self {
205 self.elements.push(CanvasElement::Signature(sig));
206 self
207 }
208
209 pub fn then(mut self, task: &str, args: Vec<serde_json::Value>) -> Self {
211 self.elements.push(CanvasElement::task(task, args));
212 self
213 }
214
215 pub fn then_group(mut self, group: Group) -> Self {
217 self.elements.push(CanvasElement::Group(group));
218 self
219 }
220
221 pub fn then_chord(mut self, header: Group, body: Signature) -> Self {
223 self.elements.push(CanvasElement::Chord { header, body });
224 self
225 }
226
227 pub fn then_branch(mut self, branch: Branch) -> Self {
229 self.elements.push(CanvasElement::Branch(branch));
230 self
231 }
232
233 pub fn then_chain(mut self, chain: Chain) -> Self {
235 self.elements.push(CanvasElement::Chain(chain));
236 self
237 }
238
239 pub fn is_empty(&self) -> bool {
241 self.elements.is_empty()
242 }
243
244 pub fn len(&self) -> usize {
246 self.elements.len()
247 }
248
249 pub fn flatten_signatures(&self) -> Option<Vec<Signature>> {
255 let mut result = Vec::new();
256
257 for element in &self.elements {
258 match element {
259 CanvasElement::Signature(sig) => result.push(sig.clone()),
260 CanvasElement::Chain(chain) => {
261 result.extend(chain.tasks.clone());
262 }
263 _ => return None, }
265 }
266
267 Some(result)
268 }
269
270 pub async fn apply<B: Broker>(&self, broker: &B) -> Result<Uuid, CanvasError> {
277 if self.elements.is_empty() {
278 return Err(CanvasError::Invalid(
279 "NestedChain cannot be empty".to_string(),
280 ));
281 }
282
283 let mut last_id = None;
285 for element in &self.elements {
286 match element {
287 CanvasElement::Signature(sig) => {
288 let chain = Chain {
290 tasks: vec![sig.clone()],
291 };
292 last_id = Some(chain.apply(broker).await?);
293 }
294 CanvasElement::Chain(chain) => {
295 last_id = Some(chain.clone().apply(broker).await?);
296 }
297 CanvasElement::Group(group) => {
298 last_id = Some(group.clone().apply(broker).await?);
299 }
300 CanvasElement::Chord { header, body } => {
301 #[cfg(feature = "backend-redis")]
302 {
303 last_id = Some(header.clone().apply(broker).await?);
306 let _ = body; }
309 #[cfg(not(feature = "backend-redis"))]
310 {
311 last_id = Some(header.clone().apply(broker).await?);
312 let _ = body; }
314 }
315 CanvasElement::Map { task, argsets } => {
316 let map = Map::new(task.clone(), argsets.clone());
317 last_id = Some(map.apply(broker).await?);
318 }
319 CanvasElement::Branch(_branch) => {
320 return Err(CanvasError::Invalid(
322 "Branch elements not supported in NestedChain.apply()".to_string(),
323 ));
324 }
325 CanvasElement::Switch(_switch) => {
326 return Err(CanvasError::Invalid(
328 "Switch elements not supported in NestedChain.apply()".to_string(),
329 ));
330 }
331 }
332 }
333
334 last_id.ok_or_else(|| CanvasError::Invalid("No elements executed".to_string()))
335 }
336}
337
338impl Default for NestedChain {
339 fn default() -> Self {
340 Self::new()
341 }
342}
343
344impl std::fmt::Display for NestedChain {
345 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346 let element_strs: Vec<String> = self.elements.iter().map(|e| format!("{}", e)).collect();
347 write!(f, "NestedChain[{}]", element_strs.join(" -> "))
348 }
349}
350
351#[derive(Debug, Clone, Serialize, Deserialize)]
369pub struct NestedGroup {
370 pub elements: Vec<CanvasElement>,
372}
373
374impl NestedGroup {
375 pub fn new() -> Self {
377 Self {
378 elements: Vec::new(),
379 }
380 }
381
382 pub fn add_element(mut self, element: CanvasElement) -> Self {
384 self.elements.push(element);
385 self
386 }
387
388 pub fn add_signature(mut self, sig: Signature) -> Self {
390 self.elements.push(CanvasElement::Signature(sig));
391 self
392 }
393
394 pub fn add(mut self, task: &str, args: Vec<serde_json::Value>) -> Self {
396 self.elements.push(CanvasElement::task(task, args));
397 self
398 }
399
400 pub fn add_chain(mut self, chain: Chain) -> Self {
402 self.elements.push(CanvasElement::Chain(chain));
403 self
404 }
405
406 pub fn is_empty(&self) -> bool {
408 self.elements.is_empty()
409 }
410
411 pub fn len(&self) -> usize {
413 self.elements.len()
414 }
415
416 pub fn flatten_signatures(&self) -> Option<Vec<Signature>> {
418 let mut result = Vec::new();
419
420 for element in &self.elements {
421 match element {
422 CanvasElement::Signature(sig) => result.push(sig.clone()),
423 _ => return None,
424 }
425 }
426
427 Some(result)
428 }
429
430 pub async fn apply<B: Broker>(&self, broker: &B) -> Result<Uuid, CanvasError> {
435 if self.elements.is_empty() {
436 return Err(CanvasError::Invalid(
437 "NestedGroup cannot be empty".to_string(),
438 ));
439 }
440
441 let group_id = Uuid::new_v4();
443
444 for element in &self.elements {
446 match element {
447 CanvasElement::Signature(sig) => {
448 let chain = Chain {
449 tasks: vec![sig.clone()],
450 };
451 chain.apply(broker).await?;
452 }
453 CanvasElement::Chain(chain) => {
454 chain.clone().apply(broker).await?;
455 }
456 CanvasElement::Group(group) => {
457 group.clone().apply(broker).await?;
458 }
459 CanvasElement::Chord { header, body } => {
460 #[cfg(feature = "backend-redis")]
461 {
462 header.clone().apply(broker).await?;
465 let _ = body; }
467 #[cfg(not(feature = "backend-redis"))]
468 {
469 header.clone().apply(broker).await?;
470 let _ = body; }
472 }
473 CanvasElement::Map { task, argsets } => {
474 let map = Map::new(task.clone(), argsets.clone());
475 map.apply(broker).await?;
476 }
477 CanvasElement::Branch(_branch) => {
478 return Err(CanvasError::Invalid(
480 "Branch elements not supported in NestedGroup.apply()".to_string(),
481 ));
482 }
483 CanvasElement::Switch(_switch) => {
484 return Err(CanvasError::Invalid(
486 "Switch elements not supported in NestedGroup.apply()".to_string(),
487 ));
488 }
489 }
490 }
491
492 Ok(group_id)
493 }
494}
495
496impl Default for NestedGroup {
497 fn default() -> Self {
498 Self::new()
499 }
500}
501
502impl std::fmt::Display for NestedGroup {
503 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
504 let element_strs: Vec<String> = self.elements.iter().map(|e| format!("{}", e)).collect();
505 write!(f, "NestedGroup[{}]", element_strs.join(" | "))
506 }
507}