//! Core of the workflow engine: routes [`Message`]s through registered
//! [`Workflow`]s, executing each task via an [`AsyncFunctionHandler`] and
//! evaluating workflow/task conditions as JSONLogic via `datalogic_rs`.

pub mod error;
pub mod functions;
pub mod message;
pub mod task;
pub mod workflow;
pub use error::{DataflowError, ErrorInfo, Result};
pub use functions::AsyncFunctionHandler;
pub use message::Message;
pub use task::Task;
pub use workflow::Workflow;
pub use datalogic_rs as jsonlogic;
use chrono::Utc;
use datalogic_rs::DataLogic;
use log::{debug, error, info, warn};
use message::AuditTrail;
use serde_json::{json, Map, Number, Value};
use std::{cell::RefCell, collections::HashMap};
use tokio::time::sleep;
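// One DataLogic instance per thread avoids synchronization during
// JSONLogic evaluation; its arena is reset before every use.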
thread_local! {
static THREAD_LOCAL_DATA_LOGIC: RefCell<DataLogic> = RefCell::new(DataLogic::new());
}
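/// Controls how failed task executions are retried.
///
/// Defaults to 3 retries with a 1000 ms base delay and exponential
/// backoff (the delay doubles after each failed attempt).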
#[derive(Debug, Clone)]
pub struct RetryConfig {
pub max_retries: u32,
pub retry_delay_ms: u64,
pub use_backoff: bool,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
retry_delay_ms: 1000,
use_backoff: true,
}
}
}
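/// Orchestrates message processing across registered workflows.
///
/// Workflows are processed in lexicographic order of their IDs, and each
/// task runs through its registered [`AsyncFunctionHandler`] with retries
/// governed by [`RetryConfig`].
///
/// A minimal usage sketch; the crate path and the way `workflow` is
/// constructed are assumptions for illustration:
///
/// ```rust,ignore
/// use dataflow_rs::{Engine, Message};
///
/// let mut engine = Engine::new(); // built-in functions pre-registered
/// engine.add_workflow(&workflow); // a `Workflow` defined elsewhere
///
/// let mut message = Message::new(&serde_json::json!({"hello": "world"}));
/// engine.process_message(&mut message).await?;
/// ```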
pub struct Engine {
workflows: HashMap<String, Workflow>,
task_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
retry_config: RetryConfig,
}
impl Default for Engine {
fn default() -> Self {
Self::new()
}
}
impl Engine {
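    /// Creates an engine with all built-in task functions pre-registered.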
pub fn new() -> Self {
let mut engine = Self {
workflows: HashMap::new(),
task_functions: HashMap::new(),
retry_config: RetryConfig::default(),
};
for (name, handler) in functions::builtins::get_all_functions() {
engine.register_task_function(name, handler);
}
engine
}
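    /// Creates an engine with no workflows and no task functions, for
    /// callers that want to register only a custom set of functions.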
pub fn new_empty() -> Self {
Self {
task_functions: HashMap::new(),
workflows: HashMap::new(),
retry_config: RetryConfig::default(),
}
}
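    /// Sets the retry configuration, returning the engine for
    /// builder-style chaining.
    ///
    /// ```rust,ignore
    /// let engine = Engine::new().with_retry_config(RetryConfig {
    ///     max_retries: 5,
    ///     retry_delay_ms: 500,
    ///     use_backoff: false,
    /// });
    /// ```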
pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
self.retry_config = config;
self
}
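    /// Registers a workflow after validating it.
    ///
    /// Invalid workflows are not registered; the validation failure is
    /// logged as an error rather than returned to the caller.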
    pub fn add_workflow(&mut self, workflow: &Workflow) {
        match workflow.validate() {
            Ok(_) => {
                self.workflows.insert(workflow.id.clone(), workflow.clone());
            }
            Err(e) => error!("Invalid workflow {}: {:?}", workflow.id, e),
        }
    }
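    /// Registers an async function handler under `name`, replacing any
    /// existing handler registered under the same name.
    ///
    /// ```rust,ignore
    /// // `MyHandler` is a hypothetical `AsyncFunctionHandler` implementation.
    /// engine.register_task_function("my_function".to_string(), Box::new(MyHandler));
    /// ```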
pub fn register_task_function(
&mut self,
name: String,
handler: Box<dyn AsyncFunctionHandler + Send + Sync>,
) {
self.task_functions.insert(name, handler);
}
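    /// Returns `true` if a task function is registered under `name`.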
pub fn has_function(&self, name: &str) -> bool {
self.task_functions.contains_key(name)
}
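    /// Processes a message sequentially through all matching workflows.
    ///
    /// Workflows run in lexicographic ID order. Each workflow's condition
    /// (defaulting to `true`) is evaluated against the message metadata;
    /// non-matching workflows are skipped. Errors are accumulated on
    /// `message.errors` rather than aborting the remaining workflows.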
pub async fn process_message(&self, message: &mut Message) -> Result<()> {
debug!(
"Processing message {} sequentially through workflows",
message.id
);
let mut sorted_workflows: Vec<_> = self.workflows.iter().collect();
sorted_workflows.sort_by_key(|(id, _)| id.as_str());
for (_, workflow) in sorted_workflows {
let condition = workflow.condition.clone().unwrap_or(Value::Bool(true));
if !self
.evaluate_condition(&condition, &message.metadata)
.await?
{
debug!("Workflow {} skipped - condition not met", workflow.id);
continue;
}
info!("Processing workflow {}", workflow.id);
let mut fresh_message = Message::new(&message.payload);
fresh_message.data = message.data.clone();
fresh_message.metadata = message.metadata.clone();
fresh_message.temp_data = message.temp_data.clone();
fresh_message.errors = message.errors.clone();
let (workflow_id, workflow_message) = Self::process_workflow(
workflow.clone(),
fresh_message,
&self.task_functions,
&self.retry_config,
)
.await;
message.data = workflow_message.data;
message.metadata = workflow_message.metadata;
message.temp_data = workflow_message.temp_data;
message.audit_trail.extend(workflow_message.audit_trail);
message.errors.extend(workflow_message.errors);
info!("Completed processing workflow {}", workflow_id);
}
debug!(
"Completed processing all workflows for message {}",
message.id
);
Ok(())
}
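    /// Runs a single workflow's tasks against the message.
    ///
    /// A task whose condition fails to evaluate is recorded as an error
    /// and skipped; a task whose execution fails after all retries, or
    /// whose function is unregistered, stops the remaining tasks in the
    /// workflow. All errors are appended to the message before it is
    /// returned alongside the workflow ID.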
async fn process_workflow(
workflow: Workflow,
mut message: Message,
task_functions: &HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
retry_config: &RetryConfig,
) -> (String, Message) {
let workflow_id = workflow.id.clone();
let mut workflow_errors = Vec::new();
for task in &workflow.tasks {
let task_condition = task.condition.clone().unwrap_or(Value::Bool(true));
let should_execute = THREAD_LOCAL_DATA_LOGIC.with(|data_logic_cell| {
let mut data_logic = data_logic_cell.borrow_mut();
data_logic.reset_arena();
data_logic
.evaluate_json(&task_condition, &message.metadata, None)
.map_err(|e| {
DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
})
.map(|result| result.as_bool().unwrap_or(false))
});
let should_execute = match should_execute {
Ok(result) => result,
Err(e) => {
workflow_errors.push(ErrorInfo::new(
Some(workflow_id.clone()),
Some(task.id.clone()),
e.clone(),
));
false
}
};
if !should_execute {
debug!("Task {} skipped - condition not met", task.id);
continue;
}
if let Some(function) = task_functions.get(&task.function.name) {
let task_id = task.id.clone();
let function_input = task.function.input.clone();
match Self::execute_task_static(
&task_id,
&workflow_id,
&mut message,
&function_input,
function.as_ref(),
retry_config,
)
.await
{
Ok(_) => {
debug!("Task {} completed successfully", task_id);
}
Err(error) => {
workflow_errors.push(ErrorInfo::new(
Some(workflow_id.clone()),
Some(task_id.clone()),
error.clone(),
));
break;
}
}
} else {
let error =
DataflowError::Workflow(format!("Function '{}' not found", task.function.name));
workflow_errors.push(ErrorInfo::new(
Some(workflow_id.clone()),
Some(task.id.clone()),
error,
));
break;
}
}
message.errors.extend(workflow_errors);
(workflow_id, message)
}
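    /// Executes a single task function, retrying per `retry_config`.
    ///
    /// On success, appends an [`AuditTrail`] entry and writes a `progress`
    /// object into `message.metadata` (which is therefore expected to be
    /// a JSON object). Once retries are exhausted, returns the last error.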
async fn execute_task_static(
task_id: &str,
workflow_id: &str,
message: &mut Message,
input_json: &Value,
function: &dyn AsyncFunctionHandler,
retry_config: &RetryConfig,
) -> Result<()> {
info!("Executing task {} in workflow {}", task_id, workflow_id);
let mut last_error = None;
let mut retry_count = 0;
while retry_count <= retry_config.max_retries {
match function.execute(message, input_json).await {
Ok((status_code, changes)) => {
message.audit_trail.push(AuditTrail {
workflow_id: workflow_id.to_string(),
task_id: task_id.to_string(),
timestamp: Utc::now().to_rfc3339(),
changes,
status_code,
});
info!("Task {} completed with status {}", task_id, status_code);
let mut progress = Map::new();
progress.insert("task_id".to_string(), Value::String(task_id.to_string()));
progress.insert(
"workflow_id".to_string(),
Value::String(workflow_id.to_string()),
);
progress.insert(
"status_code".to_string(),
Value::Number(Number::from(status_code)),
);
progress.insert(
"timestamp".to_string(),
Value::String(Utc::now().to_rfc3339()),
);
if retry_count > 0 {
progress.insert(
"retries".to_string(),
Value::Number(Number::from(retry_count)),
);
}
message.metadata["progress"] = json!(progress);
return Ok(());
}
Err(e) => {
last_error = Some(e.clone());
if retry_count < retry_config.max_retries {
warn!(
"Task {} execution failed, retry {}/{}: {:?}",
task_id,
retry_count + 1,
retry_config.max_retries,
e
);
                        let delay = if retry_config.use_backoff {
                            // Exponential backoff; saturate rather than
                            // overflow at high retry counts.
                            retry_config
                                .retry_delay_ms
                                .saturating_mul(2u64.saturating_pow(retry_count))
                        } else {
                            retry_config.retry_delay_ms
                        };
sleep(std::time::Duration::from_millis(delay)).await;
retry_count += 1;
} else {
break;
}
}
}
}
let error = last_error.unwrap_or_else(|| {
DataflowError::Unknown("Unknown error during task execution".to_string())
});
error!(
"Task {} in workflow {} failed after {} retries: {:?}",
task_id, workflow_id, retry_count, error
);
Err(error)
}
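    /// Evaluates a JSONLogic condition against `data`.
    ///
    /// Boolean literals short-circuit without invoking DataLogic, and
    /// non-boolean evaluation results are treated as `false`.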
async fn evaluate_condition(&self, condition: &Value, data: &Value) -> Result<bool> {
if let Value::Bool(b) = condition {
return Ok(*b);
}
THREAD_LOCAL_DATA_LOGIC.with(|data_logic_cell| {
let mut data_logic = data_logic_cell.borrow_mut();
data_logic.reset_arena();
data_logic
.evaluate_json(condition, data, None)
.map_err(|e| {
DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
})
.map(|result| result.as_bool().unwrap_or(false))
})
}
}