workflow_graph_worker_sdk/
executor.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use serde::Serialize;
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7use workflow_graph_queue::traits::{LogChunk, LogStream};
8
/// Successful result of running a job.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct JobOutput {
    /// Named output values of the job, keyed by output name.
    ///
    /// `execute_job_streaming` currently always returns this map empty.
    pub outputs: HashMap<String, String>,
}
13
/// Failure result of running a job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobError {
    /// Human-readable description of the failure.
    pub message: String,
    /// Exit code of the process, when one is available.
    pub exit_code: Option<i32>,
}
20
21impl std::fmt::Display for JobError {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 if let Some(code) = self.exit_code {
24 write!(f, "exit code {code}: {}", self.message)
25 } else {
26 write!(f, "{}", self.message)
27 }
28 }
29}
30
// Marker impl: relies on `JobError`'s `Debug` and `Display` implementations; no methods are overridden.
impl std::error::Error for JobError {}
32
/// Request body serialized by `flush_logs` when posting a batch of log chunks.
#[derive(Serialize)]
struct PushLogsRequest {
    /// The chunks carried by this request.
    chunks: Vec<LogChunk>,
}
37
38pub async fn execute_job_streaming(
40 command: &str,
41 client: &reqwest::Client,
42 logs_url: &str,
43 workflow_id: &str,
44 job_id: &str,
45 batch_interval: Duration,
46 cancel_token: tokio_util::sync::CancellationToken,
47) -> Result<JobOutput, JobError> {
48 let mut child = Command::new("sh")
49 .arg("-c")
50 .arg(command)
51 .stdout(std::process::Stdio::piped())
52 .stderr(std::process::Stdio::piped())
53 .spawn()
54 .map_err(|e| JobError {
55 message: format!("failed to spawn: {e}"),
56 exit_code: None,
57 })?;
58
59 let stdout = child.stdout.take().unwrap();
60 let stderr = child.stderr.take().unwrap();
61
62 let wf_id = workflow_id.to_string();
63 let j_id = job_id.to_string();
64 let client = client.clone();
65 let logs_url = logs_url.to_string();
66
67 let log_handle = {
69 let wf_id = wf_id.clone();
70 let j_id = j_id.clone();
71 let client = client.clone();
72 let logs_url = logs_url.clone();
73
74 tokio::spawn(async move {
75 let mut stdout_reader = BufReader::new(stdout).lines();
76 let mut stderr_reader = BufReader::new(stderr).lines();
77 let mut sequence: u64 = 0;
78 let mut batch: Vec<LogChunk> = Vec::new();
79 let mut last_flush = tokio::time::Instant::now();
80 let mut full_output = String::new();
81
82 loop {
83 tokio::select! {
84 line = stdout_reader.next_line() => {
85 match line {
86 Ok(Some(text)) => {
87 full_output.push_str(&text);
88 full_output.push('\n');
89 batch.push(LogChunk {
90 workflow_id: wf_id.clone(),
91 job_id: j_id.clone(),
92 sequence,
93 data: format!("{text}\n"),
94 timestamp_ms: now_ms(),
95 stream: LogStream::Stdout,
96 });
97 sequence += 1;
98 }
99 Ok(None) => break, Err(_) => break,
101 }
102 }
103 line = stderr_reader.next_line() => {
104 match line {
105 Ok(Some(text)) => {
106 full_output.push_str(&text);
107 full_output.push('\n');
108 batch.push(LogChunk {
109 workflow_id: wf_id.clone(),
110 job_id: j_id.clone(),
111 sequence,
112 data: format!("{text}\n"),
113 timestamp_ms: now_ms(),
114 stream: LogStream::Stderr,
115 });
116 sequence += 1;
117 }
118 Ok(None) => {} Err(_) => {}
120 }
121 }
122 _ = tokio::time::sleep_until(last_flush + batch_interval) => {
123 if !batch.is_empty() {
125 flush_logs(&client, &logs_url, &mut batch).await;
126 last_flush = tokio::time::Instant::now();
127 }
128 }
129 }
130 }
131
132 if !batch.is_empty() {
134 flush_logs(&client, &logs_url, &mut batch).await;
135 }
136
137 full_output
138 })
139 };
140
141 tokio::select! {
143 status = child.wait() => {
144 let output = log_handle.await.unwrap_or_default();
145
146 match status {
147 Ok(exit) if exit.success() => Ok(JobOutput {
148 outputs: HashMap::new(),
149 }),
150 Ok(exit) => Err(JobError {
151 message: output.chars().take(4096).collect(),
152 exit_code: exit.code(),
153 }),
154 Err(e) => Err(JobError {
155 message: format!("wait failed: {e}"),
156 exit_code: None,
157 }),
158 }
159 }
160 _ = cancel_token.cancelled() => {
161 child.kill().await.ok();
163 log_handle.abort();
164 Err(JobError {
165 message: "cancelled".into(),
166 exit_code: None,
167 })
168 }
169 }
170}
171
172async fn flush_logs(client: &reqwest::Client, url: &str, batch: &mut Vec<LogChunk>) {
173 let chunks = std::mem::take(batch);
174 client
175 .post(url)
176 .json(&PushLogsRequest { chunks })
177 .send()
178 .await
179 .ok();
180}
181
/// Current wall-clock time in whole milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time earlier than the epoch.
fn now_ms() -> u64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as u64
}