Skip to main content

dataflow_rs/engine/
compiler.rs

1//! # Workflow Compilation Module
2//!
3//! Pre-compiles all JSONLogic expressions used by workflows and tasks at engine
4//! initialization. Each compiled `Arc<Logic>` is stored directly on the
5//! workflow/task/config struct that owns it — no central `logic_cache`, no
6//! index lookup, no bounds check on the hot path. The `Engine` is wrapped in
7//! `Arc` and is `Send + Sync` so the entire stack is safe to share across
8//! Tokio worker threads.
9
10use crate::engine::error::{DataflowError, Result};
11use crate::engine::functions::integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
12use crate::engine::functions::{FilterConfig, LogConfig, MapConfig, ValidationConfig};
13use crate::engine::{FunctionConfig, Workflow};
14use datalogic_rs::{Engine, Logic};
15use log::debug;
16use serde_json::Value;
17use std::sync::Arc;
18
19/// Compiles JSONLogic expressions and stamps them onto workflow/task/config
20/// structs as `Option<Arc<Logic>>` slots.
21pub struct LogicCompiler {
22    /// Shared datalogic Engine used both for compilation and (later) evaluation.
23    engine: Arc<Engine>,
24}
25
26impl Default for LogicCompiler {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl LogicCompiler {
33    /// Create a new LogicCompiler with a fresh datalogic `Engine` configured for
34    /// templating mode (preserves object structure in JSONLogic operations).
35    pub fn new() -> Self {
36        Self {
37            engine: Arc::new(Engine::builder().with_templating(true).build()),
38        }
39    }
40
41    /// Get the Engine instance
42    pub fn engine(&self) -> Arc<Engine> {
43        Arc::clone(&self.engine)
44    }
45
46    /// Consume the compiler and return the shared engine.
47    pub fn into_engine(self) -> Arc<Engine> {
48        self.engine
49    }
50
51    /// Compile all workflows and their tasks, returning them sorted by priority.
52    /// Returns `Err` on the first validation or compilation failure — engine
53    /// construction is fail-loud so misconfigured workflows can't silently
54    /// disappear at runtime.
55    pub fn compile_workflows(&self, workflows: Vec<Workflow>) -> Result<Vec<Workflow>> {
56        let mut compiled_workflows = Vec::with_capacity(workflows.len());
57
58        for mut workflow in workflows {
59            workflow.validate()?;
60
61            // Populate the cached Arc<str> ids so audit emission can refcount-bump
62            // rather than reallocate per AuditTrail entry.
63            workflow.id_arc = Arc::from(workflow.id.as_str());
64            for task in &mut workflow.tasks {
65                task.id_arc = Arc::from(task.id.as_str());
66            }
67
68            // Compile the workflow condition (required — defaults to `true`).
69            let label = format!("workflow {} condition", workflow.id);
70            workflow.compiled_condition = Some(self.compile(&workflow.condition, &label)?);
71            debug!("Workflow {} condition compiled", workflow.id);
72
73            // Compile task conditions and function-specific logic.
74            self.compile_workflow_tasks(&mut workflow)?;
75
76            compiled_workflows.push(workflow);
77        }
78
79        // Sort by priority once at construction time
80        compiled_workflows.sort_by_key(|w| w.priority);
81        Ok(compiled_workflows)
82    }
83
84    /// Compile task conditions and function logic for a workflow
85    fn compile_workflow_tasks(&self, workflow: &mut Workflow) -> Result<()> {
86        for task in &mut workflow.tasks {
87            let label = format!("task {} condition (workflow {})", task.id, workflow.id);
88            task.compiled_condition = Some(self.compile(&task.condition, &label)?);
89
90            // Compile function-specific logic (map transformations, validation rules, …)
91            self.compile_function_logic(&mut task.function, &task.id, &workflow.id)?;
92        }
93        Ok(())
94    }
95
96    /// Compile function-specific logic based on function type
97    fn compile_function_logic(
98        &self,
99        function: &mut FunctionConfig,
100        task_id: &str,
101        workflow_id: &str,
102    ) -> Result<()> {
103        match function {
104            FunctionConfig::Map { input, .. } => {
105                self.compile_map_logic(input, task_id, workflow_id)
106            }
107            FunctionConfig::Validation { input, .. } => {
108                self.compile_validation_logic(input, task_id, workflow_id)
109            }
110            FunctionConfig::Filter { input, .. } => {
111                self.compile_filter_logic(input, task_id, workflow_id)
112            }
113            FunctionConfig::Log { input, .. } => {
114                self.compile_log_logic(input, task_id, workflow_id)
115            }
116            FunctionConfig::HttpCall { input, .. } => {
117                self.compile_http_call_logic(input, task_id, workflow_id)
118            }
119            FunctionConfig::Enrich { input, .. } => {
120                self.compile_enrich_logic(input, task_id, workflow_id)
121            }
122            FunctionConfig::PublishKafka { input, .. } => {
123                self.compile_publish_kafka_logic(input, task_id, workflow_id)
124            }
125            // Custom and other functions don't need pre-compilation
126            _ => Ok(()),
127        }
128    }
129
130    /// Compile a JSONLogic expression and return the `Arc<Logic>`. Errors are
131    /// surfaced as `DataflowError::LogicEvaluation` with the supplied
132    /// context label for debugging.
133    fn compile(&self, logic: &Value, ctx_label: &str) -> Result<Arc<Logic>> {
134        self.engine
135            .compile_arc(logic)
136            .map_err(|e| DataflowError::LogicEvaluation(format!("{}: {}", ctx_label, e)))
137    }
138
139    /// Compile map transformation logic
140    fn compile_map_logic(
141        &self,
142        config: &mut MapConfig,
143        task_id: &str,
144        workflow_id: &str,
145    ) -> Result<()> {
146        for mapping in &mut config.mappings {
147            // Pre-split the dot path so the hot path doesn't re-split per
148            // write. The `#` prefix is preserved here — it's the explicit
149            // "treat this as an object key, not an array index" hint that
150            // `set_nested_value` consumes when deciding container shape; the
151            // strip happens at lookup time inside `*_parts` helpers.
152            let parts: Vec<Arc<str>> = mapping.path.split('.').map(Arc::from).collect();
153            mapping.path_parts = Arc::from(parts.into_boxed_slice());
154            mapping.path_arc = Arc::from(mapping.path.as_str());
155
156            let label = format!(
157                "map logic for task {} in workflow {} (path {})",
158                task_id, workflow_id, mapping.path
159            );
160            mapping.compiled_logic = Some(self.compile(&mapping.logic, &label)?);
161        }
162        Ok(())
163    }
164
165    /// Compile validation rule logic
166    fn compile_validation_logic(
167        &self,
168        config: &mut ValidationConfig,
169        task_id: &str,
170        workflow_id: &str,
171    ) -> Result<()> {
172        for (idx, rule) in config.rules.iter_mut().enumerate() {
173            let label = format!(
174                "validation rule {} for task {} in workflow {}",
175                idx, task_id, workflow_id
176            );
177            rule.compiled_logic = Some(self.compile(&rule.logic, &label)?);
178        }
179        Ok(())
180    }
181
182    /// Compile log message and field expressions
183    fn compile_log_logic(
184        &self,
185        config: &mut LogConfig,
186        task_id: &str,
187        workflow_id: &str,
188    ) -> Result<()> {
189        let msg_label = format!(
190            "log message for task {} in workflow {}",
191            task_id, workflow_id
192        );
193        config.compiled_message = Some(self.compile(&config.message, &msg_label)?);
194
195        // Compile each field expression. Collect into a fresh Vec, then
196        // assign — keeps the immutable borrow of `config.fields` from
197        // overlapping with the mutable borrow of `config.compiled_fields`.
198        let mut compiled_fields = Vec::with_capacity(config.fields.len());
199        for (key, logic) in &config.fields {
200            let label = format!(
201                "log field '{}' for task {} in workflow {}",
202                key, task_id, workflow_id
203            );
204            compiled_fields.push((key.clone(), Some(self.compile(logic, &label)?)));
205        }
206        config.compiled_fields = compiled_fields;
207        Ok(())
208    }
209
210    /// Compile filter condition logic
211    fn compile_filter_logic(
212        &self,
213        config: &mut FilterConfig,
214        task_id: &str,
215        workflow_id: &str,
216    ) -> Result<()> {
217        let label = format!(
218            "filter condition for task {} in workflow {}",
219            task_id, workflow_id
220        );
221        config.compiled_condition = Some(self.compile(&config.condition, &label)?);
222        Ok(())
223    }
224
225    /// Compile http_call JSONLogic expressions (path_logic, body_logic)
226    fn compile_http_call_logic(
227        &self,
228        config: &mut HttpCallConfig,
229        task_id: &str,
230        workflow_id: &str,
231    ) -> Result<()> {
232        if let Some(logic) = &config.path_logic {
233            let label = format!(
234                "http_call path_logic for task {} in workflow {}",
235                task_id, workflow_id
236            );
237            config.compiled_path_logic = Some(self.compile(logic, &label)?);
238        }
239        if let Some(logic) = &config.body_logic {
240            let label = format!(
241                "http_call body_logic for task {} in workflow {}",
242                task_id, workflow_id
243            );
244            config.compiled_body_logic = Some(self.compile(logic, &label)?);
245        }
246        Ok(())
247    }
248
249    /// Compile enrich JSONLogic expressions (path_logic)
250    fn compile_enrich_logic(
251        &self,
252        config: &mut EnrichConfig,
253        task_id: &str,
254        workflow_id: &str,
255    ) -> Result<()> {
256        if let Some(logic) = &config.path_logic {
257            let label = format!(
258                "enrich path_logic for task {} in workflow {}",
259                task_id, workflow_id
260            );
261            config.compiled_path_logic = Some(self.compile(logic, &label)?);
262        }
263        Ok(())
264    }
265
266    /// Compile publish_kafka JSONLogic expressions (key_logic, value_logic)
267    fn compile_publish_kafka_logic(
268        &self,
269        config: &mut PublishKafkaConfig,
270        task_id: &str,
271        workflow_id: &str,
272    ) -> Result<()> {
273        if let Some(logic) = &config.key_logic {
274            let label = format!(
275                "publish_kafka key_logic for task {} in workflow {}",
276                task_id, workflow_id
277            );
278            config.compiled_key_logic = Some(self.compile(logic, &label)?);
279        }
280        if let Some(logic) = &config.value_logic {
281            let label = format!(
282                "publish_kafka value_logic for task {} in workflow {}",
283                task_id, workflow_id
284            );
285            config.compiled_value_logic = Some(self.compile(logic, &label)?);
286        }
287        Ok(())
288    }
289}