1use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::InternalExecutor;
8use crate::engine::functions::{AsyncFunctionHandler, FILTER_STATUS_HALT, FILTER_STATUS_SKIP};
9use crate::engine::message::{AuditTrail, Change, Message};
10use crate::engine::task_executor::TaskExecutor;
11use crate::engine::trace::{ExecutionStep, ExecutionTrace};
12use crate::engine::workflow::Workflow;
13use chrono::Utc;
14use log::{debug, error, info, warn};
15use serde_json::json;
16use std::collections::HashMap;
17use std::sync::Arc;
18
/// Control-flow signal returned by `handle_task_result` to the task loops,
/// telling them whether to keep executing the workflow's remaining tasks.
enum TaskControlFlow {
    /// Proceed to the next task in the workflow.
    Continue,
    /// Stop executing the remaining tasks of this workflow (e.g. a task
    /// returned `FILTER_STATUS_HALT`).
    HaltWorkflow,
}
26
/// Executes workflows against messages: evaluates workflow/task conditions,
/// runs tasks in order, and records audit-trail and error information on the
/// message as it goes.
pub struct WorkflowExecutor {
    /// Runs individual task functions (with or without tracing).
    task_executor: Arc<TaskExecutor>,
    /// Evaluates pre-compiled workflow and task conditions.
    internal_executor: Arc<InternalExecutor>,
}
40
41impl WorkflowExecutor {
42 pub fn new(task_executor: Arc<TaskExecutor>, internal_executor: Arc<InternalExecutor>) -> Self {
44 Self {
45 task_executor,
46 internal_executor,
47 }
48 }
49
50 pub fn task_functions(
52 &self,
53 ) -> Arc<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>> {
54 self.task_executor.task_functions()
55 }
56
57 pub async fn execute(&self, workflow: &Workflow, message: &mut Message) -> Result<bool> {
72 let context_arc = message.get_context_arc();
74
75 let should_execute = self
77 .internal_executor
78 .evaluate_condition(workflow.condition_index, context_arc)?;
79
80 if !should_execute {
81 debug!("Skipping workflow {} - condition not met", workflow.id);
82 return Ok(false);
83 }
84
85 match self.execute_tasks(workflow, message).await {
87 Ok(_) => {
88 info!("Successfully completed workflow: {}", workflow.id);
89 Ok(true)
90 }
91 Err(e) if workflow.continue_on_error => {
92 warn!(
93 "Workflow {} encountered error but continuing: {:?}",
94 workflow.id, e
95 );
96 message.errors.push(
97 ErrorInfo::builder(
98 "WORKFLOW_ERROR",
99 format!("Workflow {} error: {}", workflow.id, e),
100 )
101 .workflow_id(&workflow.id)
102 .build(),
103 );
104 Ok(true)
105 }
106 Err(e) => {
107 error!("Workflow {} failed: {:?}", workflow.id, e);
108 Err(e)
109 }
110 }
111 }
112
113 pub async fn execute_with_trace(
125 &self,
126 workflow: &Workflow,
127 message: &mut Message,
128 trace: &mut ExecutionTrace,
129 ) -> Result<bool> {
130 let context_arc = message.get_context_arc();
132
133 let should_execute = self
135 .internal_executor
136 .evaluate_condition(workflow.condition_index, context_arc)?;
137
138 if !should_execute {
139 debug!("Skipping workflow {} - condition not met", workflow.id);
140 trace.add_step(ExecutionStep::workflow_skipped(&workflow.id));
142 return Ok(false);
143 }
144
145 match self
147 .execute_tasks_with_trace(workflow, message, trace)
148 .await
149 {
150 Ok(_) => {
151 info!("Successfully completed workflow: {}", workflow.id);
152 Ok(true)
153 }
154 Err(e) if workflow.continue_on_error => {
155 warn!(
156 "Workflow {} encountered error but continuing: {:?}",
157 workflow.id, e
158 );
159 message.errors.push(
160 ErrorInfo::builder(
161 "WORKFLOW_ERROR",
162 format!("Workflow {} error: {}", workflow.id, e),
163 )
164 .workflow_id(&workflow.id)
165 .build(),
166 );
167 Ok(true)
168 }
169 Err(e) => {
170 error!("Workflow {} failed: {:?}", workflow.id, e);
171 Err(e)
172 }
173 }
174 }
175
176 async fn execute_tasks(&self, workflow: &Workflow, message: &mut Message) -> Result<()> {
178 for task in &workflow.tasks {
179 let context_arc = message.get_context_arc();
181
182 let should_execute = self
184 .internal_executor
185 .evaluate_condition(task.condition_index, context_arc)?;
186
187 if !should_execute {
188 debug!("Skipping task {} - condition not met", task.id);
189 continue;
190 }
191
192 let result = self.task_executor.execute(task, message).await;
194
195 match self.handle_task_result(
197 result,
198 &workflow.id,
199 &task.id,
200 task.continue_on_error,
201 message,
202 )? {
203 TaskControlFlow::HaltWorkflow => break,
204 TaskControlFlow::Continue => {}
205 }
206 }
207
208 Ok(())
209 }
210
211 async fn execute_tasks_with_trace(
213 &self,
214 workflow: &Workflow,
215 message: &mut Message,
216 trace: &mut ExecutionTrace,
217 ) -> Result<()> {
218 for task in &workflow.tasks {
219 let context_arc = message.get_context_arc();
221
222 let should_execute = self
224 .internal_executor
225 .evaluate_condition(task.condition_index, context_arc)?;
226
227 if !should_execute {
228 debug!("Skipping task {} - condition not met", task.id);
229 trace.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
231 continue;
232 }
233
234 let result = self.task_executor.execute_with_trace(task, message).await;
236
237 let mapping_contexts = match &result {
239 Ok((_, _, contexts)) => contexts.clone(),
240 Err(_) => None,
241 };
242
243 let standard_result = result.map(|(status, changes, _)| (status, changes));
245
246 let control_flow = self.handle_task_result(
248 standard_result,
249 &workflow.id,
250 &task.id,
251 task.continue_on_error,
252 message,
253 )?;
254
255 let mut step = ExecutionStep::executed(&workflow.id, &task.id, message);
257 if let Some(contexts) = mapping_contexts {
258 step = step.with_mapping_contexts(contexts);
259 }
260 trace.add_step(step);
261
262 if let TaskControlFlow::HaltWorkflow = control_flow {
263 break;
264 }
265 }
266
267 Ok(())
268 }
269
270 fn handle_task_result(
272 &self,
273 result: Result<(usize, Vec<Change>)>,
274 workflow_id: &str,
275 task_id: &str,
276 continue_on_error: bool,
277 message: &mut Message,
278 ) -> Result<TaskControlFlow> {
279 match result {
280 Ok((status, changes)) => {
281 if status == FILTER_STATUS_SKIP {
283 debug!("Task {} signaled skip (filter gate)", task_id);
284 return Ok(TaskControlFlow::Continue);
285 }
286
287 message.audit_trail.push(AuditTrail {
289 timestamp: Utc::now(),
290 workflow_id: Arc::from(workflow_id),
291 task_id: Arc::from(task_id),
292 status,
293 changes,
294 });
295
296 if let Some(metadata) = message.context["metadata"].as_object_mut() {
298 if let Some(progress) = metadata.get_mut("progress") {
300 if let Some(progress_obj) = progress.as_object_mut() {
301 progress_obj.insert("workflow_id".to_string(), json!(workflow_id));
302 progress_obj.insert("task_id".to_string(), json!(task_id));
303 progress_obj.insert("status_code".to_string(), json!(status));
304 }
305 } else {
306 metadata.insert(
307 "progress".to_string(),
308 json!({
309 "workflow_id": workflow_id,
310 "task_id": task_id,
311 "status_code": status
312 }),
313 );
314 }
315 }
316 message.invalidate_context_cache();
317
318 if status == FILTER_STATUS_HALT {
320 info!(
321 "Task {} halted workflow {} (filter gate)",
322 task_id, workflow_id
323 );
324 return Ok(TaskControlFlow::HaltWorkflow);
325 }
326
327 if (400..500).contains(&status) {
329 warn!("Task {} returned client error status: {}", task_id, status);
330 } else if status >= 500 {
331 error!("Task {} returned server error status: {}", task_id, status);
332 if !continue_on_error {
333 return Err(DataflowError::Task(format!(
334 "Task {} failed with status {}",
335 task_id, status
336 )));
337 }
338 }
339 Ok(TaskControlFlow::Continue)
340 }
341 Err(e) => {
342 error!("Task {} failed: {:?}", task_id, e);
343
344 message.audit_trail.push(AuditTrail {
346 timestamp: Utc::now(),
347 workflow_id: Arc::from(workflow_id),
348 task_id: Arc::from(task_id),
349 status: 500,
350 changes: vec![],
351 });
352
353 message.errors.push(
355 ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
356 .workflow_id(workflow_id)
357 .task_id(task_id)
358 .build(),
359 );
360
361 if !continue_on_error {
362 Err(e)
363 } else {
364 Ok(TaskControlFlow::Continue)
365 }
366 }
367 }
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    /// Compiles `workflow_json` and wires up a `WorkflowExecutor` with an
    /// empty task-function registry, returning the executor together with the
    /// compiled workflow. Shared by the tests below to avoid duplicating the
    /// setup boilerplate.
    fn setup(workflow_json: &str) -> (WorkflowExecutor, Workflow) {
        let mut compiler = LogicCompiler::new();
        let mut workflow = Workflow::from_json(workflow_json).unwrap();

        let workflows = compiler.compile_workflows(vec![workflow.clone()]);
        if let Some(compiled_workflow) = workflows.iter().find(|w| w.id == "test_workflow") {
            workflow = compiled_workflow.clone();
        }

        let (datalogic, logic_cache) = compiler.into_parts();
        let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            internal_executor.clone(),
            datalogic,
        ));
        let workflow_executor = WorkflowExecutor::new(task_executor, internal_executor);
        (workflow_executor, workflow)
    }

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // A false workflow condition must skip execution entirely: no tasks
        // run and no audit entries are produced.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow_executor, workflow) = setup(workflow_json);
        let mut message = Message::from_value(&json!({}));

        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(!executed);
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // A true workflow condition must run the workflow and report success.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow_executor, workflow) = setup(workflow_json);
        let mut message = Message::from_value(&json!({}));

        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(executed);
    }
}