use std::collections::HashMap;
use std::fmt::Debug;
use std::iter::Iterator;
use std::sync::Arc;
use std::time::Instant;
use eval::Expr;
use serde::Deserialize;
use serde_json::Value;
use tokio::sync::mpsc::{Receiver, Sender};
use uuid::Uuid;
use crate::node::{types::*, Node};
use crate::reactor::Event;
/// An outgoing edge from one workflow node to another.
#[derive(Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Pointer {
    /// `id` of the [`WorkflowNode`] this edge points to.
    pub points_to: String,
    /// Optional guard expression; `Job::next_node` evaluates it (via the
    /// `eval` crate) to decide whether this edge is followed.
    expression: Option<String>,
}
/// A typed key/value pair carried as node/job context.
#[derive(Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Parameter {
    pub key: String,
    // Serialized as "type" in JSON; `type` is a reserved word in Rust.
    #[serde(rename = "type")]
    kind: String,
    /// Arbitrary JSON payload for this parameter.
    pub value: Value,
}
/// A single node of a workflow definition, as deserialized from JSON.
#[derive(Deserialize, Debug, Clone)]
pub struct WorkflowNode {
    // Serialized as "type" in JSON; `type` is a reserved word in Rust.
    #[serde(rename = "type")]
    kind: WorkflowNodeType,
    /// Identifier other nodes' [`Pointer::points_to`] values refer to.
    pub id: String,
    /// Outgoing edges of this node.
    pointers: Vec<Pointer>,
    /// Optional static parameters attached to the node definition.
    pub parameters: Option<Vec<Parameter>>,
}
/// A complete workflow definition, deserialized from camelCase JSON.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Workflow {
    id: String,
    workflow_id: String,
    /// User that owns this workflow; copied into `Job::owner_id` on start.
    associated_user_id: String,
    project_id: String,
    /// Ordered list of the nodes making up the workflow graph.
    workflow: Vec<WorkflowNode>,
}
/// Per-node execution status.
/// NOTE(review): not referenced in this chunk — presumably reported by node
/// implementations elsewhere; verify against the `Node` trait's users.
#[derive(PartialEq, Deserialize, Debug)]
pub enum WorkflowNodeStatus {
    NotProcessed,
    Pending,
    Processing,
    Failed,
    Success,
    BotBusy,
    AwaitingBotResponse,
    Cancelled,
}
/// The kind of a workflow node; `Job::new` instantiates a matching concrete
/// node type (`Start`, `Parallel`, `Exclusive`, `Activity`, `End`) for each.
/// `Trigger` is not yet implemented (`todo!` in `Job::new`).
#[derive(PartialEq, Deserialize, Debug, Clone)]
pub enum WorkflowNodeType {
    Start,
    Parallel,
    Exclusive,
    Activity,
    Trigger,
    End,
}
/// Lifecycle state of a [`Job`]; new jobs start as `NotStarted`.
#[derive(PartialEq, Deserialize, Debug, Clone)]
pub enum JobStatus {
    NotStarted,
    Processing,
    Finished,
}
/// A single runnable instance of a [`Workflow`].
#[derive(Debug, Clone)]
pub struct Job {
    /// Unique id for this run (UUID v4, generated in `Job::new`).
    id: String,
    /// The workflow's `associated_user_id`.
    owner_id: String,
    /// Accumulated job-wide parameters (empty at creation).
    context: Vec<Parameter>,
    /// Presumably the index of the node currently executing — not read in
    /// this chunk; TODO confirm against the reactor code.
    current: Option<usize>,
    /// Node id -> that node's outgoing edges; used by `next_node` routing.
    cursor_map: HashMap<String, Vec<Pointer>>,
    /// Executable nodes, in workflow order, shared via `Arc`.
    pub nodes: Vec<Arc<Box<dyn Node>>>,
    status: JobStatus,
}
impl Job {
    /// Builds a `Job` from a parsed [`Workflow`], instantiating one executable
    /// node per workflow entry and wiring every node to a shared execution
    /// channel. Returns the job together with the receiving half of that
    /// channel, on which nodes report [`Message`]s.
    ///
    /// # Panics
    /// Panics (`todo!`) on `WorkflowNodeType::Trigger`, which is unimplemented.
    pub fn new(wf: &Workflow, reactor_tx: &Sender<Event>) -> (Self, Receiver<Message>) {
        let (exec_tx, exec_rx) = tokio::sync::mpsc::channel(20);
        let mut nodes: Vec<Arc<Box<dyn Node>>> = Vec::with_capacity(wf.workflow.len());
        let mut cursor_map: HashMap<String, Vec<Pointer>> = HashMap::new();
        for (i, node) in wf.workflow.iter().enumerate() {
            match node.kind {
                WorkflowNodeType::Start => {
                    nodes.push(Arc::new(Box::new(Start::new(node, exec_tx.clone(), i))));
                }
                WorkflowNodeType::Parallel => {
                    // A parallel join must know how many inbound edges target
                    // it: count every pointer in the workflow aimed at this id.
                    let dependencies = wf
                        .workflow
                        .iter()
                        .flat_map(|n| &n.pointers)
                        .filter(|p| p.points_to == node.id)
                        .count();
                    // A single dependency needs no join bookkeeping.
                    let wrapped_deps = if dependencies == 1 {
                        None
                    } else {
                        Some(dependencies)
                    };
                    nodes.push(Arc::new(Box::new(Parallel::new(
                        node,
                        exec_tx.clone(),
                        i,
                        wrapped_deps,
                    ))));
                }
                WorkflowNodeType::Exclusive => {
                    nodes.push(Arc::new(Box::new(Exclusive::new(node, exec_tx.clone(), i))));
                }
                WorkflowNodeType::Activity => {
                    nodes.push(Arc::new(Box::new(Activity::new(
                        node,
                        reactor_tx,
                        exec_tx.clone(),
                        i,
                    ))));
                }
                WorkflowNodeType::Trigger => {
                    todo!()
                }
                WorkflowNodeType::End => {
                    nodes.push(Arc::new(Box::new(End::new(node, exec_tx.clone(), i))));
                }
            }
            cursor_map.insert(node.id.clone(), node.pointers.clone());
        }
        (
            Job {
                id: Uuid::new_v4().to_string(),
                owner_id: wf.associated_user_id.clone(),
                context: vec![],
                current: None,
                cursor_map,
                nodes,
                status: JobStatus::NotStarted,
            },
            exec_rx,
        )
    }

    /// Resolves the set of nodes to run after the node at index `pointer`.
    ///
    /// With `pointer == None` the job has not started: returns the single
    /// `Start` node. Otherwise follows the current node's outgoing edges,
    /// skipping any edge whose guard expression does not evaluate to `true`.
    ///
    /// Returns `None` when the current index is out of range, a pointer
    /// targets an unknown node id, or an `End` node has no outgoing edges
    /// (i.e. the workflow is finished).
    pub fn next_node(&self, pointer: Option<usize>) -> Option<Vec<Arc<Box<dyn Node>>>> {
        if let Some(ptr) = pointer {
            let current = &**self.nodes.get(ptr)?;
            let points_to = self.cursor_map.get(current.id())?;
            let mut next_nodes: Vec<Arc<Box<dyn Node>>> = vec![];
            for path in points_to {
                if let Some(expression) = &path.expression {
                    // Follow a conditional edge only when its expression
                    // evaluates to boolean `true`. The previous check used
                    // `is_boolean()`, which also accepted `false` and so
                    // followed edges whose condition had failed.
                    let result = Expr::new(expression)
                        .exec()
                        .expect("Unable to evaluate expression");
                    if result.as_bool() != Some(true) {
                        continue;
                    }
                }
                // `?`: a dangling pointer (no node with that id) aborts routing.
                next_nodes.push(
                    self.nodes
                        .iter()
                        .find(|x| path.points_to == x.id())?
                        .clone(),
                )
            }
            if next_nodes.is_empty() && current.kind() == WorkflowNodeType::End {
                None
            } else {
                Some(next_nodes)
            }
        } else {
            // Job not started yet: begin at the Start node.
            Some(vec![self
                .nodes
                .iter()
                .find(|x| x.kind() == WorkflowNodeType::Start)?
                .clone()])
        }
    }
}
/// Outcome a node reports back over the job's execution channel.
pub enum NodeStatus {
    Success,
    Failed,
}
/// Message a node sends on the channel created in `Job::new`.
pub struct Message {
    /// Index of the reporting node within `Job::nodes`.
    pub pointer: usize,
    pub status: NodeStatus,
    /// Parameters produced by the node — presumably merged into
    /// `Job::context` by the consumer; not shown in this chunk.
    pub context: Vec<Parameter>,
}