pub mod error;
pub mod functions;
pub mod message;
pub mod task;
pub mod workflow;

pub use error::{DataflowError, ErrorInfo, Result};
pub use functions::AsyncFunctionHandler;
pub use message::Message;
pub use task::Task;
pub use workflow::Workflow;

pub use datalogic_rs as jsonlogic;

use chrono::Utc;
use datalogic_rs::DataLogic;
use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, error, info, warn};
use message::AuditTrail;
use serde_json::{json, Map, Number, Value};
use std::{cell::RefCell, collections::HashMap};
use tokio::time::sleep;

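// Each worker thread keeps its own DataLogic instance for JSONLogic evaluation.
// Assumption: DataLogic is not shareable across threads, so a thread_local
// avoids putting a lock on the hot evaluation path.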
thread_local! {
    static THREAD_LOCAL_DATA_LOGIC: RefCell<DataLogic> = RefCell::new(DataLogic::new());
}

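/// Retry behaviour for task execution. When `use_backoff` is set, the delay
/// doubles after each failed attempt; otherwise `retry_delay_ms` is used as a
/// fixed delay between retries.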
#[derive(Debug, Clone)]
pub struct RetryConfig {
    pub max_retries: u32,
    pub retry_delay_ms: u64,
    pub use_backoff: bool,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            retry_delay_ms: 1000,
            use_backoff: true,
        }
    }
}

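/// The dataflow engine: routes messages through every registered workflow whose
/// condition matches, executing each workflow's tasks via registered async
/// function handlers.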
pub struct Engine {
    workflows: HashMap<String, Workflow>,
    task_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
    retry_config: RetryConfig,
    max_concurrency: usize,
}

impl Default for Engine {
    fn default() -> Self {
        Self::new()
    }
}

impl Engine {
    /// Creates an engine with all built-in task functions pre-registered.
    pub fn new() -> Self {
        let mut engine = Self {
            workflows: HashMap::new(),
            task_functions: HashMap::new(),
            retry_config: RetryConfig::default(),
            max_concurrency: 10,
        };

        // Register the built-in function handlers shipped with the crate.
        for (name, handler) in functions::builtins::get_all_functions() {
            engine.register_task_function(name, handler);
        }

        engine
    }

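    /// Creates an engine with no workflows and no task functions registered,
    /// for callers that want full control over the available handlers.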
    pub fn new_empty() -> Self {
        Self {
            task_functions: HashMap::new(),
            workflows: HashMap::new(),
            retry_config: RetryConfig::default(),
            max_concurrency: 10,
        }
    }

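    /// Sets the maximum number of workflows processed concurrently per message
    /// (builder-style).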
    pub fn with_max_concurrency(mut self, max_concurrency: usize) -> Self {
        self.max_concurrency = max_concurrency;
        self
    }

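    /// Sets the retry configuration applied when task executions fail
    /// (builder-style).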
    pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
        self.retry_config = config;
        self
    }

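    /// Registers a workflow, keyed by its id. Workflows that fail validation
    /// are logged and dropped.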
    pub fn add_workflow(&mut self, workflow: &Workflow) {
        if workflow.validate().is_ok() {
            self.workflows.insert(workflow.id.clone(), workflow.clone());
        } else {
            error!("Invalid workflow: {}", workflow.id);
        }
    }

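    /// Registers an async task function under `name`, replacing any existing
    /// handler with the same name.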
    pub fn register_task_function(
        &mut self,
        name: String,
        handler: Box<dyn AsyncFunctionHandler + Send + Sync>,
    ) {
        self.task_functions.insert(name, handler);
    }

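    /// Returns true if a task function with the given name is registered.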
    pub fn has_function(&self, name: &str) -> bool {
        self.task_functions.contains_key(name)
    }

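    /// Processes a message through every workflow whose condition matches its
    /// metadata, running up to `max_concurrency` workflows at a time and merging
    /// each workflow's output (data, metadata, temp data, audit trail, errors)
    /// back into `message` as it completes.
    ///
    /// A minimal usage sketch; `Message::new` is a hypothetical constructor,
    /// adjust to the actual `Message` API:
    ///
    /// ```ignore
    /// let mut engine = Engine::new();
    /// engine.add_workflow(&workflow);
    /// let mut message = Message::new(&payload); // hypothetical constructor
    /// engine.process_message(&mut message).await?;
    /// ```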
    pub async fn process_message(&self, message: &mut Message) -> Result<()> {
        debug!("Processing message {} asynchronously", message.id);

        let mut workflow_futures = FuturesUnordered::new();

        // Collect the workflows whose conditions match; a missing condition
        // defaults to true, i.e. the workflow always runs.
        let mut workflows_to_process = Vec::new();

        for workflow in self.workflows.values() {
            let condition = workflow.condition.clone().unwrap_or(Value::Bool(true));
            let metadata_ref = &message.metadata;

            if !self.evaluate_condition(&condition, metadata_ref).await? {
                debug!("Workflow {} skipped - condition not met", workflow.id);
                continue;
            }

            info!("Preparing to process workflow {}", workflow.id);
            workflows_to_process.push(workflow.clone());
        }

        let engine_task_functions = &self.task_functions;

        // Seed the pool with up to max_concurrency workflows.
        let initial_count = self.max_concurrency.min(workflows_to_process.len());
        for workflow in workflows_to_process.iter().take(initial_count) {
            let message_clone = message.clone();

            workflow_futures.push(Self::process_workflow(
                workflow.clone(),
                message_clone,
                engine_task_functions,
                &self.retry_config,
            ));
        }

        let mut next_workflow_index = initial_count;

        // As each workflow completes, merge its results back into the message
        // and start the next pending workflow, keeping at most max_concurrency
        // in flight.
        while let Some((workflow_id, workflow_message)) = workflow_futures.next().await {
            message.data = workflow_message.data;
            message.metadata = workflow_message.metadata;
            message.temp_data = workflow_message.temp_data;
            message.audit_trail.extend(workflow_message.audit_trail);
            message.errors.extend(workflow_message.errors);

            info!("Completed processing workflow {}", workflow_id);

            if next_workflow_index < workflows_to_process.len() {
                let workflow = workflows_to_process[next_workflow_index].clone();
                next_workflow_index += 1;

                let message_clone = message.clone();

                workflow_futures.push(Self::process_workflow(
                    workflow,
                    message_clone,
                    &self.task_functions,
                    &self.retry_config,
                ));
            }
        }

        Ok(())
    }

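    /// Runs a single workflow's tasks in order against its own copy of the
    /// message, returning the workflow id together with the mutated copy so the
    /// caller can merge results.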
    async fn process_workflow(
        workflow: Workflow,
        mut message: Message,
        task_functions: &HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
        retry_config: &RetryConfig,
    ) -> (String, Message) {
        let workflow_id = workflow.id.clone();
        let mut workflow_errors = Vec::new();

        for task in &workflow.tasks {
            // A task without a condition always executes.
            let task_condition = task.condition.clone().unwrap_or(Value::Bool(true));

            let should_execute = THREAD_LOCAL_DATA_LOGIC.with(|data_logic_cell| {
                let data_logic = data_logic_cell.borrow_mut();
                data_logic
                    .evaluate_json(&task_condition, &message.metadata, None)
                    .map_err(|e| {
                        DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
                    })
                    .map(|result| result.as_bool().unwrap_or(false))
            });

            // A condition evaluation error is recorded and treated as "don't execute".
            let should_execute = match should_execute {
                Ok(result) => result,
                Err(e) => {
                    workflow_errors.push(ErrorInfo::new(
                        Some(workflow_id.clone()),
                        Some(task.id.clone()),
                        e.clone(),
                    ));
                    false
                }
            };

            if !should_execute {
                debug!("Task {} skipped - condition not met", task.id);
                continue;
            }

            if let Some(function) = task_functions.get(&task.function.name) {
                let task_id = task.id.clone();
                let function_input = task.function.input.clone();

                match Self::execute_task_static(
                    &task_id,
                    &workflow_id,
                    &mut message,
                    &function_input,
                    function.as_ref(),
                    retry_config,
                )
                .await
                {
                    Ok(_) => {
                        debug!("Task {} completed successfully", task_id);
                    }
                    Err(error) => {
                        workflow_errors.push(ErrorInfo::new(
                            Some(workflow_id.clone()),
                            Some(task_id.clone()),
                            error.clone(),
                        ));

                        // A failed task aborts the rest of the workflow.
                        break;
                    }
                }
            } else {
                let error =
                    DataflowError::Workflow(format!("Function '{}' not found", task.function.name));

                workflow_errors.push(ErrorInfo::new(
                    Some(workflow_id.clone()),
                    Some(task.id.clone()),
                    error,
                ));

                break;
            }
        }

        message.errors.extend(workflow_errors);

        (workflow_id, message)
    }

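    /// Executes one task with retries according to `retry_config`, recording an
    /// audit trail entry and progress metadata on success.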
    async fn execute_task_static(
        task_id: &str,
        workflow_id: &str,
        message: &mut Message,
        input_json: &Value,
        function: &dyn AsyncFunctionHandler,
        retry_config: &RetryConfig,
    ) -> Result<()> {
        info!("Executing task {} in workflow {}", task_id, workflow_id);

        let mut last_error = None;
        let mut retry_count = 0;
        // Use the engine's configured retry policy rather than hardcoded defaults.
        let max_retries = retry_config.max_retries;
        let retry_delay_ms = retry_config.retry_delay_ms;
        let use_backoff = retry_config.use_backoff;

        while retry_count <= max_retries {
            match function.execute(message, input_json).await {
                Ok((status_code, changes)) => {
                    message.audit_trail.push(AuditTrail {
                        workflow_id: workflow_id.to_string(),
                        task_id: task_id.to_string(),
                        timestamp: Utc::now().to_rfc3339(),
                        changes,
                        status_code,
                    });

                    info!("Task {} completed with status {}", task_id, status_code);

                    // Record execution progress in the message metadata.
                    let mut progress = Map::new();
                    progress.insert("task_id".to_string(), Value::String(task_id.to_string()));
                    progress.insert(
                        "workflow_id".to_string(),
                        Value::String(workflow_id.to_string()),
                    );
                    progress.insert(
                        "status_code".to_string(),
                        Value::Number(Number::from(status_code)),
                    );
                    progress.insert(
                        "timestamp".to_string(),
                        Value::String(Utc::now().to_rfc3339()),
                    );

                    if retry_count > 0 {
                        progress.insert(
                            "retries".to_string(),
                            Value::Number(Number::from(retry_count)),
                        );
                    }

                    message.metadata["progress"] = json!(progress);

                    return Ok(());
                }
                Err(e) => {
                    last_error = Some(e.clone());

                    if retry_count < max_retries {
                        warn!(
                            "Task {} execution failed, retry {}/{}: {:?}",
                            task_id,
                            retry_count + 1,
                            max_retries,
                            e
                        );

                        // Exponential backoff doubles the delay on each retry.
                        let delay = if use_backoff {
                            retry_delay_ms * (2_u64.pow(retry_count))
                        } else {
                            retry_delay_ms
                        };

                        sleep(std::time::Duration::from_millis(delay)).await;

                        retry_count += 1;
                    } else {
                        break;
                    }
                }
            }
        }

        let error = last_error.unwrap_or_else(|| {
            DataflowError::Unknown("Unknown error during task execution".to_string())
        });

        error!(
            "Task {} in workflow {} failed after {} retries: {:?}",
            task_id, workflow_id, retry_count, error
        );

        Err(error)
    }

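    /// Evaluates a JSONLogic condition against `data`. Boolean literals
    /// short-circuit without invoking the logic engine; non-boolean results are
    /// treated as false.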
    async fn evaluate_condition(&self, condition: &Value, data: &Value) -> Result<bool> {
        // Fast path: boolean literals don't need the logic engine.
        if let Value::Bool(b) = condition {
            return Ok(*b);
        }

        THREAD_LOCAL_DATA_LOGIC.with(|data_logic_cell| {
            let data_logic = data_logic_cell.borrow_mut();
            data_logic
                .evaluate_json(condition, data, None)
                .map_err(|e| {
                    DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
                })
                .map(|result| result.as_bool().unwrap_or(false))
        })
    }
}