1use super::cancel_registry::CancelRegistry;
8use crate::component_logger::LogStrageConfig;
9use crate::envvar::EnvVar;
10use crate::std_output_stream::{StdOutputConfig, StdOutputConfigWithSender};
11use async_trait::async_trait;
12use concepts::storage::LogInfoAppendRow;
13use concepts::{
14 ComponentType, FunctionFqn, FunctionMetadata, PackageIfcFns, ParameterType,
15 ReturnTypeExtendable,
16};
17use executor::worker::{
18 FatalError, RunFinished, Worker, WorkerContext, WorkerError, WorkerResult, WorkerResultOk,
19};
20use secrecy::{ExposeSecret, SecretString};
21use std::path::PathBuf;
22use std::sync::Arc;
23use tokio::io::AsyncReadExt;
24use tokio::sync::mpsc;
25use tracing::{debug, trace, warn};
26use utils::wasm_tools::WasmComponent;
27
/// Where the program executed by an exec activity comes from.
#[derive(Debug)]
pub enum ExecProgram {
    /// Script source text. On every run it is written to a fresh temporary
    /// file (executable on unix) and that file is spawned.
    Inline(String),
    /// Path of a file that is spawned directly.
    // NOTE(review): presumably a file materialised in a local cache — confirm with the caller.
    CachedFile(PathBuf),
}
36
37pub struct ActivityExecWorkerCompiled {
39 program: ExecProgram,
40 user_ffqn: FunctionFqn,
41 user_params: Vec<ParameterType>,
42 user_return_type: ReturnTypeExtendable,
43 env_vars: Arc<[EnvVar]>,
44 max_output_bytes: u64,
45 forward_stdout: Option<StdOutputConfig>,
46 forward_stderr: Option<StdOutputConfig>,
47 stdin_content: Option<SecretString>,
49 user_wasm_component: WasmComponent,
50}
51
52impl ActivityExecWorkerCompiled {
53 #[expect(clippy::too_many_arguments)]
54 pub fn new(
55 program: ExecProgram,
56 user_ffqn: FunctionFqn,
57 user_params: Vec<ParameterType>,
58 user_return_type: ReturnTypeExtendable,
59 env_vars: Arc<[EnvVar]>,
60 max_output_bytes: u64,
61 forward_stdout: Option<StdOutputConfig>,
62 forward_stderr: Option<StdOutputConfig>,
63 stdin_content: Option<SecretString>,
64 ) -> Result<Self, utils::wasm_tools::DecodeError> {
65 let user_wasm_component = WasmComponent::new_from_fn_signature(
66 &user_ffqn,
67 &user_params,
68 &user_return_type,
69 ComponentType::Activity,
70 "exec-activity",
71 )?;
72 Ok(Self {
73 program,
74 user_ffqn,
75 user_params,
76 user_return_type,
77 env_vars,
78 max_output_bytes,
79 forward_stdout,
80 forward_stderr,
81 stdin_content,
82 user_wasm_component,
83 })
84 }
85
86 #[must_use]
87 pub fn exported_functions_ext(&self) -> &[FunctionMetadata] {
88 self.user_wasm_component.exported_functions(true)
89 }
90
91 #[must_use]
92 pub fn exports_hierarchy_ext(&self) -> &[PackageIfcFns] {
93 self.user_wasm_component.exports_hierarchy_ext()
94 }
95
96 #[must_use]
97 pub fn wit(&self) -> String {
98 self.user_wasm_component.wit()
99 }
100
101 #[must_use]
102 pub fn into_worker(
103 self,
104 cancel_registry: CancelRegistry,
105 log_forwarder_sender: &mpsc::Sender<LogInfoAppendRow>,
106 _logs_storage_config: Option<LogStrageConfig>,
107 ) -> ActivityExecWorker {
108 let stdout_config = StdOutputConfigWithSender::new(
109 self.forward_stdout,
110 log_forwarder_sender,
111 concepts::storage::LogStreamType::StdOut,
112 );
113 let stderr_config = StdOutputConfigWithSender::new(
114 self.forward_stderr,
115 log_forwarder_sender,
116 concepts::storage::LogStreamType::StdErr,
117 );
118 ActivityExecWorker {
119 program: self.program,
120 user_ffqn: self.user_ffqn,
121 user_params: self.user_params,
122 user_return_type: self.user_return_type,
123 env_vars: self.env_vars,
124 max_output_bytes: self.max_output_bytes,
125 forward_stdout: stdout_config,
126 forward_stderr: stderr_config,
127 stdin_content: self.stdin_content,
128 cancel_registry,
129 user_exports_noext: self.user_wasm_component.exported_functions(false).to_vec(),
130 }
131 }
132}
133
134pub struct ActivityExecWorker {
135 program: ExecProgram,
136 #[allow(dead_code)]
137 user_ffqn: FunctionFqn,
138 user_params: Vec<ParameterType>,
139 user_return_type: ReturnTypeExtendable,
140 env_vars: Arc<[EnvVar]>,
141 max_output_bytes: u64,
142 forward_stdout: Option<StdOutputConfigWithSender>,
143 forward_stderr: Option<StdOutputConfigWithSender>,
144 stdin_content: Option<SecretString>,
145 cancel_registry: CancelRegistry,
146 user_exports_noext: Vec<FunctionMetadata>,
147}
148
149async fn read_and_stream(
153 reader: &mut (impl tokio::io::AsyncRead + Unpin),
154 capture_limit: u64,
155 forwarder: Option<&StdOutputConfigWithSender>,
156 ctx: &WorkerContext,
157) -> std::io::Result<(Vec<u8>, bool)> {
158 let mut buf = Vec::with_capacity(capture_limit.min(8192) as usize);
159 let mut chunk = [0u8; 4096];
160 let mut exceeded = false;
161 loop {
162 let n = reader.read(&mut chunk).await?;
163 if n == 0 {
164 break;
165 }
166 if let Some(fwd) = forwarder {
168 forward_output(fwd, &chunk[..n], ctx);
169 }
170 if !exceeded && capture_limit > 0 {
172 let space = usize::try_from(capture_limit)
173 .expect("32 bit systems are unsupported")
174 .saturating_sub(buf.len());
175 if space > 0 {
176 let to_capture = n.min(space);
177 buf.extend_from_slice(&chunk[..to_capture]);
178 }
179 if buf.len() as u64 >= capture_limit && n > space {
180 exceeded = true;
181 }
182 }
183 }
184 if capture_limit == 0 {
185 assert!(!exceeded);
186 }
187 Ok((buf, exceeded))
188}
189
190#[async_trait]
191impl Worker for ActivityExecWorker {
192 fn exported_functions_noext(&self) -> &[FunctionMetadata] {
193 &self.user_exports_noext
194 }
195
196 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
197 let version = ctx.version.clone();
198
199 let mut param_args: Vec<String> = Vec::new();
200
201 let _temp_file_guard;
205 let mut cmd = match &self.program {
206 ExecProgram::Inline(content) => {
207 let mut builder = tempfile::Builder::new();
208 builder.prefix("obelisk-exec-");
209 #[cfg(unix)]
210 {
211 use std::os::unix::fs::PermissionsExt;
212 builder.permissions(std::fs::Permissions::from_mode(0o755));
213 }
214 let mut tmp = builder.tempfile().map_err(|e| {
215 WorkerError::FatalError(
216 FatalError::CannotInstantiate {
217 reason: "failed to create temp file for inline script".to_string(),
218 detail: Some(e.to_string()),
219 },
220 version.clone(),
221 )
222 })?;
223 use std::io::Write;
224 tmp.write_all(content.as_bytes()).map_err(|e| {
225 WorkerError::FatalError(
226 FatalError::CannotInstantiate {
227 reason: "failed to write inline script".to_string(),
228 detail: Some(e.to_string()),
229 },
230 version.clone(),
231 )
232 })?;
233 let temp_path = tmp.into_temp_path();
234 let cmd = tokio::process::Command::new(&temp_path);
235 _temp_file_guard = Some(temp_path);
236 cmd
237 }
238 ExecProgram::CachedFile(path) => {
239 _temp_file_guard = None;
240 tokio::process::Command::new(path)
241 }
242 };
243
244 {
245 let json_params = ctx
247 .params
248 .as_json_values()
249 .expect("params come from database, not wasmtime");
250 assert_eq!(
251 self.user_params.len(),
252 json_params.len(),
253 "type checked in Params::from_json_values"
254 );
255 param_args.extend(json_params.iter().map(|v| {
256 serde_json::to_string(v).expect("serde_json::Value must be serializable")
257 }));
258 }
259 cmd.args(param_args);
260
261 cmd.env_clear();
263 for env_var in self.env_vars.iter() {
264 cmd.env(&env_var.key, &env_var.val);
265 }
266
267 #[cfg(unix)]
269 cmd.process_group(0);
270 cmd.kill_on_drop(true);
271
272 cmd.stdout(std::process::Stdio::piped());
274 cmd.stderr(std::process::Stdio::piped());
275 if self.stdin_content.is_some() {
276 cmd.stdin(std::process::Stdio::piped());
277 }
278
279 trace!("Spawning {cmd:?}");
281 let mut child = cmd.spawn().map_err(|e| {
282 WorkerError::FatalError(
283 FatalError::CannotInstantiate {
284 reason: "failed to spawn child process".to_string(),
285 detail: Some(e.to_string()),
286 },
287 version.clone(),
288 )
289 })?;
290
291 if let Some(ref stdin_content) = self.stdin_content {
293 use tokio::io::AsyncWriteExt;
294 let mut child_stdin = child.stdin.take().expect("stdin was piped");
295 child_stdin
296 .write_all(stdin_content.expose_secret().as_bytes())
297 .await
298 .map_err(|e| {
299 WorkerError::FatalError(
300 FatalError::CannotInstantiate {
301 reason: "failed to write to child stdin".to_string(),
302 detail: Some(e.to_string()),
303 },
304 version.clone(),
305 )
306 })?;
307 drop(child_stdin);
309 }
310
311 let mut child_stdout = child.stdout.take().expect("stdout was piped");
312 let mut child_stderr = child.stderr.take().expect("stderr was piped");
313
314 let cancel_token = self
316 .cancel_registry
317 .obtain_cancellation_token(ctx.execution_id.clone());
318
319 let max_stdout_bytes = if self.user_return_type.type_wrapper_tl.is_result_of_units() {
321 0
322 } else {
323 self.max_output_bytes
324 };
325 let result = tokio::select! {
326 biased;
327 _signal = cancel_token => {
328 debug!("Activity run interrupted, DB must have been updated");
331 let _ = child.kill().await;
333 return Ok(WorkerResultOk::DbUpdatedByWorkerOrWatcher);
334 }
335 result = async {
336 let stdout_fut = read_and_stream(
338 &mut child_stdout,
339 max_stdout_bytes,
340 self.forward_stdout.as_ref(),
341 &ctx,
342 );
343 let stderr_fut = read_and_stream(
344 &mut child_stderr,
345 0, self.forward_stderr.as_ref(),
347 &ctx,
348 );
349 let (stdout_result, stderr_result) = tokio::join!(stdout_fut, stderr_fut);
350 let (mut stdout_bytes, mut stdout_exceeded) = stdout_result?;
351 let _ = stderr_result?;
352 let exit_code = child.wait().await?.code().unwrap_or(-1);
353 if exit_code == 0 && self.user_return_type.type_wrapper_tl.ok.is_none()
355 || exit_code != 0 && self.user_return_type.type_wrapper_tl.err.is_none()
356 {
357 stdout_exceeded = false;
358 stdout_bytes = Vec::new();
359 }
360 Ok::<_, std::io::Error>((stdout_bytes, stdout_exceeded, exit_code))
361 } => {
362 result.map_err(|e| {
363 WorkerError::FatalError(
364 FatalError::CannotInstantiate {
365 reason: "I/O error during child process execution".to_string(),
366 detail: Some(e.to_string()),
367 },
368 version.clone(),
369 )
370 })?
371 }
372 };
373
374 let (stdout_bytes, stdout_exceeded, exit_code) = result;
375
376 if stdout_exceeded {
378 return Err(WorkerError::FatalError(
379 FatalError::CannotInstantiate {
380 reason: format!(
381 "stdout exceeded max_output_bytes limit of {} bytes",
382 self.max_output_bytes
383 ),
384 detail: None,
385 },
386 version,
387 ));
388 }
389
390 debug!(
391 exit_code,
392 stdout_len = stdout_bytes.len(),
393 "Child process finished"
394 );
395 let stdout = String::from_utf8_lossy(&stdout_bytes);
396 let parsed = if stdout.trim().is_empty() {
397 None
398 } else {
399 Some(serde_json::from_str::<serde_json::Value>(&stdout).map_err(|e| {
400 WorkerError::FatalError(
401 FatalError::ResultParsingError(
402 concepts::ResultParsingError::ResultParsingErrorFromVal(
403 concepts::ResultParsingErrorFromVal::TypeCheckError(format!(
404 "failed to parse stdout as JSON on exit {exit_code}: {e}, stdout: `{stdout}`"
405 )),
406 ),
407 ),
408 version.clone(),
409 )
410 })?)
411 };
412
413 let retval = if exit_code == 0 {
414 crate::js_worker_utils::map_ok_variant(parsed, &self.user_return_type, version.clone())?
415 } else {
416 crate::js_worker_utils::map_err_variant(
417 parsed,
418 &self.user_return_type,
419 version.clone(),
420 )?
421 };
422 Ok(WorkerResultOk::RunFinished(RunFinished {
423 retval,
424 version,
425 http_client_traces: None,
426 }))
427 }
428}
429
430fn forward_output(config: &StdOutputConfigWithSender, output: &[u8], ctx: &WorkerContext) {
431 if output.is_empty() {
432 return;
433 }
434 match config {
435 StdOutputConfigWithSender::Stdout => {
436 use std::io::Write;
437 let _ = std::io::stdout().write_all(output);
438 }
439 StdOutputConfigWithSender::Stderr => {
440 use std::io::Write;
441 let _ = std::io::stderr().write_all(output);
442 }
443 StdOutputConfigWithSender::Db {
444 sender,
445 forwarding_from,
446 } => {
447 let log_entry = concepts::storage::LogEntry::Stream {
448 created_at: chrono::Utc::now(),
449 payload: output.to_vec(),
450 stream_type: *forwarding_from,
451 };
452 let row = LogInfoAppendRow {
453 execution_id: ctx.execution_id.clone(),
454 run_id: ctx.locked_event.run_id,
455 log_entry,
456 };
457 if let Err(err) = sender.try_send(row) {
458 warn!("Failed to forward output to DB: {err}");
459 }
460 }
461 }
462}