taskflow_rs/executor/handlers/
python.rs1use std::process::Stdio;
2use tokio::process::Command;
3use tracing::{error, info};
4
5use crate::error::{Result, TaskFlowError};
6use crate::task::{Task, TaskHandler, TaskResult};
7
8pub struct PythonTaskHandler;
9
10impl PythonTaskHandler {
11 pub fn new() -> Self {
12 Self
13 }
14}
15
16#[async_trait::async_trait]
17impl TaskHandler for PythonTaskHandler {
18 fn task_type(&self) -> &'static str {
19 "python_script"
20 }
21
22 async fn execute(&self, task: &Task) -> Result<TaskResult> {
23 let script_content = task
24 .definition
25 .payload
26 .get("script")
27 .and_then(|v| v.as_str())
28 .ok_or_else(|| {
29 TaskFlowError::InvalidConfiguration("Missing script content".to_string())
30 })?;
31
32 let args = task
33 .definition
34 .payload
35 .get("args")
36 .and_then(|v| v.as_array())
37 .map(|arr| {
38 arr.iter()
39 .filter_map(|v| v.as_str().map(|s| s.to_string()))
40 .collect::<Vec<String>>()
41 })
42 .unwrap_or_default();
43
44 info!(
45 "Executing Python script: {} with args: {:?}",
46 task.definition.id, args
47 );
48
49 let mut cmd = Command::new("python3");
50 cmd.arg("-c").arg(script_content);
51 cmd.args(&args);
52 cmd.stdin(Stdio::null());
53 cmd.stdout(Stdio::piped());
54 cmd.stderr(Stdio::piped());
55
56 let output = cmd
57 .output()
58 .await
59 .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
60
61 let success = output.status.success();
62 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
63 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
64
65 if !success {
66 error!("Python script execution failed: {}", stderr);
67 }
68
69 Ok(TaskResult {
70 success,
71 output: Some(stdout),
72 error: if stderr.is_empty() {
73 None
74 } else {
75 Some(stderr)
76 },
77 execution_time_ms: 0,
78 metadata: Default::default(),
79 })
80 }
81}