Skip to main content

workflow_graph_worker_sdk/
executor.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use serde::Serialize;
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7use workflow_graph_queue::traits::{LogChunk, LogStream};
8
/// Result of a successful job execution.
///
/// Derives `Debug`/`Clone`/`Default` so callers can log it, copy it, or build
/// an empty result without spelling out the map.
#[derive(Debug, Clone, Default)]
pub struct JobOutput {
    /// Outputs reported by the job as string key/value pairs.
    pub outputs: HashMap<String, String>,
}
13
/// Error from job execution.
///
/// Comparable and cloneable so callers (and tests) can match on or retain a
/// failure without re-formatting it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobError {
    /// Human-readable description of the failure.
    pub message: String,
    /// Process exit code, when one is available.
    pub exit_code: Option<i32>,
}
20
21impl std::fmt::Display for JobError {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        if let Some(code) = self.exit_code {
24            write!(f, "exit code {code}: {}", self.message)
25        } else {
26            write!(f, "{}", self.message)
27        }
28    }
29}
30
31impl std::error::Error for JobError {}
32
33#[derive(Serialize)]
34struct PushLogsRequest {
35    chunks: Vec<LogChunk>,
36}
37
38/// Execute a shell command, streaming output to the server as log chunks.
39pub async fn execute_job_streaming(
40    command: &str,
41    client: &reqwest::Client,
42    logs_url: &str,
43    workflow_id: &str,
44    job_id: &str,
45    batch_interval: Duration,
46    cancel_token: tokio_util::sync::CancellationToken,
47) -> Result<JobOutput, JobError> {
48    let mut child = Command::new("sh")
49        .arg("-c")
50        .arg(command)
51        .stdout(std::process::Stdio::piped())
52        .stderr(std::process::Stdio::piped())
53        .spawn()
54        .map_err(|e| JobError {
55            message: format!("failed to spawn: {e}"),
56            exit_code: None,
57        })?;
58
59    let stdout = child.stdout.take().unwrap();
60    let stderr = child.stderr.take().unwrap();
61
62    let wf_id = workflow_id.to_string();
63    let j_id = job_id.to_string();
64    let client = client.clone();
65    let logs_url = logs_url.to_string();
66
67    // Collect output lines and batch-send as log chunks
68    let log_handle = {
69        let wf_id = wf_id.clone();
70        let j_id = j_id.clone();
71        let client = client.clone();
72        let logs_url = logs_url.clone();
73
74        tokio::spawn(async move {
75            let mut stdout_reader = BufReader::new(stdout).lines();
76            let mut stderr_reader = BufReader::new(stderr).lines();
77            let mut sequence: u64 = 0;
78            let mut batch: Vec<LogChunk> = Vec::new();
79            let mut last_flush = tokio::time::Instant::now();
80            let mut full_output = String::new();
81
82            loop {
83                tokio::select! {
84                    line = stdout_reader.next_line() => {
85                        match line {
86                            Ok(Some(text)) => {
87                                full_output.push_str(&text);
88                                full_output.push('\n');
89                                batch.push(LogChunk {
90                                    workflow_id: wf_id.clone(),
91                                    job_id: j_id.clone(),
92                                    sequence,
93                                    data: format!("{text}\n"),
94                                    timestamp_ms: now_ms(),
95                                    stream: LogStream::Stdout,
96                                });
97                                sequence += 1;
98                            }
99                            Ok(None) => break, // stdout closed
100                            Err(_) => break,
101                        }
102                    }
103                    line = stderr_reader.next_line() => {
104                        match line {
105                            Ok(Some(text)) => {
106                                full_output.push_str(&text);
107                                full_output.push('\n');
108                                batch.push(LogChunk {
109                                    workflow_id: wf_id.clone(),
110                                    job_id: j_id.clone(),
111                                    sequence,
112                                    data: format!("{text}\n"),
113                                    timestamp_ms: now_ms(),
114                                    stream: LogStream::Stderr,
115                                });
116                                sequence += 1;
117                            }
118                            Ok(None) => {} // stderr closed before stdout, keep going
119                            Err(_) => {}
120                        }
121                    }
122                    _ = tokio::time::sleep_until(last_flush + batch_interval) => {
123                        // Flush batch
124                        if !batch.is_empty() {
125                            flush_logs(&client, &logs_url, &mut batch).await;
126                            last_flush = tokio::time::Instant::now();
127                        }
128                    }
129                }
130            }
131
132            // Final flush
133            if !batch.is_empty() {
134                flush_logs(&client, &logs_url, &mut batch).await;
135            }
136
137            full_output
138        })
139    };
140
141    // Wait for either completion or cancellation
142    tokio::select! {
143        status = child.wait() => {
144            let output = log_handle.await.unwrap_or_default();
145
146            match status {
147                Ok(exit) if exit.success() => Ok(JobOutput {
148                    outputs: HashMap::new(),
149                }),
150                Ok(exit) => Err(JobError {
151                    message: output.chars().take(4096).collect(),
152                    exit_code: exit.code(),
153                }),
154                Err(e) => Err(JobError {
155                    message: format!("wait failed: {e}"),
156                    exit_code: None,
157                }),
158            }
159        }
160        _ = cancel_token.cancelled() => {
161            // Kill the child process
162            child.kill().await.ok();
163            log_handle.abort();
164            Err(JobError {
165                message: "cancelled".into(),
166                exit_code: None,
167            })
168        }
169    }
170}
171
172async fn flush_logs(client: &reqwest::Client, url: &str, batch: &mut Vec<LogChunk>) {
173    let chunks = std::mem::take(batch);
174    client
175        .post(url)
176        .json(&PushLogsRequest { chunks })
177        .send()
178        .await
179        .ok();
180}
181
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of an error. The `u128`
/// millisecond count is narrowed with `as`, which could only truncate hundreds
/// of millions of years from now.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is behind the epoch: fall back to a zero duration.
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}