use crate::{Branch, CanvasError, Chain, Group, Map, Signature, Switch};
use celers_core::Broker;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "element_type")]
pub enum CanvasElement {
Signature(Signature),
Chain(Chain),
Group(Group),
Chord {
header: Group,
body: Signature,
},
Map {
task: Signature,
argsets: Vec<Vec<serde_json::Value>>,
},
Branch(Branch),
Switch(Switch),
}
impl CanvasElement {
pub fn signature(sig: Signature) -> Self {
Self::Signature(sig)
}
pub fn task(name: impl Into<String>, args: Vec<serde_json::Value>) -> Self {
Self::Signature(Signature::new(name.into()).with_args(args))
}
pub fn chain(chain: Chain) -> Self {
Self::Chain(chain)
}
pub fn group(group: Group) -> Self {
Self::Group(group)
}
pub fn chord(header: Group, body: Signature) -> Self {
Self::Chord { header, body }
}
pub fn map(task: Signature, argsets: Vec<Vec<serde_json::Value>>) -> Self {
Self::Map { task, argsets }
}
pub fn branch(branch: Branch) -> Self {
Self::Branch(branch)
}
pub fn switch(switch: Switch) -> Self {
Self::Switch(switch)
}
pub fn is_signature(&self) -> bool {
matches!(self, Self::Signature(_))
}
pub fn is_chain(&self) -> bool {
matches!(self, Self::Chain(_))
}
pub fn is_group(&self) -> bool {
matches!(self, Self::Group(_))
}
pub fn is_chord(&self) -> bool {
matches!(self, Self::Chord { .. })
}
pub fn element_type(&self) -> &'static str {
match self {
Self::Signature(_) => "signature",
Self::Chain(_) => "chain",
Self::Group(_) => "group",
Self::Chord { .. } => "chord",
Self::Map { .. } => "map",
Self::Branch(_) => "branch",
Self::Switch(_) => "switch",
}
}
}
impl std::fmt::Display for CanvasElement {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Signature(sig) => write!(f, "Signature[{}]", sig.task),
Self::Chain(chain) => write!(f, "{}", chain),
Self::Group(group) => write!(f, "{}", group),
Self::Chord { header, body } => {
write!(f, "Chord[header={}, body={}]", header, body.task)
}
Self::Map { task, argsets } => {
write!(f, "Map[task={}, {} argsets]", task.task, argsets.len())
}
Self::Branch(branch) => write!(f, "{}", branch),
Self::Switch(switch) => write!(f, "{}", switch),
}
}
}
impl From<Signature> for CanvasElement {
fn from(sig: Signature) -> Self {
Self::Signature(sig)
}
}
impl From<Chain> for CanvasElement {
fn from(chain: Chain) -> Self {
Self::Chain(chain)
}
}
impl From<Group> for CanvasElement {
fn from(group: Group) -> Self {
Self::Group(group)
}
}
impl From<Branch> for CanvasElement {
fn from(branch: Branch) -> Self {
Self::Branch(branch)
}
}
impl From<Switch> for CanvasElement {
fn from(switch: Switch) -> Self {
Self::Switch(switch)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NestedChain {
pub elements: Vec<CanvasElement>,
}
impl NestedChain {
pub fn new() -> Self {
Self {
elements: Vec::new(),
}
}
pub fn then_element(mut self, element: CanvasElement) -> Self {
self.elements.push(element);
self
}
pub fn then_signature(mut self, sig: Signature) -> Self {
self.elements.push(CanvasElement::Signature(sig));
self
}
pub fn then(mut self, task: &str, args: Vec<serde_json::Value>) -> Self {
self.elements.push(CanvasElement::task(task, args));
self
}
pub fn then_group(mut self, group: Group) -> Self {
self.elements.push(CanvasElement::Group(group));
self
}
pub fn then_chord(mut self, header: Group, body: Signature) -> Self {
self.elements.push(CanvasElement::Chord { header, body });
self
}
pub fn then_branch(mut self, branch: Branch) -> Self {
self.elements.push(CanvasElement::Branch(branch));
self
}
pub fn then_chain(mut self, chain: Chain) -> Self {
self.elements.push(CanvasElement::Chain(chain));
self
}
pub fn is_empty(&self) -> bool {
self.elements.is_empty()
}
pub fn len(&self) -> usize {
self.elements.len()
}
pub fn flatten_signatures(&self) -> Option<Vec<Signature>> {
let mut result = Vec::new();
for element in &self.elements {
match element {
CanvasElement::Signature(sig) => result.push(sig.clone()),
CanvasElement::Chain(chain) => {
result.extend(chain.tasks.clone());
}
_ => return None, }
}
Some(result)
}
pub async fn apply<B: Broker>(&self, broker: &B) -> Result<Uuid, CanvasError> {
if self.elements.is_empty() {
return Err(CanvasError::Invalid(
"NestedChain cannot be empty".to_string(),
));
}
let mut last_id = None;
for element in &self.elements {
match element {
CanvasElement::Signature(sig) => {
let chain = Chain {
tasks: vec![sig.clone()],
};
last_id = Some(chain.apply(broker).await?);
}
CanvasElement::Chain(chain) => {
last_id = Some(chain.clone().apply(broker).await?);
}
CanvasElement::Group(group) => {
last_id = Some(group.clone().apply(broker).await?);
}
CanvasElement::Chord { header, body } => {
#[cfg(feature = "backend-redis")]
{
last_id = Some(header.clone().apply(broker).await?);
let _ = body; }
#[cfg(not(feature = "backend-redis"))]
{
last_id = Some(header.clone().apply(broker).await?);
let _ = body; }
}
CanvasElement::Map { task, argsets } => {
let map = Map::new(task.clone(), argsets.clone());
last_id = Some(map.apply(broker).await?);
}
CanvasElement::Branch(_branch) => {
return Err(CanvasError::Invalid(
"Branch elements not supported in NestedChain.apply()".to_string(),
));
}
CanvasElement::Switch(_switch) => {
return Err(CanvasError::Invalid(
"Switch elements not supported in NestedChain.apply()".to_string(),
));
}
}
}
last_id.ok_or_else(|| CanvasError::Invalid("No elements executed".to_string()))
}
}
impl Default for NestedChain {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for NestedChain {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let element_strs: Vec<String> = self.elements.iter().map(|e| format!("{}", e)).collect();
write!(f, "NestedChain[{}]", element_strs.join(" -> "))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NestedGroup {
pub elements: Vec<CanvasElement>,
}
impl NestedGroup {
pub fn new() -> Self {
Self {
elements: Vec::new(),
}
}
pub fn add_element(mut self, element: CanvasElement) -> Self {
self.elements.push(element);
self
}
pub fn add_signature(mut self, sig: Signature) -> Self {
self.elements.push(CanvasElement::Signature(sig));
self
}
pub fn add(mut self, task: &str, args: Vec<serde_json::Value>) -> Self {
self.elements.push(CanvasElement::task(task, args));
self
}
pub fn add_chain(mut self, chain: Chain) -> Self {
self.elements.push(CanvasElement::Chain(chain));
self
}
pub fn is_empty(&self) -> bool {
self.elements.is_empty()
}
pub fn len(&self) -> usize {
self.elements.len()
}
pub fn flatten_signatures(&self) -> Option<Vec<Signature>> {
let mut result = Vec::new();
for element in &self.elements {
match element {
CanvasElement::Signature(sig) => result.push(sig.clone()),
_ => return None,
}
}
Some(result)
}
pub async fn apply<B: Broker>(&self, broker: &B) -> Result<Uuid, CanvasError> {
if self.elements.is_empty() {
return Err(CanvasError::Invalid(
"NestedGroup cannot be empty".to_string(),
));
}
let group_id = Uuid::new_v4();
for element in &self.elements {
match element {
CanvasElement::Signature(sig) => {
let chain = Chain {
tasks: vec![sig.clone()],
};
chain.apply(broker).await?;
}
CanvasElement::Chain(chain) => {
chain.clone().apply(broker).await?;
}
CanvasElement::Group(group) => {
group.clone().apply(broker).await?;
}
CanvasElement::Chord { header, body } => {
#[cfg(feature = "backend-redis")]
{
header.clone().apply(broker).await?;
let _ = body; }
#[cfg(not(feature = "backend-redis"))]
{
header.clone().apply(broker).await?;
let _ = body; }
}
CanvasElement::Map { task, argsets } => {
let map = Map::new(task.clone(), argsets.clone());
map.apply(broker).await?;
}
CanvasElement::Branch(_branch) => {
return Err(CanvasError::Invalid(
"Branch elements not supported in NestedGroup.apply()".to_string(),
));
}
CanvasElement::Switch(_switch) => {
return Err(CanvasError::Invalid(
"Switch elements not supported in NestedGroup.apply()".to_string(),
));
}
}
}
Ok(group_id)
}
}
impl Default for NestedGroup {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for NestedGroup {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let element_strs: Vec<String> = self.elements.iter().map(|e| format!("{}", e)).collect();
write!(f, "NestedGroup[{}]", element_strs.join(" | "))
}
}