use crate::engine::error::{DataflowError, Result};
use crate::engine::functions::integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
use crate::engine::functions::{FilterConfig, LogConfig, MapConfig, ValidationConfig};
use crate::engine::{FunctionConfig, Workflow};
use datalogic_rs::{Engine, Logic};
use log::debug;
use serde_json::Value;
use std::sync::Arc;
pub struct LogicCompiler {
engine: Arc<Engine>,
}
impl Default for LogicCompiler {
fn default() -> Self {
Self::new()
}
}
impl LogicCompiler {
pub fn new() -> Self {
Self {
engine: Arc::new(Engine::builder().with_templating(true).build()),
}
}
pub fn engine(&self) -> Arc<Engine> {
Arc::clone(&self.engine)
}
pub fn into_engine(self) -> Arc<Engine> {
self.engine
}
pub fn compile_workflows(&self, workflows: Vec<Workflow>) -> Result<Vec<Workflow>> {
let mut compiled_workflows = Vec::with_capacity(workflows.len());
for mut workflow in workflows {
workflow.validate()?;
workflow.id_arc = Arc::from(workflow.id.as_str());
for task in &mut workflow.tasks {
task.id_arc = Arc::from(task.id.as_str());
}
let label = format!("workflow {} condition", workflow.id);
workflow.compiled_condition = Some(self.compile(&workflow.condition, &label)?);
debug!("Workflow {} condition compiled", workflow.id);
self.compile_workflow_tasks(&mut workflow)?;
compiled_workflows.push(workflow);
}
compiled_workflows.sort_by_key(|w| w.priority);
Ok(compiled_workflows)
}
fn compile_workflow_tasks(&self, workflow: &mut Workflow) -> Result<()> {
for task in &mut workflow.tasks {
let label = format!("task {} condition (workflow {})", task.id, workflow.id);
task.compiled_condition = Some(self.compile(&task.condition, &label)?);
self.compile_function_logic(&mut task.function, &task.id, &workflow.id)?;
}
Ok(())
}
fn compile_function_logic(
&self,
function: &mut FunctionConfig,
task_id: &str,
workflow_id: &str,
) -> Result<()> {
match function {
FunctionConfig::Map { input, .. } => {
self.compile_map_logic(input, task_id, workflow_id)
}
FunctionConfig::Validation { input, .. } => {
self.compile_validation_logic(input, task_id, workflow_id)
}
FunctionConfig::Filter { input, .. } => {
self.compile_filter_logic(input, task_id, workflow_id)
}
FunctionConfig::Log { input, .. } => {
self.compile_log_logic(input, task_id, workflow_id)
}
FunctionConfig::HttpCall { input, .. } => {
self.compile_http_call_logic(input, task_id, workflow_id)
}
FunctionConfig::Enrich { input, .. } => {
self.compile_enrich_logic(input, task_id, workflow_id)
}
FunctionConfig::PublishKafka { input, .. } => {
self.compile_publish_kafka_logic(input, task_id, workflow_id)
}
_ => Ok(()),
}
}
fn compile(&self, logic: &Value, ctx_label: &str) -> Result<Arc<Logic>> {
self.engine
.compile_arc(logic)
.map_err(|e| DataflowError::LogicEvaluation(format!("{}: {}", ctx_label, e)))
}
fn compile_map_logic(
&self,
config: &mut MapConfig,
task_id: &str,
workflow_id: &str,
) -> Result<()> {
for mapping in &mut config.mappings {
let parts: Vec<Arc<str>> = mapping.path.split('.').map(Arc::from).collect();
mapping.path_parts = Arc::from(parts.into_boxed_slice());
mapping.path_arc = Arc::from(mapping.path.as_str());
let label = format!(
"map logic for task {} in workflow {} (path {})",
task_id, workflow_id, mapping.path
);
mapping.compiled_logic = Some(self.compile(&mapping.logic, &label)?);
}
Ok(())
}
fn compile_validation_logic(
&self,
config: &mut ValidationConfig,
task_id: &str,
workflow_id: &str,
) -> Result<()> {
for (idx, rule) in config.rules.iter_mut().enumerate() {
let label = format!(
"validation rule {} for task {} in workflow {}",
idx, task_id, workflow_id
);
rule.compiled_logic = Some(self.compile(&rule.logic, &label)?);
}
Ok(())
}
fn compile_log_logic(
&self,
config: &mut LogConfig,
task_id: &str,
workflow_id: &str,
) -> Result<()> {
let msg_label = format!(
"log message for task {} in workflow {}",
task_id, workflow_id
);
config.compiled_message = Some(self.compile(&config.message, &msg_label)?);
let mut compiled_fields = Vec::with_capacity(config.fields.len());
for (key, logic) in &config.fields {
let label = format!(
"log field '{}' for task {} in workflow {}",
key, task_id, workflow_id
);
compiled_fields.push((key.clone(), Some(self.compile(logic, &label)?)));
}
config.compiled_fields = compiled_fields;
Ok(())
}
fn compile_filter_logic(
&self,
config: &mut FilterConfig,
task_id: &str,
workflow_id: &str,
) -> Result<()> {
let label = format!(
"filter condition for task {} in workflow {}",
task_id, workflow_id
);
config.compiled_condition = Some(self.compile(&config.condition, &label)?);
Ok(())
}
fn compile_http_call_logic(
&self,
config: &mut HttpCallConfig,
task_id: &str,
workflow_id: &str,
) -> Result<()> {
if let Some(logic) = &config.path_logic {
let label = format!(
"http_call path_logic for task {} in workflow {}",
task_id, workflow_id
);
config.compiled_path_logic = Some(self.compile(logic, &label)?);
}
if let Some(logic) = &config.body_logic {
let label = format!(
"http_call body_logic for task {} in workflow {}",
task_id, workflow_id
);
config.compiled_body_logic = Some(self.compile(logic, &label)?);
}
Ok(())
}
fn compile_enrich_logic(
&self,
config: &mut EnrichConfig,
task_id: &str,
workflow_id: &str,
) -> Result<()> {
if let Some(logic) = &config.path_logic {
let label = format!(
"enrich path_logic for task {} in workflow {}",
task_id, workflow_id
);
config.compiled_path_logic = Some(self.compile(logic, &label)?);
}
Ok(())
}
fn compile_publish_kafka_logic(
&self,
config: &mut PublishKafkaConfig,
task_id: &str,
workflow_id: &str,
) -> Result<()> {
if let Some(logic) = &config.key_logic {
let label = format!(
"publish_kafka key_logic for task {} in workflow {}",
task_id, workflow_id
);
config.compiled_key_logic = Some(self.compile(logic, &label)?);
}
if let Some(logic) = &config.value_logic {
let label = format!(
"publish_kafka value_logic for task {} in workflow {}",
task_id, workflow_id
);
config.compiled_value_logic = Some(self.compile(logic, &label)?);
}
Ok(())
}
}