use crate::{CanvasError, Group, Signature};
use celers_core::Broker;
#[cfg(feature = "backend-redis")]
use celers_core::SerializedTask;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[cfg(feature = "backend-redis")]
use celers_backend_redis::{ChordState, ResultBackend};
#[cfg(feature = "backend-redis")]
use chrono::Utc;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Chord {
pub header: Group,
pub body: Signature,
}
impl Chord {
pub fn new(header: Group, body: Signature) -> Self {
Self { header, body }
}
#[cfg(feature = "backend-redis")]
pub async fn apply<B: Broker, R: ResultBackend>(
mut self,
broker: &B,
backend: &mut R,
) -> Result<Uuid, CanvasError> {
if self.header.tasks.is_empty() {
return Err(CanvasError::Invalid(
"Chord header cannot be empty".to_string(),
));
}
let chord_id = Uuid::new_v4();
let total = self.header.tasks.len();
let chord_state = ChordState {
chord_id,
total,
completed: 0,
callback: Some(self.body.task.clone()),
task_ids: Vec::new(),
created_at: Utc::now(),
timeout: None,
cancelled: false,
cancellation_reason: None,
retry_count: 0,
max_retries: None,
};
backend
.chord_init(chord_state)
.await
.map_err(|e| CanvasError::Broker(format!("Failed to initialize chord: {}", e)))?;
for sig in &mut self.header.tasks {
let args_json = serde_json::json!({
"args": sig.args,
"kwargs": sig.kwargs
});
let args_bytes = serde_json::to_vec(&args_json)
.map_err(|e| CanvasError::Serialization(e.to_string()))?;
let mut task = SerializedTask::new(sig.task.clone(), args_bytes);
if let Some(priority) = sig.options.priority {
task = task.with_priority(priority.into());
}
task.metadata.chord_id = Some(chord_id);
broker
.enqueue(task)
.await
.map_err(|e| CanvasError::Broker(e.to_string()))?;
}
Ok(chord_id)
}
#[cfg(not(feature = "backend-redis"))]
pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
self.header.apply(broker).await
}
}
impl std::fmt::Display for Chord {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Chord[{} tasks] -> callback({})",
self.header.tasks.len(),
self.body.task
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Map {
pub task: Signature,
pub argsets: Vec<Vec<serde_json::Value>>,
}
impl Map {
pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
Self { task, argsets }
}
pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
let mut group = Group::new();
for args in self.argsets {
let mut sig = self.task.clone();
sig.args = args;
group = group.add_signature(sig);
}
group.apply(broker).await
}
pub fn is_empty(&self) -> bool {
self.argsets.is_empty()
}
pub fn len(&self) -> usize {
self.argsets.len()
}
}
impl std::fmt::Display for Map {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Map[task={}, {} argsets]",
self.task.task,
self.argsets.len()
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Starmap {
pub task: Signature,
pub argsets: Vec<Vec<serde_json::Value>>,
}
impl Starmap {
pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
Self { task, argsets }
}
pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
let map = Map::new(self.task, self.argsets);
map.apply(broker).await
}
pub fn is_empty(&self) -> bool {
self.argsets.is_empty()
}
pub fn len(&self) -> usize {
self.argsets.len()
}
}
impl std::fmt::Display for Starmap {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Starmap[task={}, {} argsets]",
self.task.task,
self.argsets.len()
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Chunks {
pub task: Signature,
pub items: Vec<serde_json::Value>,
pub chunk_size: usize,
}
impl Chunks {
pub fn new(task: Signature, items: Vec<serde_json::Value>, chunk_size: usize) -> Self {
Self {
task,
items,
chunk_size: chunk_size.max(1), }
}
pub fn num_chunks(&self) -> usize {
if self.items.is_empty() {
0
} else {
self.items.len().div_ceil(self.chunk_size)
}
}
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
pub fn len(&self) -> usize {
self.items.len()
}
pub fn to_group(&self) -> Group {
let mut group = Group::new();
for chunk in self.items.chunks(self.chunk_size) {
let mut sig = self.task.clone();
sig.args = vec![serde_json::json!(chunk)];
group = group.add_signature(sig);
}
group
}
pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
if self.items.is_empty() {
return Err(CanvasError::Invalid("Chunks cannot be empty".to_string()));
}
self.to_group().apply(broker).await
}
}
impl std::fmt::Display for Chunks {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Chunks[task={}, {} items, chunk_size={}, {} chunks]",
self.task.task,
self.items.len(),
self.chunk_size,
self.num_chunks()
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct XMap {
pub task: Signature,
pub argsets: Vec<Vec<serde_json::Value>>,
pub fail_fast: bool,
}
impl XMap {
pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
Self {
task,
argsets,
fail_fast: false,
}
}
pub fn fail_fast(mut self, fail_fast: bool) -> Self {
self.fail_fast = fail_fast;
self
}
pub fn is_empty(&self) -> bool {
self.argsets.is_empty()
}
pub fn len(&self) -> usize {
self.argsets.len()
}
pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
let map = Map::new(self.task, self.argsets);
map.apply(broker).await
}
}
impl std::fmt::Display for XMap {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"XMap[task={}, {} argsets, fail_fast={}]",
self.task.task,
self.argsets.len(),
self.fail_fast
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct XStarmap {
pub task: Signature,
pub argsets: Vec<Vec<serde_json::Value>>,
pub fail_fast: bool,
}
impl XStarmap {
pub fn new(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
Self {
task,
argsets,
fail_fast: false,
}
}
pub fn fail_fast(mut self, fail_fast: bool) -> Self {
self.fail_fast = fail_fast;
self
}
pub fn is_empty(&self) -> bool {
self.argsets.is_empty()
}
pub fn len(&self) -> usize {
self.argsets.len()
}
pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
let starmap = Starmap::new(self.task, self.argsets);
starmap.apply(broker).await
}
}
impl std::fmt::Display for XStarmap {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"XStarmap[task={}, {} argsets, fail_fast={}]",
self.task.task,
self.argsets.len(),
self.fail_fast
)
}
}