pub mod compiler;
pub mod error;
pub mod executor;
pub mod functions;
pub mod message;
pub mod task;
pub mod task_context;
pub mod task_executor;
pub mod task_outcome;
pub mod trace;
pub mod utils;
pub mod workflow;
pub mod workflow_executor;
pub use error::{DataflowError, ErrorInfo, Result};
pub use functions::{
AsyncFunctionHandler, BoxedFunctionHandler, CompiledCustomInput, DynAsyncFunctionHandler,
FunctionConfig,
};
pub use message::Message;
pub use task::Task;
pub use task_context::TaskContext;
pub use task_outcome::TaskOutcome;
pub use trace::{ExecutionStep, ExecutionTrace, StepResult};
pub use workflow::{Workflow, WorkflowStatus};
use chrono::Utc;
use datalogic_rs::Engine as DatalogicEngine;
use datavalue::OwnedDataValue;
use std::collections::HashMap;
use std::sync::Arc;
use compiler::LogicCompiler;
use task_executor::TaskExecutor;
use utils::set_nested_value;
use workflow_executor::WorkflowExecutor;
/// Workflow engine: holds the compiled workflows plus the executors that run
/// them against a [`Message`].
///
/// Every field is wrapped in an `Arc`; all of them are populated by
/// [`Engine::new`] or [`Engine::with_new_workflows`].
pub struct Engine {
/// Workflows as returned by `LogicCompiler::compile_workflows`, in that order.
workflows: Arc<Vec<Workflow>>,
/// Channel name -> positions in `workflows` of the workflows on that channel
/// whose status is `WorkflowStatus::Active` (see `build_channel_index`).
channel_index: Arc<HashMap<String, Vec<usize>>>,
/// Runs a single workflow against a message; also hands back the registered
/// task functions when a new engine is derived from this one.
workflow_executor: Arc<WorkflowExecutor>,
/// DataLogic engine produced by `LogicCompiler::into_engine`; the same handle
/// is passed to the task executor and the workflow executor.
datalogic: Arc<DatalogicEngine>,
/// Crate version (`CARGO_PKG_VERSION`) as a string value, written to
/// `metadata.engine_version` on every processed message.
engine_version: Arc<OwnedDataValue>,
}
/// Groups the positions of all `Active` workflows by their channel name.
///
/// Each value lists indices into `workflows` in ascending order, i.e. in the
/// same relative order the workflows appear in the slice. Workflows with any
/// other status are left out, so a channel with no active workflow has no
/// entry at all.
fn build_channel_index(workflows: &[Workflow]) -> HashMap<String, Vec<usize>> {
    workflows
        .iter()
        .enumerate()
        .filter(|(_, candidate)| candidate.status == WorkflowStatus::Active)
        .fold(
            HashMap::<String, Vec<usize>>::new(),
            |mut by_channel, (position, active)| {
                by_channel
                    .entry(active.channel.clone())
                    .or_default()
                    .push(position);
                by_channel
            },
        )
}
impl Engine {
    /// Builds an engine from raw workflows and a table of custom function handlers.
    ///
    /// The workflows are compiled with a fresh `LogicCompiler`, the inputs of
    /// every `FunctionConfig::Custom` task are pre-parsed by their handler, and
    /// the task/workflow executors are wired to the DataLogic engine the
    /// compiler hands back.
    ///
    /// # Errors
    ///
    /// Propagates any error from `compile_workflows` and from
    /// `precompile_custom_inputs` (unknown handler name or a handler rejecting
    /// its input).
    pub fn new(
        workflows: Vec<Workflow>,
        custom_functions: HashMap<String, BoxedFunctionHandler>,
    ) -> Result<Self> {
        let compiler = LogicCompiler::new();
        let mut compiled = compiler.compile_workflows(workflows)?;
        // `into_engine` consumes the compiler, so it comes after compilation.
        let logic = compiler.into_engine();

        precompile_custom_inputs(&mut compiled, &custom_functions)?;

        let tasks = TaskExecutor::new(Arc::new(custom_functions), Arc::clone(&logic));
        let runner = WorkflowExecutor::new(Arc::new(tasks), Arc::clone(&logic));
        let by_channel = build_channel_index(&compiled);
        let version = OwnedDataValue::String(env!("CARGO_PKG_VERSION").to_string());

        Ok(Self {
            workflows: Arc::new(compiled),
            channel_index: Arc::new(by_channel),
            workflow_executor: Arc::new(runner),
            datalogic: logic,
            engine_version: Arc::new(version),
        })
    }

    /// Returns an empty [`EngineBuilder`].
    pub fn builder() -> EngineBuilder {
        EngineBuilder::new()
    }

    /// The version value this engine stamps into `metadata.engine_version`.
    pub fn engine_version_value(&self) -> &OwnedDataValue {
        &self.engine_version
    }

    /// Creates a new engine for `workflows`, reusing this engine's registered
    /// task functions and version value. `self` is not modified.
    ///
    /// A fresh `LogicCompiler` (and therefore a fresh DataLogic engine) is used
    /// for the new workflows.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`Engine::new`].
    pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Result<Self> {
        let shared_handlers = self.workflow_executor.task_functions();

        let compiler = LogicCompiler::new();
        let mut compiled = compiler.compile_workflows(workflows)?;
        let logic = compiler.into_engine();

        precompile_custom_inputs(&mut compiled, &shared_handlers)?;

        let tasks = TaskExecutor::new(shared_handlers, Arc::clone(&logic));
        let runner = WorkflowExecutor::new(Arc::new(tasks), Arc::clone(&logic));
        let by_channel = build_channel_index(&compiled);

        Ok(Self {
            workflows: Arc::new(compiled),
            channel_index: Arc::new(by_channel),
            workflow_executor: Arc::new(runner),
            datalogic: logic,
            engine_version: Arc::clone(&self.engine_version),
        })
    }

    /// Runs `message` through every workflow held by the engine, in order.
    ///
    /// Before the first workflow runs, `metadata.processed_at` and
    /// `metadata.engine_version` are written to `message.context`. One
    /// timestamp is taken up front and passed to every workflow.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by the workflow executor.
    pub async fn process_message(&self, message: &mut Message) -> Result<()> {
        let started_at = Utc::now();
        set_processing_metadata(&mut message.context, &self.engine_version, started_at, None);

        let executor = &self.workflow_executor;
        for current in self.workflows.iter() {
            executor.execute(current, message, started_at).await?;
        }
        Ok(())
    }

    /// Same as [`Engine::process_message`], but collects an [`ExecutionTrace`]
    /// while the workflows run and returns it on success.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by the workflow
    /// executor; the partial trace is dropped in that case.
    pub async fn process_message_with_trace(
        &self,
        message: &mut Message,
    ) -> Result<ExecutionTrace> {
        use trace::ExecutionTrace;

        let started_at = Utc::now();
        set_processing_metadata(&mut message.context, &self.engine_version, started_at, None);

        let executor = &self.workflow_executor;
        let mut recorded = ExecutionTrace::new();
        for current in self.workflows.iter() {
            executor
                .execute_with_trace(current, message, &mut recorded, started_at)
                .await?;
        }
        Ok(recorded)
    }

    /// Runs `message` through the workflows indexed under `channel`.
    ///
    /// Only workflows present in the channel index are executed (the index is
    /// built by `build_channel_index`, which records `Active` workflows only).
    /// `metadata.channel` is written in addition to the usual processing
    /// metadata, even when the channel has no indexed workflow; in that case
    /// nothing else happens and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by the workflow executor.
    pub async fn process_message_for_channel(
        &self,
        channel: &str,
        message: &mut Message,
    ) -> Result<()> {
        let started_at = Utc::now();
        set_processing_metadata(
            &mut message.context,
            &self.engine_version,
            started_at,
            Some(channel),
        );

        let executor = &self.workflow_executor;
        // A missing channel yields an empty iteration rather than an error.
        for &position in self.channel_index.get(channel).into_iter().flatten() {
            executor
                .execute(&self.workflows[position], message, started_at)
                .await?;
        }
        Ok(())
    }

    /// Same as [`Engine::process_message_for_channel`], but collects an
    /// [`ExecutionTrace`] and returns it on success. An unknown channel yields
    /// the trace exactly as `ExecutionTrace::new()` created it.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by the workflow
    /// executor; the partial trace is dropped in that case.
    pub async fn process_message_for_channel_with_trace(
        &self,
        channel: &str,
        message: &mut Message,
    ) -> Result<ExecutionTrace> {
        use trace::ExecutionTrace;

        let started_at = Utc::now();
        set_processing_metadata(
            &mut message.context,
            &self.engine_version,
            started_at,
            Some(channel),
        );

        let executor = &self.workflow_executor;
        let mut recorded = ExecutionTrace::new();
        for &position in self.channel_index.get(channel).into_iter().flatten() {
            executor
                .execute_with_trace(
                    &self.workflows[position],
                    message,
                    &mut recorded,
                    started_at,
                )
                .await?;
        }
        Ok(recorded)
    }

    /// Shared handle to the compiled workflows, in engine order.
    pub fn workflows(&self) -> &Arc<Vec<Workflow>> {
        &self.workflows
    }

    /// Finds the first workflow whose `id` equals `id` (linear scan).
    pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow> {
        self.workflows.iter().find(|candidate| candidate.id == id)
    }

    /// Shared handle to the DataLogic engine used by this engine's executors.
    pub fn datalogic(&self) -> &Arc<DatalogicEngine> {
        &self.datalogic
    }
}
/// Accumulates workflows and custom function handlers, then produces an
/// [`Engine`] via [`EngineBuilder::build`].
///
/// Starts out empty (`Default`); every setter takes and returns the builder
/// by value so calls can be chained.
#[must_use = "EngineBuilder must be `.build()` to produce an Engine"]
#[derive(Default)]
pub struct EngineBuilder {
/// Workflows in the order they were added.
workflows: Vec<Workflow>,
/// Custom function handlers keyed by the name they were registered under.
handlers: HashMap<String, BoxedFunctionHandler>,
}
impl EngineBuilder {
    /// Creates a builder with no workflows and no handlers.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `handler` under `name`, boxing it first.
    ///
    /// Registering the same name again replaces the earlier handler.
    pub fn register<F>(self, name: impl Into<String>, handler: F) -> Self
    where
        F: AsyncFunctionHandler,
    {
        self.register_boxed(name, Box::new(handler))
    }

    /// Registers an already boxed handler under `name`.
    ///
    /// Registering the same name again replaces the earlier handler.
    pub fn register_boxed(
        mut self,
        name: impl Into<String>,
        handler: BoxedFunctionHandler,
    ) -> Self {
        let key: String = name.into();
        self.handlers.insert(key, handler);
        self
    }

    /// Appends a single workflow.
    pub fn with_workflow(self, workflow: Workflow) -> Self {
        self.with_workflows(std::iter::once(workflow))
    }

    /// Appends every workflow yielded by `workflows`, preserving their order.
    pub fn with_workflows<I>(mut self, workflows: I) -> Self
    where
        I: IntoIterator<Item = Workflow>,
    {
        self.workflows.extend(workflows);
        self
    }

    /// Consumes the builder and constructs the engine via [`Engine::new`].
    ///
    /// # Errors
    ///
    /// Returns whatever error [`Engine::new`] reports.
    pub fn build(self) -> Result<Engine> {
        let Self {
            workflows,
            handlers,
        } = self;
        Engine::new(workflows, handlers)
    }
}
/// Pre-parses the input of every `FunctionConfig::Custom` task.
///
/// For each custom task the handler registered under the task's function name
/// parses `input`, and the result is stored in `compiled_input`. Tasks with
/// any other function configuration are left untouched.
///
/// # Errors
///
/// Returns `DataflowError::FunctionNotFound` when a task names a handler that
/// is not in `handlers`, or the handler's own error when it rejects the input.
/// Processing stops at the first failure; tasks visited earlier keep their
/// freshly stored `compiled_input`.
fn precompile_custom_inputs(
    workflows: &mut [Workflow],
    handlers: &HashMap<String, BoxedFunctionHandler>,
) -> Result<()> {
    for workflow in workflows.iter_mut() {
        for task in &mut workflow.tasks {
            match &mut task.function {
                FunctionConfig::Custom {
                    name,
                    input,
                    compiled_input,
                } => {
                    let handler = match handlers.get(name) {
                        Some(registered) => registered,
                        None => return Err(function_not_found_error(name, handlers)),
                    };
                    let parsed = handler.parse_input_box(input)?;
                    *compiled_input = Some(CompiledCustomInput(Arc::from(parsed)));
                }
                // Other function configurations carry no custom input to pre-parse.
                _ => {}
            }
        }
    }
    Ok(())
}
/// Builds the `FunctionNotFound` error for an unknown custom function `name`.
///
/// The message lists the registered handler names in sorted order (or `none`
/// when the table is empty) followed by the built-in function names, so the
/// caller can see what was actually available.
fn function_not_found_error(
    name: &str,
    handlers: &HashMap<String, BoxedFunctionHandler>,
) -> DataflowError {
    use crate::engine::functions::config::BUILTIN_FUNCTION_NAMES;

    // HashMap iteration order is unspecified; sort for a stable message.
    let mut known: Vec<&str> = handlers.keys().map(|key| key.as_str()).collect();
    known.sort_unstable();

    let registered_part = match known.as_slice() {
        [] => String::from("none"),
        names => names.join(", "),
    };
    let builtins = BUILTIN_FUNCTION_NAMES.join(", ");

    DataflowError::FunctionNotFound(format!(
        "{name} (registered handlers: {registered_part}; built-ins: {})",
        builtins
    ))
}
/// Stamps processing metadata onto a message context.
///
/// Writes, via `set_nested_value`:
/// - `metadata.processed_at`: `now` formatted as RFC 3339,
/// - `metadata.engine_version`: a clone of `engine_version`,
/// - `metadata.channel`: the channel name, only when `channel` is `Some`.
fn set_processing_metadata(
    context: &mut OwnedDataValue,
    engine_version: &Arc<OwnedDataValue>,
    now: chrono::DateTime<Utc>,
    channel: Option<&str>,
) {
    let processed_at = OwnedDataValue::String(now.to_rfc3339());
    set_nested_value(context, "metadata.processed_at", processed_at);

    // Borrow through the Arc so the value itself is cloned, not the handle.
    let version: &OwnedDataValue = engine_version;
    set_nested_value(context, "metadata.engine_version", version.clone());

    if let Some(channel_name) = channel {
        let channel_value = OwnedDataValue::String(channel_name.to_string());
        set_nested_value(context, "metadata.channel", channel_value);
    }
}