1use crate::{CanvasError, Group, Signature};
2use celers_core::Broker;
3#[cfg(feature = "backend-redis")]
4use celers_core::SerializedTask;
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8#[cfg(feature = "backend-redis")]
9use celers_backend_redis::{ChordState, ResultBackend};
10
11#[cfg(feature = "backend-redis")]
12use chrono::Utc;
13
/// A header [`Group`] paired with a body [`Signature`] that serves as the callback.
///
/// The `Display` impl renders the body's task name as the callback, and the
/// `backend-redis` variant of `Chord::apply` records that task name in the chord state.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Chord {
    /// Group whose tasks are enqueued when the chord is applied.
    pub header: Group,

    /// Signature whose task name is registered as the chord callback.
    pub body: Signature,
}
25
26impl Chord {
27 pub fn new(header: Group, body: Signature) -> Self {
28 Self { header, body }
29 }
30
31 #[cfg(feature = "backend-redis")]
33 pub async fn apply<B: Broker, R: ResultBackend>(
34 mut self,
35 broker: &B,
36 backend: &mut R,
37 ) -> Result<Uuid, CanvasError> {
38 if self.header.tasks.is_empty() {
39 return Err(CanvasError::Invalid(
40 "Chord header cannot be empty".to_string(),
41 ));
42 }
43
44 let chord_id = Uuid::new_v4();
45 let total = self.header.tasks.len();
46
47 let chord_state = ChordState {
49 chord_id,
50 total,
51 completed: 0,
52 callback: Some(self.body.task.clone()),
53 task_ids: Vec::new(),
54 created_at: Utc::now(),
55 timeout: None,
56 cancelled: false,
57 cancellation_reason: None,
58 retry_count: 0,
59 max_retries: None,
60 };
61
62 backend
63 .chord_init(chord_state)
64 .await
65 .map_err(|e| CanvasError::Broker(format!("Failed to initialize chord: {}", e)))?;
66
67 for sig in &mut self.header.tasks {
69 let args_json = serde_json::json!({
70 "args": sig.args,
71 "kwargs": sig.kwargs
72 });
73 let args_bytes = serde_json::to_vec(&args_json)
74 .map_err(|e| CanvasError::Serialization(e.to_string()))?;
75
76 let mut task = SerializedTask::new(sig.task.clone(), args_bytes);
77
78 if let Some(priority) = sig.options.priority {
79 task = task.with_priority(priority.into());
80 }
81
82 task.metadata.chord_id = Some(chord_id);
84
85 broker
86 .enqueue(task)
87 .await
88 .map_err(|e| CanvasError::Broker(e.to_string()))?;
89 }
90
91 Ok(chord_id)
92 }
93
94 #[cfg(not(feature = "backend-redis"))]
96 pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
97 self.header.apply(broker).await
100 }
101}
102
103impl std::fmt::Display for Chord {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 write!(
106 f,
107 "Chord[{} tasks] -> callback({})",
108 self.header.tasks.len(),
109 self.body.task
110 )
111 }
112}
113
/// A template signature plus a list of argument sets.
///
/// `Map::apply` clones `task` once per entry of `argsets` and enqueues the copies as a group.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Map {
    /// Template signature cloned for every argument set.
    pub task: Signature,

    /// One inner vector per invocation; it becomes the clone's `args`.
    pub argsets: Vec<Vec<serde_json::Value>>,
}
125
126impl Map {
127 pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
128 Self { task, argsets }
129 }
130
131 pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
133 let mut group = Group::new();
134
135 for args in self.argsets {
136 let mut sig = self.task.clone();
137 sig.args = args;
138 group = group.add_signature(sig);
139 }
140
141 group.apply(broker).await
142 }
143
144 pub fn is_empty(&self) -> bool {
146 self.argsets.is_empty()
147 }
148
149 pub fn len(&self) -> usize {
151 self.argsets.len()
152 }
153}
154
155impl std::fmt::Display for Map {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 write!(
158 f,
159 "Map[task={}, {} argsets]",
160 self.task.task,
161 self.argsets.len()
162 )
163 }
164}
165
/// A template signature plus a list of argument sets, applied by delegating to [`Map`].
///
/// The fields mirror those of [`Map`]; `Starmap::apply` builds a `Map` from them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Starmap {
    /// Template signature cloned for every argument set.
    pub task: Signature,

    /// One inner vector of arguments per invocation.
    pub argsets: Vec<Vec<serde_json::Value>>,
}
177
178impl Starmap {
179 pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
180 Self { task, argsets }
181 }
182
183 pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
185 let map = Map::new(self.task, self.argsets);
187 map.apply(broker).await
188 }
189
190 pub fn is_empty(&self) -> bool {
192 self.argsets.is_empty()
193 }
194
195 pub fn len(&self) -> usize {
197 self.argsets.len()
198 }
199}
200
201impl std::fmt::Display for Starmap {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 write!(
204 f,
205 "Starmap[task={}, {} argsets]",
206 self.task.task,
207 self.argsets.len()
208 )
209 }
210}
211
/// A template signature applied to a list of items in fixed-size batches.
///
/// `Chunks::to_group` splits `items` into slices of at most `chunk_size` elements and
/// creates one signature per slice, passing the slice as a single JSON array argument.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Chunks {
    /// Template signature cloned for every chunk.
    pub task: Signature,

    /// Items to distribute across chunks.
    pub items: Vec<serde_json::Value>,

    /// Maximum number of items per chunk. `Chunks::new` clamps this to at least 1, but
    /// the field is public and deserializable, so other construction paths are not clamped.
    pub chunk_size: usize,
}
238
239impl Chunks {
240 pub fn new(task: Signature, items: Vec<serde_json::Value>, chunk_size: usize) -> Self {
247 Self {
248 task,
249 items,
250 chunk_size: chunk_size.max(1), }
252 }
253
254 pub fn num_chunks(&self) -> usize {
256 if self.items.is_empty() {
257 0
258 } else {
259 self.items.len().div_ceil(self.chunk_size)
260 }
261 }
262
263 pub fn is_empty(&self) -> bool {
265 self.items.is_empty()
266 }
267
268 pub fn len(&self) -> usize {
270 self.items.len()
271 }
272
273 pub fn to_group(&self) -> Group {
275 let mut group = Group::new();
276
277 for chunk in self.items.chunks(self.chunk_size) {
278 let mut sig = self.task.clone();
279 sig.args = vec![serde_json::json!(chunk)];
280 group = group.add_signature(sig);
281 }
282
283 group
284 }
285
286 pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
288 if self.items.is_empty() {
289 return Err(CanvasError::Invalid("Chunks cannot be empty".to_string()));
290 }
291
292 self.to_group().apply(broker).await
293 }
294}
295
296impl std::fmt::Display for Chunks {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 write!(
299 f,
300 "Chunks[task={}, {} items, chunk_size={}, {} chunks]",
301 self.task.task,
302 self.items.len(),
303 self.chunk_size,
304 self.num_chunks()
305 )
306 }
307}
308
/// A [`Map`]-like primitive that additionally carries a `fail_fast` flag.
///
/// NOTE(review): `XMap::apply` delegates to `Map` and does not read `fail_fast`;
/// confirm whether another component is expected to honor the flag.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct XMap {
    /// Template signature cloned for every argument set.
    pub task: Signature,

    /// One inner vector of arguments per invocation.
    pub argsets: Vec<Vec<serde_json::Value>>,

    /// Flag set through the `fail_fast` builder method; `false` by default in `new`.
    pub fail_fast: bool,
}
324
325impl XMap {
326 pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
328 Self {
329 task,
330 argsets,
331 fail_fast: false,
332 }
333 }
334
335 pub fn fail_fast(mut self, fail_fast: bool) -> Self {
337 self.fail_fast = fail_fast;
338 self
339 }
340
341 pub fn is_empty(&self) -> bool {
343 self.argsets.is_empty()
344 }
345
346 pub fn len(&self) -> usize {
348 self.argsets.len()
349 }
350
351 pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
356 let map = Map::new(self.task, self.argsets);
357 map.apply(broker).await
358 }
359}
360
361impl std::fmt::Display for XMap {
362 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363 write!(
364 f,
365 "XMap[task={}, {} argsets, fail_fast={}]",
366 self.task.task,
367 self.argsets.len(),
368 self.fail_fast
369 )
370 }
371}
372
/// A [`Starmap`]-like primitive that additionally carries a `fail_fast` flag.
///
/// NOTE(review): `XStarmap::apply` delegates to `Starmap` and does not read `fail_fast`;
/// confirm whether another component is expected to honor the flag.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct XStarmap {
    /// Template signature cloned for every argument set.
    pub task: Signature,

    /// One inner vector of arguments per invocation.
    pub argsets: Vec<Vec<serde_json::Value>>,

    /// Flag set through the `fail_fast` builder method; `false` by default in `new`.
    pub fail_fast: bool,
}
387
388impl XStarmap {
389 pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
391 Self {
392 task,
393 argsets,
394 fail_fast: false,
395 }
396 }
397
398 pub fn fail_fast(mut self, fail_fast: bool) -> Self {
400 self.fail_fast = fail_fast;
401 self
402 }
403
404 pub fn is_empty(&self) -> bool {
406 self.argsets.is_empty()
407 }
408
409 pub fn len(&self) -> usize {
411 self.argsets.len()
412 }
413
414 pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
416 let starmap = Starmap::new(self.task, self.argsets);
417 starmap.apply(broker).await
418 }
419}
420
421impl std::fmt::Display for XStarmap {
422 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
423 write!(
424 f,
425 "XStarmap[task={}, {} argsets, fail_fast={}]",
426 self.task.task,
427 self.argsets.len(),
428 self.fail_fast
429 )
430 }
431}