taskflow_rs/executor/handlers/
python.rs

1use std::process::Stdio;
2use tokio::process::Command;
3use tracing::{error, info};
4
5use crate::error::{Result, TaskFlowError};
6use crate::task::{Task, TaskHandler, TaskResult};
7
8pub struct PythonTaskHandler;
9
10impl PythonTaskHandler {
11    pub fn new() -> Self {
12        Self
13    }
14}
15
16#[async_trait::async_trait]
17impl TaskHandler for PythonTaskHandler {
18    fn task_type(&self) -> &'static str {
19        "python_script"
20    }
21
22    async fn execute(&self, task: &Task) -> Result<TaskResult> {
23        let script_content = task
24            .definition
25            .payload
26            .get("script")
27            .and_then(|v| v.as_str())
28            .ok_or_else(|| {
29                TaskFlowError::InvalidConfiguration("Missing script content".to_string())
30            })?;
31
32        let args = task
33            .definition
34            .payload
35            .get("args")
36            .and_then(|v| v.as_array())
37            .map(|arr| {
38                arr.iter()
39                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
40                    .collect::<Vec<String>>()
41            })
42            .unwrap_or_default();
43
44        info!(
45            "Executing Python script: {} with args: {:?}",
46            task.definition.id, args
47        );
48
49        let mut cmd = Command::new("python3");
50        cmd.arg("-c").arg(script_content);
51        cmd.args(&args);
52        cmd.stdin(Stdio::null());
53        cmd.stdout(Stdio::piped());
54        cmd.stderr(Stdio::piped());
55
56        let output = cmd
57            .output()
58            .await
59            .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
60
61        let success = output.status.success();
62        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
63        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
64
65        if !success {
66            error!("Python script execution failed: {}", stderr);
67        }
68
69        Ok(TaskResult {
70            success,
71            output: Some(stdout),
72            error: if stderr.is_empty() {
73                None
74            } else {
75                Some(stderr)
76            },
77            execution_time_ms: 0,
78            metadata: Default::default(),
79        })
80    }
81}