Skip to main content

celers_canvas/
chord.rs

1use crate::{CanvasError, Group, Signature};
2use celers_core::Broker;
3#[cfg(feature = "backend-redis")]
4use celers_core::SerializedTask;
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8#[cfg(feature = "backend-redis")]
9use celers_backend_redis::{ChordState, ResultBackend};
10
11#[cfg(feature = "backend-redis")]
12use chrono::Utc;
13
14/// Chord: Parallel execution with callback
15///
16/// (task1 | task2 | task3) -> callback([result1, result2, result3])
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
18pub struct Chord {
19    /// Header (parallel tasks)
20    pub header: Group,
21
22    /// Body (callback task)
23    pub body: Signature,
24}
25
26impl Chord {
27    pub fn new(header: Group, body: Signature) -> Self {
28        Self { header, body }
29    }
30
31    /// Apply the chord by initializing state and enqueuing header tasks
32    #[cfg(feature = "backend-redis")]
33    pub async fn apply<B: Broker, R: ResultBackend>(
34        mut self,
35        broker: &B,
36        backend: &mut R,
37    ) -> Result<Uuid, CanvasError> {
38        if self.header.tasks.is_empty() {
39            return Err(CanvasError::Invalid(
40                "Chord header cannot be empty".to_string(),
41            ));
42        }
43
44        let chord_id = Uuid::new_v4();
45        let total = self.header.tasks.len();
46
47        // Initialize chord state in backend
48        let chord_state = ChordState {
49            chord_id,
50            total,
51            completed: 0,
52            callback: Some(self.body.task.clone()),
53            task_ids: Vec::new(),
54            created_at: Utc::now(),
55            timeout: None,
56            cancelled: false,
57            cancellation_reason: None,
58            retry_count: 0,
59            max_retries: None,
60        };
61
62        backend
63            .chord_init(chord_state)
64            .await
65            .map_err(|e| CanvasError::Broker(format!("Failed to initialize chord: {}", e)))?;
66
67        // Enqueue all header tasks with chord_id
68        for sig in &mut self.header.tasks {
69            let args_json = serde_json::json!({
70                "args": sig.args,
71                "kwargs": sig.kwargs
72            });
73            let args_bytes = serde_json::to_vec(&args_json)
74                .map_err(|e| CanvasError::Serialization(e.to_string()))?;
75
76            let mut task = SerializedTask::new(sig.task.clone(), args_bytes);
77
78            if let Some(priority) = sig.options.priority {
79                task = task.with_priority(priority.into());
80            }
81
82            // Set chord_id so worker knows to update chord state on completion
83            task.metadata.chord_id = Some(chord_id);
84
85            broker
86                .enqueue(task)
87                .await
88                .map_err(|e| CanvasError::Broker(e.to_string()))?;
89        }
90
91        Ok(chord_id)
92    }
93
94    /// Apply the chord without a result backend (simpler version)
95    #[cfg(not(feature = "backend-redis"))]
96    pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
97        // Without backend, we can only enqueue header tasks
98        // Manual coordination required
99        self.header.apply(broker).await
100    }
101}
102
103impl std::fmt::Display for Chord {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        write!(
106            f,
107            "Chord[{} tasks] -> callback({})",
108            self.header.tasks.len(),
109            self.body.task
110        )
111    }
112}
113
114/// Map: Apply task to multiple arguments
115///
116/// map(task, [args1, args2, args3]) -> [result1, result2, result3]
117#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
118pub struct Map {
119    /// Task to apply
120    pub task: Signature,
121
122    /// List of argument sets
123    pub argsets: Vec<Vec<serde_json::Value>>,
124}
125
126impl Map {
127    pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
128        Self { task, argsets }
129    }
130
131    /// Apply the map by creating a group of tasks with different arguments
132    pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
133        let mut group = Group::new();
134
135        for args in self.argsets {
136            let mut sig = self.task.clone();
137            sig.args = args;
138            group = group.add_signature(sig);
139        }
140
141        group.apply(broker).await
142    }
143
144    /// Check if map is empty
145    pub fn is_empty(&self) -> bool {
146        self.argsets.is_empty()
147    }
148
149    /// Get number of argument sets (and thus tasks)
150    pub fn len(&self) -> usize {
151        self.argsets.len()
152    }
153}
154
155impl std::fmt::Display for Map {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        write!(
158            f,
159            "Map[task={}, {} argsets]",
160            self.task.task,
161            self.argsets.len()
162        )
163    }
164}
165
166/// Starmap: Like map but unpacks arguments
167///
168/// starmap(task, [(a1, b1), (a2, b2)]) -> [task(a1, b1), task(a2, b2)]
169#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
170pub struct Starmap {
171    /// Task to apply
172    pub task: Signature,
173
174    /// List of argument tuples
175    pub argsets: Vec<Vec<serde_json::Value>>,
176}
177
178impl Starmap {
179    pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
180        Self { task, argsets }
181    }
182
183    /// Apply the starmap by creating a group of tasks with unpacked arguments
184    pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
185        // Starmap is the same as Map - the unpacking happens in task execution
186        let map = Map::new(self.task, self.argsets);
187        map.apply(broker).await
188    }
189
190    /// Check if starmap is empty
191    pub fn is_empty(&self) -> bool {
192        self.argsets.is_empty()
193    }
194
195    /// Get number of argument sets (and thus tasks)
196    pub fn len(&self) -> usize {
197        self.argsets.len()
198    }
199}
200
201impl std::fmt::Display for Starmap {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        write!(
204            f,
205            "Starmap[task={}, {} argsets]",
206            self.task.task,
207            self.argsets.len()
208        )
209    }
210}
211
212/// Chunks: Split iterable into chunks for parallel processing
213///
214/// chunks(task, items, chunk_size) -> Group of tasks, each processing a chunk
215///
216/// # Example
217/// ```
218/// use celers_canvas::{Chunks, Signature};
219///
220/// let task = Signature::new("process_batch".to_string());
221/// let items: Vec<serde_json::Value> = (0..100).map(|i| serde_json::json!(i)).collect();
222///
223/// // Process 100 items in chunks of 10 (creates 10 parallel tasks)
224/// let chunks = Chunks::new(task, items, 10);
225/// assert_eq!(chunks.num_chunks(), 10);
226/// ```
227#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
228pub struct Chunks {
229    /// Task to apply to each chunk
230    pub task: Signature,
231
232    /// Items to split into chunks
233    pub items: Vec<serde_json::Value>,
234
235    /// Size of each chunk
236    pub chunk_size: usize,
237}
238
239impl Chunks {
240    /// Create a new Chunks workflow
241    ///
242    /// # Arguments
243    /// * `task` - The task signature to apply to each chunk
244    /// * `items` - Items to split into chunks
245    /// * `chunk_size` - Number of items per chunk
246    pub fn new(task: Signature, items: Vec<serde_json::Value>, chunk_size: usize) -> Self {
247        Self {
248            task,
249            items,
250            chunk_size: chunk_size.max(1), // Minimum chunk size of 1
251        }
252    }
253
254    /// Get the number of chunks that will be created
255    pub fn num_chunks(&self) -> usize {
256        if self.items.is_empty() {
257            0
258        } else {
259            self.items.len().div_ceil(self.chunk_size)
260        }
261    }
262
263    /// Check if chunks is empty
264    pub fn is_empty(&self) -> bool {
265        self.items.is_empty()
266    }
267
268    /// Get total number of items
269    pub fn len(&self) -> usize {
270        self.items.len()
271    }
272
273    /// Convert to a Group for execution
274    pub fn to_group(&self) -> Group {
275        let mut group = Group::new();
276
277        for chunk in self.items.chunks(self.chunk_size) {
278            let mut sig = self.task.clone();
279            sig.args = vec![serde_json::json!(chunk)];
280            group = group.add_signature(sig);
281        }
282
283        group
284    }
285
286    /// Apply the chunks by creating a group of tasks
287    pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
288        if self.items.is_empty() {
289            return Err(CanvasError::Invalid("Chunks cannot be empty".to_string()));
290        }
291
292        self.to_group().apply(broker).await
293    }
294}
295
296impl std::fmt::Display for Chunks {
297    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298        write!(
299            f,
300            "Chunks[task={}, {} items, chunk_size={}, {} chunks]",
301            self.task.task,
302            self.items.len(),
303            self.chunk_size,
304            self.num_chunks()
305        )
306    }
307}
308
309/// XMap: Map with exception handling
310///
311/// Like Map, but continues processing even if some tasks fail.
312/// Failed tasks are tracked separately.
313#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
314pub struct XMap {
315    /// Task to apply
316    pub task: Signature,
317
318    /// List of argument sets
319    pub argsets: Vec<Vec<serde_json::Value>>,
320
321    /// Whether to stop on first error
322    pub fail_fast: bool,
323}
324
325impl XMap {
326    /// Create a new XMap workflow
327    pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
328        Self {
329            task,
330            argsets,
331            fail_fast: false,
332        }
333    }
334
335    /// Set fail-fast behavior (stop on first error)
336    pub fn fail_fast(mut self, fail_fast: bool) -> Self {
337        self.fail_fast = fail_fast;
338        self
339    }
340
341    /// Check if empty
342    pub fn is_empty(&self) -> bool {
343        self.argsets.is_empty()
344    }
345
346    /// Get number of argument sets
347    pub fn len(&self) -> usize {
348        self.argsets.len()
349    }
350
351    /// Apply the xmap by creating a group of tasks
352    ///
353    /// Note: Exception handling is done at the result collection level,
354    /// not during task submission.
355    pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
356        let map = Map::new(self.task, self.argsets);
357        map.apply(broker).await
358    }
359}
360
361impl std::fmt::Display for XMap {
362    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363        write!(
364            f,
365            "XMap[task={}, {} argsets, fail_fast={}]",
366            self.task.task,
367            self.argsets.len(),
368            self.fail_fast
369        )
370    }
371}
372
373/// XStarmap: Starmap with exception handling
374///
375/// Like Starmap, but continues processing even if some tasks fail.
376#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
377pub struct XStarmap {
378    /// Task to apply
379    pub task: Signature,
380
381    /// List of argument tuples
382    pub argsets: Vec<Vec<serde_json::Value>>,
383
384    /// Whether to stop on first error
385    pub fail_fast: bool,
386}
387
388impl XStarmap {
389    /// Create a new XStarmap workflow
390    pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
391        Self {
392            task,
393            argsets,
394            fail_fast: false,
395        }
396    }
397
398    /// Set fail-fast behavior (stop on first error)
399    pub fn fail_fast(mut self, fail_fast: bool) -> Self {
400        self.fail_fast = fail_fast;
401        self
402    }
403
404    /// Check if empty
405    pub fn is_empty(&self) -> bool {
406        self.argsets.is_empty()
407    }
408
409    /// Get number of argument sets
410    pub fn len(&self) -> usize {
411        self.argsets.len()
412    }
413
414    /// Apply the xstarmap by creating a group of tasks
415    pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
416        let starmap = Starmap::new(self.task, self.argsets);
417        starmap.apply(broker).await
418    }
419}
420
421impl std::fmt::Display for XStarmap {
422    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
423        write!(
424            f,
425            "XStarmap[task={}, {} argsets, fail_fast={}]",
426            self.task.task,
427            self.argsets.len(),
428            self.fail_fast
429        )
430    }
431}