pub mod compiler;
pub mod error;
pub mod executor;
pub mod functions;
pub mod message;
pub mod task;
pub mod task_executor;
pub mod trace;
pub mod utils;
pub mod workflow;
pub mod workflow_executor;
pub use error::{DataflowError, ErrorInfo, Result};
pub use functions::{AsyncFunctionHandler, FunctionConfig};
pub use message::Message;
pub use task::Task;
pub use trace::{ExecutionStep, ExecutionTrace, StepResult};
pub use workflow::{Workflow, WorkflowStatus};
use chrono::Utc;
use datalogic_rs::{CompiledLogic, DataLogic};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use compiler::LogicCompiler;
use executor::InternalExecutor;
use task_executor::TaskExecutor;
use workflow_executor::WorkflowExecutor;
/// Workflow-processing engine: owns the compiled workflow set, a
/// channel -> workflow index, and the executor chain used to run
/// workflows against incoming messages.
pub struct Engine {
    // Workflows as returned by `LogicCompiler::compile_workflows`
    // (name suggests they are sorted — confirm in `compiler`).
    workflows: Arc<Vec<Workflow>>,
    // Channel name -> indices into `workflows`; contains only workflows
    // whose status is `Active` (see `build_channel_index`).
    channel_index: Arc<HashMap<String, Vec<usize>>>,
    // Drives workflow execution; also retains the task-function registry
    // (see `with_new_workflows`, which reuses it).
    workflow_executor: Arc<WorkflowExecutor>,
    // DataLogic instance shared (via Arc clones) with the internal and
    // task executors.
    datalogic: Arc<DataLogic>,
    // Compiled logic expressions produced by `LogicCompiler::into_parts`;
    // a clone is also handed to the `InternalExecutor`.
    logic_cache: Vec<Arc<CompiledLogic>>,
}
/// Maps each channel name to the positions of its active workflows,
/// preserving the order in which they appear in `workflows`.
/// Inactive workflows are omitted entirely.
fn build_channel_index(workflows: &[Workflow]) -> HashMap<String, Vec<usize>> {
    let mut index: HashMap<String, Vec<usize>> = HashMap::new();
    let active = workflows
        .iter()
        .enumerate()
        .filter(|(_, workflow)| workflow.status == WorkflowStatus::Active);
    for (position, workflow) in active {
        index
            .entry(workflow.channel.clone())
            .or_default()
            .push(position);
    }
    index
}
impl Engine {
pub fn new(
workflows: Vec<Workflow>,
custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
) -> Self {
let mut compiler = LogicCompiler::new();
let sorted_workflows = compiler.compile_workflows(workflows);
let (datalogic, logic_cache) = compiler.into_parts();
let mut task_functions = custom_functions.unwrap_or_default();
for (name, handler) in functions::builtins::get_all_functions() {
task_functions.insert(name, handler);
}
let internal_executor = Arc::new(InternalExecutor::new(
Arc::clone(&datalogic),
logic_cache.clone(),
));
let task_executor = Arc::new(TaskExecutor::new(
Arc::new(task_functions),
Arc::clone(&internal_executor),
Arc::clone(&datalogic),
));
let workflow_executor = Arc::new(WorkflowExecutor::new(task_executor, internal_executor));
let channel_index = build_channel_index(&sorted_workflows);
Self {
workflows: Arc::new(sorted_workflows),
channel_index: Arc::new(channel_index),
workflow_executor,
datalogic,
logic_cache,
}
}
pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Self {
let task_functions = self.workflow_executor.task_functions();
let mut compiler = LogicCompiler::new();
let sorted_workflows = compiler.compile_workflows(workflows);
let (datalogic, logic_cache) = compiler.into_parts();
let internal_executor = Arc::new(InternalExecutor::new(
Arc::clone(&datalogic),
logic_cache.clone(),
));
let task_executor = Arc::new(TaskExecutor::new(
task_functions,
Arc::clone(&internal_executor),
Arc::clone(&datalogic),
));
let workflow_executor = Arc::new(WorkflowExecutor::new(task_executor, internal_executor));
let channel_index = build_channel_index(&sorted_workflows);
Self {
workflows: Arc::new(sorted_workflows),
channel_index: Arc::new(channel_index),
workflow_executor,
datalogic,
logic_cache,
}
}
pub async fn process_message(&self, message: &mut Message) -> Result<()> {
message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
message.invalidate_context_cache();
for workflow in self.workflows.iter() {
self.workflow_executor.execute(workflow, message).await?;
}
Ok(())
}
pub async fn process_message_with_trace(
&self,
message: &mut Message,
) -> Result<ExecutionTrace> {
use trace::ExecutionTrace;
message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
message.invalidate_context_cache();
let mut trace = ExecutionTrace::new();
for workflow in self.workflows.iter() {
self.workflow_executor
.execute_with_trace(workflow, message, &mut trace)
.await?;
}
Ok(trace)
}
pub async fn process_message_for_channel(
&self,
channel: &str,
message: &mut Message,
) -> Result<()> {
message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
message.context["metadata"]["channel"] = json!(channel);
message.invalidate_context_cache();
if let Some(indices) = self.channel_index.get(channel) {
for &idx in indices {
self.workflow_executor
.execute(&self.workflows[idx], message)
.await?;
}
}
Ok(())
}
pub async fn process_message_for_channel_with_trace(
&self,
channel: &str,
message: &mut Message,
) -> Result<ExecutionTrace> {
use trace::ExecutionTrace;
message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
message.context["metadata"]["channel"] = json!(channel);
message.invalidate_context_cache();
let mut trace = ExecutionTrace::new();
if let Some(indices) = self.channel_index.get(channel) {
for &idx in indices {
self.workflow_executor
.execute_with_trace(&self.workflows[idx], message, &mut trace)
.await?;
}
}
Ok(trace)
}
pub fn workflows(&self) -> &Arc<Vec<Workflow>> {
&self.workflows
}
pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow> {
self.workflows.iter().find(|w| w.id == id)
}
pub fn datalogic(&self) -> &Arc<DataLogic> {
&self.datalogic
}
pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
&self.logic_cache
}
}