pub mod compiler;
pub mod error;
pub mod executor;
pub mod functions;
pub mod message;
pub mod task;
pub mod task_executor;
pub mod utils;
pub mod workflow;
pub mod workflow_executor;
pub use error::{DataflowError, ErrorInfo, Result};
pub use functions::{AsyncFunctionHandler, FunctionConfig};
pub use message::Message;
pub use task::Task;
pub use workflow::Workflow;
use chrono::Utc;
use datalogic_rs::{CompiledLogic, DataLogic};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use compiler::LogicCompiler;
use executor::InternalExecutor;
use task_executor::TaskExecutor;
use workflow_executor::WorkflowExecutor;
pub struct Engine {
workflows: Arc<HashMap<String, Workflow>>,
workflow_executor: Arc<WorkflowExecutor>,
datalogic: Arc<DataLogic>,
logic_cache: Vec<Arc<CompiledLogic>>,
}
impl Engine {
pub fn new(
workflows: Vec<Workflow>,
custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
) -> Self {
let mut compiler = LogicCompiler::new();
let workflow_map = compiler.compile_workflows(workflows);
let (datalogic, logic_cache) = compiler.into_parts();
let mut task_functions = custom_functions.unwrap_or_default();
for (name, handler) in functions::builtins::get_all_functions() {
task_functions.insert(name, handler);
}
let internal_executor = Arc::new(InternalExecutor::new(
Arc::clone(&datalogic),
logic_cache.clone(),
));
let task_executor = Arc::new(TaskExecutor::new(
Arc::new(task_functions),
Arc::clone(&internal_executor),
Arc::clone(&datalogic),
));
let workflow_executor = Arc::new(WorkflowExecutor::new(task_executor, internal_executor));
Self {
workflows: Arc::new(workflow_map),
workflow_executor,
datalogic,
logic_cache,
}
}
pub async fn process_message(&self, message: &mut Message) -> Result<()> {
message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
message.invalidate_context_cache();
let mut workflows: Vec<_> = self.workflows.values().collect();
workflows.sort_by_key(|w| w.priority);
for workflow in workflows {
self.workflow_executor.execute(workflow, message).await?;
}
Ok(())
}
pub fn workflows(&self) -> &Arc<HashMap<String, Workflow>> {
&self.workflows
}
pub fn datalogic(&self) -> &Arc<DataLogic> {
&self.datalogic
}
pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
&self.logic_cache
}
}