use crate::engine::functions::integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
use crate::engine::functions::{FilterConfig, LogConfig, MapConfig, ValidationConfig};
use crate::engine::{FunctionConfig, Workflow};
use datalogic_rs::{CompiledLogic, DataLogic};
use log::{debug, error};
use serde_json::Value;
use std::sync::Arc;
/// Compiles the JSONLogic expressions embedded in workflows and tasks into a
/// cache of pre-compiled logic; the `*_index` fields written back onto
/// workflows/tasks/configs are positions in that cache.
pub struct LogicCompiler {
    // Shared DataLogic engine used for all compilation.
    datalogic: Arc<DataLogic>,
    // Append-only cache of compiled logic; an index handed out by
    // `compile_logic` is the expression's position in this Vec.
    logic_cache: Vec<Arc<CompiledLogic>>,
}
impl Default for LogicCompiler {
    /// Equivalent to [`LogicCompiler::new`]: fresh engine, empty cache.
    fn default() -> Self {
        Self::new()
    }
}
impl LogicCompiler {
    /// Creates a compiler backed by a fresh `DataLogic` engine (with
    /// structure preservation enabled) and an empty logic cache.
    pub fn new() -> Self {
        Self {
            datalogic: Arc::new(DataLogic::with_preserve_structure()),
            logic_cache: Vec::new(),
        }
    }

    /// Returns a shared handle to the underlying `DataLogic` engine.
    pub fn datalogic(&self) -> Arc<DataLogic> {
        Arc::clone(&self.datalogic)
    }

    /// Borrows the compiled-logic cache; the `*_index` fields written by
    /// [`Self::compile_workflows`] are positions in this Vec.
    pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
        &self.logic_cache
    }

    /// Consumes the compiler, yielding the engine and the logic cache.
    pub fn into_parts(self) -> (Arc<DataLogic>, Vec<Arc<CompiledLogic>>) {
        (self.datalogic, self.logic_cache)
    }

    /// Validates each workflow and compiles its condition plus all of its
    /// tasks' logic. Workflows that fail validation, or whose top-level
    /// condition fails to compile, are logged and dropped from the result.
    /// Surviving workflows are returned sorted by ascending `priority`
    /// (stable sort, so input order breaks ties).
    pub fn compile_workflows(&mut self, workflows: Vec<Workflow>) -> Vec<Workflow> {
        let mut compiled_workflows = Vec::new();
        for mut workflow in workflows {
            if let Err(e) = workflow.validate() {
                error!("Invalid workflow {}: {:?}", workflow.id, e);
                continue;
            }
            debug!(
                "Compiling condition for workflow {}: {:?}",
                workflow.id, workflow.condition
            );
            match self.compile_logic(&workflow.condition) {
                Ok(index) => {
                    workflow.condition_index = index;
                    debug!(
                        "Workflow {} condition compiled at index {:?}",
                        workflow.id, index
                    );
                    self.compile_workflow_tasks(&mut workflow);
                    compiled_workflows.push(workflow);
                }
                Err(e) => {
                    error!(
                        "Failed to parse condition for workflow {}: {:?}",
                        workflow.id, e
                    );
                }
            }
        }
        // Stable sort: equal-priority workflows keep their input order.
        compiled_workflows.sort_by_key(|w| w.priority);
        compiled_workflows
    }

    /// Compiles the condition and function logic of every task in `workflow`.
    /// A task whose condition fails to compile keeps its previous
    /// `condition_index`; the failure is only logged.
    fn compile_workflow_tasks(&mut self, workflow: &mut Workflow) {
        for task in &mut workflow.tasks {
            debug!(
                "Compiling condition for task {} in workflow {}: {:?}",
                task.id, workflow.id, task.condition
            );
            match self.compile_logic(&task.condition) {
                Ok(index) => {
                    task.condition_index = index;
                    debug!("Task {} condition compiled at index {:?}", task.id, index);
                }
                Err(e) => {
                    error!(
                        "Failed to parse condition for task {} in workflow {}: {:?}",
                        task.id, workflow.id, e
                    );
                }
            }
            self.compile_function_logic(&mut task.function, &task.id, &workflow.id);
        }
    }

    /// Dispatches to the per-function-kind compiler for whatever embedded
    /// logic the function config carries.
    fn compile_function_logic(
        &mut self,
        function: &mut FunctionConfig,
        task_id: &str,
        workflow_id: &str,
    ) {
        match function {
            FunctionConfig::Map { input, .. } => {
                self.compile_map_logic(input, task_id, workflow_id);
            }
            FunctionConfig::Validation { input, .. } => {
                self.compile_validation_logic(input, task_id, workflow_id);
            }
            FunctionConfig::Filter { input, .. } => {
                self.compile_filter_logic(input, task_id, workflow_id);
            }
            FunctionConfig::Log { input, .. } => {
                self.compile_log_logic(input, task_id, workflow_id);
            }
            FunctionConfig::HttpCall { input, .. } => {
                self.compile_http_call_logic(input, task_id, workflow_id);
            }
            FunctionConfig::Enrich { input, .. } => {
                self.compile_enrich_logic(input, task_id, workflow_id);
            }
            FunctionConfig::PublishKafka { input, .. } => {
                self.compile_publish_kafka_logic(input, task_id, workflow_id);
            }
            // Remaining variants carry no JSONLogic to pre-compile.
            // NOTE(review): this wildcard silently skips any newly added
            // variant — re-check when function kinds are added.
            _ => {}
        }
    }

    /// Compiles the logic expression of every mapping in a map function.
    fn compile_map_logic(&mut self, config: &mut MapConfig, task_id: &str, workflow_id: &str) {
        for mapping in &mut config.mappings {
            debug!(
                "Compiling map logic for task {} in workflow {}: {:?}",
                task_id, workflow_id, mapping.logic
            );
            match self.compile_logic(&mapping.logic) {
                Ok(index) => {
                    mapping.logic_index = index;
                    debug!(
                        "Map logic for task {} compiled at index {:?}",
                        task_id, index
                    );
                }
                Err(e) => {
                    error!(
                        "Failed to parse map logic for task {} in workflow {}: {:?}",
                        task_id, workflow_id, e
                    );
                }
            }
        }
    }

    /// Compiles the logic expression of every rule in a validation function.
    fn compile_validation_logic(
        &mut self,
        config: &mut ValidationConfig,
        task_id: &str,
        workflow_id: &str,
    ) {
        for rule in &mut config.rules {
            debug!(
                "Compiling validation logic for task {} in workflow {}: {:?}",
                task_id, workflow_id, rule.logic
            );
            match self.compile_logic(&rule.logic) {
                Ok(index) => {
                    rule.logic_index = index;
                    debug!(
                        "Validation logic for task {} compiled at index {:?}",
                        task_id, index
                    );
                }
                Err(e) => {
                    error!(
                        "Failed to parse validation logic for task {} in workflow {}: {:?}",
                        task_id, workflow_id, e
                    );
                }
            }
        }
    }

    /// Compiles a log function's message template plus each structured
    /// field expression. A field that fails to compile is recorded in
    /// `field_indices` with a `None` index.
    fn compile_log_logic(&mut self, config: &mut LogConfig, task_id: &str, workflow_id: &str) {
        debug!(
            "Compiling log message for task {} in workflow {}: {:?}",
            task_id, workflow_id, config.message
        );
        match self.compile_logic(&config.message) {
            Ok(index) => {
                config.message_index = index;
                debug!(
                    "Log message for task {} compiled at index {:?}",
                    task_id, index
                );
            }
            Err(e) => {
                error!(
                    "Failed to compile log message for task {} in workflow {}: {:?}",
                    task_id, workflow_id, e
                );
            }
        }
        // Borrowing `config.fields` while assigning `config.field_indices`
        // is fine: they are disjoint fields of the same struct.
        config.field_indices = config
            .fields
            .iter()
            .map(|(key, logic)| {
                let idx = match self.compile_logic(logic) {
                    Ok(index) => {
                        debug!(
                            "Log field '{}' for task {} compiled at index {:?}",
                            key, task_id, index
                        );
                        index
                    }
                    Err(e) => {
                        error!(
                            "Failed to compile log field '{}' for task {} in workflow {}: {:?}",
                            key, task_id, workflow_id, e
                        );
                        None
                    }
                };
                (key.clone(), idx)
            })
            .collect();
    }

    /// Compiles a filter function's condition expression.
    fn compile_filter_logic(
        &mut self,
        config: &mut FilterConfig,
        task_id: &str,
        workflow_id: &str,
    ) {
        debug!(
            "Compiling filter condition for task {} in workflow {}: {:?}",
            task_id, workflow_id, config.condition
        );
        match self.compile_logic(&config.condition) {
            Ok(index) => {
                config.condition_index = index;
                debug!(
                    "Filter condition for task {} compiled at index {:?}",
                    task_id, index
                );
            }
            Err(e) => {
                error!(
                    "Failed to compile filter condition for task {} in workflow {}: {:?}",
                    task_id, workflow_id, e
                );
            }
        }
    }

    /// Compiles the optional `path_logic` and `body_logic` of an http_call
    /// function.
    fn compile_http_call_logic(
        &mut self,
        config: &mut HttpCallConfig,
        task_id: &str,
        workflow_id: &str,
    ) {
        self.compile_optional_logic(
            config.path_logic.as_ref(),
            &mut config.path_logic_index,
            "http_call path_logic",
            task_id,
            workflow_id,
        );
        self.compile_optional_logic(
            config.body_logic.as_ref(),
            &mut config.body_logic_index,
            "http_call body_logic",
            task_id,
            workflow_id,
        );
    }

    /// Compiles the optional `path_logic` of an enrich function.
    fn compile_enrich_logic(
        &mut self,
        config: &mut EnrichConfig,
        task_id: &str,
        workflow_id: &str,
    ) {
        self.compile_optional_logic(
            config.path_logic.as_ref(),
            &mut config.path_logic_index,
            "enrich path_logic",
            task_id,
            workflow_id,
        );
    }

    /// Compiles the optional `key_logic` and `value_logic` of a
    /// publish_kafka function.
    fn compile_publish_kafka_logic(
        &mut self,
        config: &mut PublishKafkaConfig,
        task_id: &str,
        workflow_id: &str,
    ) {
        self.compile_optional_logic(
            config.key_logic.as_ref(),
            &mut config.key_logic_index,
            "publish_kafka key_logic",
            task_id,
            workflow_id,
        );
        self.compile_optional_logic(
            config.value_logic.as_ref(),
            &mut config.value_logic_index,
            "publish_kafka value_logic",
            task_id,
            workflow_id,
        );
    }

    /// Compiles an optional logic expression in place.
    ///
    /// When `logic` is `None`, does nothing. On success, stores the cache
    /// index into `slot`; on failure, logs an error tagged with `label`
    /// (e.g. "http_call path_logic") and leaves `slot` untouched.
    ///
    /// Taking the logic by reference avoids cloning whole JSON expression
    /// trees just to satisfy the borrow checker — `slot` and the logic
    /// field are disjoint fields of the caller's config struct.
    fn compile_optional_logic(
        &mut self,
        logic: Option<&Value>,
        slot: &mut Option<usize>,
        label: &str,
        task_id: &str,
        workflow_id: &str,
    ) {
        if let Some(logic) = logic {
            match self.compile_logic(logic) {
                Ok(index) => *slot = index,
                Err(e) => error!(
                    "Failed to compile {} for task {} in workflow {}: {:?}",
                    label, task_id, workflow_id, e
                ),
            }
        }
    }

    /// Compiles one JSONLogic expression, appends it to the cache, and
    /// returns its cache position. Success is always `Some(index)`; the
    /// `Option` wrapper matches the `Option<usize>` index fields it is
    /// stored into.
    fn compile_logic(&mut self, logic: &Value) -> Result<Option<usize>, String> {
        match self.datalogic.compile(logic) {
            Ok(compiled) => {
                let index = self.logic_cache.len();
                self.logic_cache.push(compiled);
                Ok(Some(index))
            }
            Err(e) => Err(format!("Failed to compile logic: {}", e)),
        }
    }
}