Skip to main content

dataflow_rs/engine/
compiler.rs

//! # Workflow Compilation Module
//!
//! Pre-compiles all JSONLogic expressions used by workflows and tasks at engine
//! initialization. Each compiled `Arc<Logic>` is stored directly on the
//! workflow/task/config struct that owns it — no central `logic_cache`, no
//! index lookup, no bounds check on the hot path. The `Engine` is wrapped in
//! `Arc` and is `Send + Sync` so the entire stack is safe to share across
//! Tokio worker threads.

10use crate::engine::error::{DataflowError, Result};
11use crate::engine::functions::integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
12use crate::engine::functions::{FilterConfig, LogConfig, MapConfig, ValidationConfig};
13use crate::engine::{FunctionConfig, Workflow};
14use datalogic_rs::{Engine, Logic};
15use log::debug;
16use serde_json::Value;
17use std::sync::Arc;
18
19/// Compiles JSONLogic expressions and stamps them onto workflow/task/config
20/// structs as `Option<Arc<Logic>>` slots.
21pub struct LogicCompiler {
22    /// Shared datalogic Engine used both for compilation and (later) evaluation.
23    engine: Arc<Engine>,
24}
25
26impl Default for LogicCompiler {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl LogicCompiler {
33    /// Create a new LogicCompiler with a fresh datalogic `Engine` configured for
34    /// templating mode (preserves object structure in JSONLogic operations).
35    pub fn new() -> Self {
36        Self {
37            engine: Arc::new(Engine::builder().with_templating(true).build()),
38        }
39    }
40
41    /// Get the Engine instance
42    pub fn engine(&self) -> Arc<Engine> {
43        Arc::clone(&self.engine)
44    }
45
46    /// Consume the compiler and return the shared engine.
47    pub fn into_engine(self) -> Arc<Engine> {
48        self.engine
49    }
50
51    /// Compile all workflows and their tasks, returning them sorted by priority.
52    /// Returns `Err` on the first validation or compilation failure — engine
53    /// construction is fail-loud so misconfigured workflows can't silently
54    /// disappear at runtime.
55    pub fn compile_workflows(&self, workflows: Vec<Workflow>) -> Result<Vec<Workflow>> {
56        let mut compiled_workflows = Vec::with_capacity(workflows.len());
57
58        for mut workflow in workflows {
59            workflow.validate()?;
60
61            // Populate the cached Arc<str> ids so audit emission can refcount-bump
62            // rather than reallocate per AuditTrail entry.
63            workflow.id_arc = Arc::from(workflow.id.as_str());
64            for task in &mut workflow.tasks {
65                task.id_arc = Arc::from(task.id.as_str());
66            }
67
68            // Compile the workflow condition (defaults to `true`, which folds
69            // to `None` so the hot path skips the eval — see `compile_condition`).
70            let label = format!("workflow {} condition", workflow.id);
71            workflow.compiled_condition = self.compile_condition(&workflow.condition, &label)?;
72            debug!("Workflow {} condition compiled", workflow.id);
73
74            // Compile task conditions and function-specific logic.
75            self.compile_workflow_tasks(&mut workflow)?;
76
77            // Stamp whether every task is a synchronous built-in. A fully-sync
78            // workflow can be folded into a shared cross-workflow `with_arena`
79            // scope (no `.await`), so the message context is deep-walked into
80            // the arena once per *run* of consecutive fully-sync workflows
81            // instead of once per workflow. Any async/custom task forces the
82            // per-workflow `.await` path.
83            workflow.fully_sync = workflow.tasks.iter().all(|t| t.function.is_sync_builtin());
84
85            compiled_workflows.push(workflow);
86        }
87
88        // Sort by priority once at construction time
89        compiled_workflows.sort_by_key(|w| w.priority);
90        Ok(compiled_workflows)
91    }
92
93    /// Compile task conditions and function logic for a workflow
94    fn compile_workflow_tasks(&self, workflow: &mut Workflow) -> Result<()> {
95        for task in &mut workflow.tasks {
96            let label = format!("task {} condition (workflow {})", task.id, workflow.id);
97            task.compiled_condition = self.compile_condition(&task.condition, &label)?;
98
99            // Compile function-specific logic (map transformations, validation rules, …)
100            self.compile_function_logic(&mut task.function, &task.id, &workflow.id)?;
101        }
102        Ok(())
103    }
104
105    /// Compile function-specific logic based on function type
106    fn compile_function_logic(
107        &self,
108        function: &mut FunctionConfig,
109        task_id: &str,
110        workflow_id: &str,
111    ) -> Result<()> {
112        match function {
113            FunctionConfig::Map { input, .. } => {
114                self.compile_map_logic(input, task_id, workflow_id)
115            }
116            FunctionConfig::Validation { input, .. } => {
117                self.compile_validation_logic(input, task_id, workflow_id)
118            }
119            FunctionConfig::Filter { input, .. } => {
120                self.compile_filter_logic(input, task_id, workflow_id)
121            }
122            FunctionConfig::Log { input, .. } => {
123                self.compile_log_logic(input, task_id, workflow_id)
124            }
125            FunctionConfig::HttpCall { input, .. } => {
126                self.compile_http_call_logic(input, task_id, workflow_id)
127            }
128            FunctionConfig::Enrich { input, .. } => {
129                self.compile_enrich_logic(input, task_id, workflow_id)
130            }
131            FunctionConfig::PublishKafka { input, .. } => {
132                self.compile_publish_kafka_logic(input, task_id, workflow_id)
133            }
134            // Custom and other functions don't need pre-compilation
135            _ => Ok(()),
136        }
137    }
138
139    /// Compile a JSONLogic expression and return the `Arc<Logic>`. Errors are
140    /// surfaced as `DataflowError::LogicEvaluation` with the supplied
141    /// context label for debugging.
142    fn compile(&self, logic: &Value, ctx_label: &str) -> Result<Arc<Logic>> {
143        self.engine
144            .compile_arc(logic)
145            .map_err(|e| DataflowError::LogicEvaluation(format!("{}: {}", ctx_label, e)))
146    }
147
148    /// Compile a workflow/task *condition*, returning `None` when the source is
149    /// the literal `true`. A `None` condition is treated as "always run" by
150    /// `evaluate_condition` / `evaluate_condition_in_arena`, so the hot path
151    /// skips the `engine.evaluate` call — and, in the sync stretch, the
152    /// per-task arena context slice build — entirely for the overwhelmingly
153    /// common default `condition: true`. datalogic already folds a literal
154    /// `true` to a near-free literal-fast-path eval; this avoids even setting
155    /// up the call. Non-literal conditions (including `false` and any real
156    /// expression) compile as normal.
157    fn compile_condition(&self, condition: &Value, ctx_label: &str) -> Result<Option<Arc<Logic>>> {
158        if matches!(condition, Value::Bool(true)) {
159            return Ok(None);
160        }
161        Ok(Some(self.compile(condition, ctx_label)?))
162    }
163
164    /// Compile map transformation logic
165    fn compile_map_logic(
166        &self,
167        config: &mut MapConfig,
168        task_id: &str,
169        workflow_id: &str,
170    ) -> Result<()> {
171        for mapping in &mut config.mappings {
172            // Pre-split the dot path so the hot path doesn't re-split per
173            // write. The `#` prefix is preserved here — it's the explicit
174            // "treat this as an object key, not an array index" hint that
175            // `set_nested_value` consumes when deciding container shape; the
176            // strip happens at lookup time inside `*_parts` helpers.
177            let parts: Vec<Arc<str>> = mapping.path.split('.').map(Arc::from).collect();
178            mapping.path_parts = Arc::from(parts.into_boxed_slice());
179            mapping.path_arc = Arc::from(mapping.path.as_str());
180
181            let label = format!(
182                "map logic for task {} in workflow {} (path {})",
183                task_id, workflow_id, mapping.path
184            );
185            mapping.compiled_logic = Some(self.compile(&mapping.logic, &label)?);
186        }
187        Ok(())
188    }
189
190    /// Compile validation rule logic
191    fn compile_validation_logic(
192        &self,
193        config: &mut ValidationConfig,
194        task_id: &str,
195        workflow_id: &str,
196    ) -> Result<()> {
197        for (idx, rule) in config.rules.iter_mut().enumerate() {
198            let label = format!(
199                "validation rule {} for task {} in workflow {}",
200                idx, task_id, workflow_id
201            );
202            rule.compiled_logic = Some(self.compile(&rule.logic, &label)?);
203        }
204        Ok(())
205    }
206
207    /// Compile log message and field expressions
208    fn compile_log_logic(
209        &self,
210        config: &mut LogConfig,
211        task_id: &str,
212        workflow_id: &str,
213    ) -> Result<()> {
214        let msg_label = format!(
215            "log message for task {} in workflow {}",
216            task_id, workflow_id
217        );
218        config.compiled_message = Some(self.compile(&config.message, &msg_label)?);
219
220        // Compile each field expression. Collect into a fresh Vec, then
221        // assign — keeps the immutable borrow of `config.fields` from
222        // overlapping with the mutable borrow of `config.compiled_fields`.
223        let mut compiled_fields = Vec::with_capacity(config.fields.len());
224        for (key, logic) in &config.fields {
225            let label = format!(
226                "log field '{}' for task {} in workflow {}",
227                key, task_id, workflow_id
228            );
229            compiled_fields.push((key.clone(), Some(self.compile(logic, &label)?)));
230        }
231        config.compiled_fields = compiled_fields;
232        Ok(())
233    }
234
235    /// Compile filter condition logic
236    fn compile_filter_logic(
237        &self,
238        config: &mut FilterConfig,
239        task_id: &str,
240        workflow_id: &str,
241    ) -> Result<()> {
242        let label = format!(
243            "filter condition for task {} in workflow {}",
244            task_id, workflow_id
245        );
246        config.compiled_condition = Some(self.compile(&config.condition, &label)?);
247        Ok(())
248    }
249
250    /// Compile http_call JSONLogic expressions (path_logic, body_logic)
251    fn compile_http_call_logic(
252        &self,
253        config: &mut HttpCallConfig,
254        task_id: &str,
255        workflow_id: &str,
256    ) -> Result<()> {
257        if let Some(logic) = &config.path_logic {
258            let label = format!(
259                "http_call path_logic for task {} in workflow {}",
260                task_id, workflow_id
261            );
262            config.compiled_path_logic = Some(self.compile(logic, &label)?);
263        }
264        if let Some(logic) = &config.body_logic {
265            let label = format!(
266                "http_call body_logic for task {} in workflow {}",
267                task_id, workflow_id
268            );
269            config.compiled_body_logic = Some(self.compile(logic, &label)?);
270        }
271        Ok(())
272    }
273
274    /// Compile enrich JSONLogic expressions (path_logic)
275    fn compile_enrich_logic(
276        &self,
277        config: &mut EnrichConfig,
278        task_id: &str,
279        workflow_id: &str,
280    ) -> Result<()> {
281        if let Some(logic) = &config.path_logic {
282            let label = format!(
283                "enrich path_logic for task {} in workflow {}",
284                task_id, workflow_id
285            );
286            config.compiled_path_logic = Some(self.compile(logic, &label)?);
287        }
288        Ok(())
289    }
290
291    /// Compile publish_kafka JSONLogic expressions (key_logic, value_logic)
292    fn compile_publish_kafka_logic(
293        &self,
294        config: &mut PublishKafkaConfig,
295        task_id: &str,
296        workflow_id: &str,
297    ) -> Result<()> {
298        if let Some(logic) = &config.key_logic {
299            let label = format!(
300                "publish_kafka key_logic for task {} in workflow {}",
301                task_id, workflow_id
302            );
303            config.compiled_key_logic = Some(self.compile(logic, &label)?);
304        }
305        if let Some(logic) = &config.value_logic {
306            let label = format!(
307                "publish_kafka value_logic for task {} in workflow {}",
308                task_id, workflow_id
309            );
310            config.compiled_value_logic = Some(self.compile(logic, &label)?);
311        }
312        Ok(())
313    }
314}