1use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::InternalExecutor;
8use crate::engine::message::{AuditTrail, Change, Message};
9use crate::engine::task_executor::TaskExecutor;
10use crate::engine::trace::{ExecutionStep, ExecutionTrace};
11use crate::engine::workflow::Workflow;
12use chrono::Utc;
13use log::{debug, error, info, warn};
14use serde_json::json;
15use std::sync::Arc;
16
/// Drives execution of a workflow against a message: evaluates workflow- and
/// task-level gate conditions, runs tasks in order, and records audit-trail,
/// progress, and error information on the message.
pub struct WorkflowExecutor {
    /// Executes individual tasks (invoked per task in `execute_tasks`).
    task_executor: Arc<TaskExecutor>,
    /// Evaluates compiled gate conditions (workflow `condition_index` and
    /// task `condition_index`) against the message context.
    internal_executor: Arc<InternalExecutor>,
}
30
31impl WorkflowExecutor {
32 pub fn new(task_executor: Arc<TaskExecutor>, internal_executor: Arc<InternalExecutor>) -> Self {
34 Self {
35 task_executor,
36 internal_executor,
37 }
38 }
39
40 pub async fn execute(&self, workflow: &Workflow, message: &mut Message) -> Result<bool> {
55 let context_arc = message.get_context_arc();
57
58 let should_execute = self
60 .internal_executor
61 .evaluate_condition(workflow.condition_index, context_arc)?;
62
63 if !should_execute {
64 debug!("Skipping workflow {} - condition not met", workflow.id);
65 return Ok(false);
66 }
67
68 match self.execute_tasks(workflow, message).await {
70 Ok(_) => {
71 info!("Successfully completed workflow: {}", workflow.id);
72 Ok(true)
73 }
74 Err(e) if workflow.continue_on_error => {
75 warn!(
76 "Workflow {} encountered error but continuing: {:?}",
77 workflow.id, e
78 );
79 message.errors.push(
80 ErrorInfo::builder(
81 "WORKFLOW_ERROR",
82 format!("Workflow {} error: {}", workflow.id, e),
83 )
84 .workflow_id(&workflow.id)
85 .build(),
86 );
87 Ok(true)
88 }
89 Err(e) => {
90 error!("Workflow {} failed: {:?}", workflow.id, e);
91 Err(e)
92 }
93 }
94 }
95
96 pub async fn execute_with_trace(
108 &self,
109 workflow: &Workflow,
110 message: &mut Message,
111 trace: &mut ExecutionTrace,
112 ) -> Result<bool> {
113 let context_arc = message.get_context_arc();
115
116 let should_execute = self
118 .internal_executor
119 .evaluate_condition(workflow.condition_index, context_arc)?;
120
121 if !should_execute {
122 debug!("Skipping workflow {} - condition not met", workflow.id);
123 trace.add_step(ExecutionStep::workflow_skipped(&workflow.id));
125 return Ok(false);
126 }
127
128 match self
130 .execute_tasks_with_trace(workflow, message, trace)
131 .await
132 {
133 Ok(_) => {
134 info!("Successfully completed workflow: {}", workflow.id);
135 Ok(true)
136 }
137 Err(e) if workflow.continue_on_error => {
138 warn!(
139 "Workflow {} encountered error but continuing: {:?}",
140 workflow.id, e
141 );
142 message.errors.push(
143 ErrorInfo::builder(
144 "WORKFLOW_ERROR",
145 format!("Workflow {} error: {}", workflow.id, e),
146 )
147 .workflow_id(&workflow.id)
148 .build(),
149 );
150 Ok(true)
151 }
152 Err(e) => {
153 error!("Workflow {} failed: {:?}", workflow.id, e);
154 Err(e)
155 }
156 }
157 }
158
159 async fn execute_tasks(&self, workflow: &Workflow, message: &mut Message) -> Result<()> {
161 for task in &workflow.tasks {
162 let context_arc = message.get_context_arc();
164
165 let should_execute = self
167 .internal_executor
168 .evaluate_condition(task.condition_index, context_arc)?;
169
170 if !should_execute {
171 debug!("Skipping task {} - condition not met", task.id);
172 continue;
173 }
174
175 let result = self.task_executor.execute(task, message).await;
177
178 self.handle_task_result(
180 result,
181 &workflow.id,
182 &task.id,
183 task.continue_on_error,
184 message,
185 )?;
186 }
187
188 Ok(())
189 }
190
191 async fn execute_tasks_with_trace(
193 &self,
194 workflow: &Workflow,
195 message: &mut Message,
196 trace: &mut ExecutionTrace,
197 ) -> Result<()> {
198 for task in &workflow.tasks {
199 let context_arc = message.get_context_arc();
201
202 let should_execute = self
204 .internal_executor
205 .evaluate_condition(task.condition_index, context_arc)?;
206
207 if !should_execute {
208 debug!("Skipping task {} - condition not met", task.id);
209 trace.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
211 continue;
212 }
213
214 let result = self.task_executor.execute(task, message).await;
216
217 self.handle_task_result(
219 result,
220 &workflow.id,
221 &task.id,
222 task.continue_on_error,
223 message,
224 )?;
225
226 trace.add_step(ExecutionStep::executed(&workflow.id, &task.id, message));
228 }
229
230 Ok(())
231 }
232
233 fn handle_task_result(
235 &self,
236 result: Result<(usize, Vec<Change>)>,
237 workflow_id: &str,
238 task_id: &str,
239 continue_on_error: bool,
240 message: &mut Message,
241 ) -> Result<()> {
242 match result {
243 Ok((status, changes)) => {
244 message.audit_trail.push(AuditTrail {
246 timestamp: Utc::now(),
247 workflow_id: Arc::from(workflow_id),
248 task_id: Arc::from(task_id),
249 status,
250 changes,
251 });
252
253 if let Some(metadata) = message.context["metadata"].as_object_mut() {
255 if let Some(progress) = metadata.get_mut("progress") {
257 if let Some(progress_obj) = progress.as_object_mut() {
258 progress_obj.insert("workflow_id".to_string(), json!(workflow_id));
259 progress_obj.insert("task_id".to_string(), json!(task_id));
260 progress_obj.insert("status_code".to_string(), json!(status));
261 }
262 } else {
263 metadata.insert(
264 "progress".to_string(),
265 json!({
266 "workflow_id": workflow_id,
267 "task_id": task_id,
268 "status_code": status
269 }),
270 );
271 }
272 }
273 message.invalidate_context_cache();
274
275 if (400..500).contains(&status) {
277 warn!("Task {} returned client error status: {}", task_id, status);
278 } else if status >= 500 {
279 error!("Task {} returned server error status: {}", task_id, status);
280 if !continue_on_error {
281 return Err(DataflowError::Task(format!(
282 "Task {} failed with status {}",
283 task_id, status
284 )));
285 }
286 }
287 Ok(())
288 }
289 Err(e) => {
290 error!("Task {} failed: {:?}", task_id, e);
291
292 message.audit_trail.push(AuditTrail {
294 timestamp: Utc::now(),
295 workflow_id: Arc::from(workflow_id),
296 task_id: Arc::from(task_id),
297 status: 500,
298 changes: vec![],
299 });
300
301 message.errors.push(
303 ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
304 .workflow_id(workflow_id)
305 .task_id(task_id)
306 .build(),
307 );
308
309 if !continue_on_error { Err(e) } else { Ok(()) }
310 }
311 }
312 }
313}
314
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    /// Compiles `workflow_json` through the `LogicCompiler` and wires up a
    /// `WorkflowExecutor` backed by an empty custom-function registry.
    /// Returns the executor and the compiled workflow.
    fn setup(workflow_json: &str) -> (WorkflowExecutor, Workflow) {
        let mut compiler = LogicCompiler::new();
        let mut workflow = Workflow::from_json(workflow_json).unwrap();

        let compiled = compiler.compile_workflows(vec![workflow.clone()]);
        if let Some(compiled_workflow) = compiled.get("test_workflow") {
            workflow = compiled_workflow.clone();
        }

        let (datalogic, logic_cache) = compiler.into_parts();
        let internal_executor = Arc::new(InternalExecutor::new(datalogic.clone(), logic_cache));
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            internal_executor.clone(),
            datalogic,
        ));
        let executor = WorkflowExecutor::new(task_executor, internal_executor);
        (executor, workflow)
    }

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // A workflow whose gate condition is `false` must be skipped entirely.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow_executor, workflow) = setup(workflow_json);
        let mut message = Message::from_value(&json!({}));

        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();

        // Skipped: reported as not executed, and no task ever ran.
        assert!(!executed);
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // A workflow whose gate condition is `true` runs its (no-op) task.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow_executor, workflow) = setup(workflow_json);
        let mut message = Message::from_value(&json!({}));

        let executed = workflow_executor
            .execute(&workflow, &mut message)
            .await
            .unwrap();
        assert!(executed);
    }
}