Skip to main content

obeli_sk_wasm_workers/activity/
activity_exec_worker.rs

1//! Exec activity worker that spawns native processes via `tokio::process::Command`.
2//!
3//! The child process communicates its result via exit code and stdout:
4//! - Exit 0: stdout contains the ok-variant JSON matching `return_type`.
5//! - Exit non-zero: stdout contains the err-variant JSON matching `return_type`.
6
7use super::cancel_registry::CancelRegistry;
8use crate::component_logger::LogStrageConfig;
9use crate::envvar::EnvVar;
10use crate::std_output_stream::{StdOutputConfig, StdOutputConfigWithSender};
11use async_trait::async_trait;
12use concepts::storage::LogInfoAppendRow;
13use concepts::{
14    ComponentType, FunctionFqn, FunctionMetadata, PackageIfcFns, ParameterType,
15    ReturnTypeExtendable,
16};
17use executor::worker::{
18    FatalError, RunFinished, Worker, WorkerContext, WorkerError, WorkerResult, WorkerResultOk,
19};
20use secrecy::{ExposeSecret, SecretString};
21use std::path::PathBuf;
22use std::sync::Arc;
23use tokio::io::AsyncReadExt;
24use tokio::sync::mpsc;
25use tracing::{debug, trace, warn};
26use utils::wasm_tools::WasmComponent;
27
/// Where the exec activity worker gets the program it runs.
#[derive(Debug)]
pub enum ExecProgram {
    /// Script text kept in memory; materialized as a fresh temporary file on every run.
    Inline(String),
    /// Immutable, already-cached script file (from OCI) that is executed in place.
    CachedFile(PathBuf),
}
36
37/// Compiled exec activity. No WASM engine needed.
38pub struct ActivityExecWorkerCompiled {
39    program: ExecProgram,
40    user_ffqn: FunctionFqn,
41    user_params: Vec<ParameterType>,
42    user_return_type: ReturnTypeExtendable,
43    env_vars: Arc<[EnvVar]>,
44    max_output_bytes: u64,
45    forward_stdout: Option<StdOutputConfig>,
46    forward_stderr: Option<StdOutputConfig>,
47    /// Pre-computed stdin content from resolved secrets. Written to the child's stdin pipe.
48    stdin_content: Option<SecretString>,
49    user_wasm_component: WasmComponent,
50}
51
52impl ActivityExecWorkerCompiled {
53    #[expect(clippy::too_many_arguments)]
54    pub fn new(
55        program: ExecProgram,
56        user_ffqn: FunctionFqn,
57        user_params: Vec<ParameterType>,
58        user_return_type: ReturnTypeExtendable,
59        env_vars: Arc<[EnvVar]>,
60        max_output_bytes: u64,
61        forward_stdout: Option<StdOutputConfig>,
62        forward_stderr: Option<StdOutputConfig>,
63        stdin_content: Option<SecretString>,
64    ) -> Result<Self, utils::wasm_tools::DecodeError> {
65        let user_wasm_component = WasmComponent::new_from_fn_signature(
66            &user_ffqn,
67            &user_params,
68            &user_return_type,
69            ComponentType::Activity,
70            "exec-activity",
71        )?;
72        Ok(Self {
73            program,
74            user_ffqn,
75            user_params,
76            user_return_type,
77            env_vars,
78            max_output_bytes,
79            forward_stdout,
80            forward_stderr,
81            stdin_content,
82            user_wasm_component,
83        })
84    }
85
86    #[must_use]
87    pub fn exported_functions_ext(&self) -> &[FunctionMetadata] {
88        self.user_wasm_component.exported_functions(true)
89    }
90
91    #[must_use]
92    pub fn exports_hierarchy_ext(&self) -> &[PackageIfcFns] {
93        self.user_wasm_component.exports_hierarchy_ext()
94    }
95
96    #[must_use]
97    pub fn wit(&self) -> String {
98        self.user_wasm_component.wit()
99    }
100
101    #[must_use]
102    pub fn into_worker(
103        self,
104        cancel_registry: CancelRegistry,
105        log_forwarder_sender: &mpsc::Sender<LogInfoAppendRow>,
106        _logs_storage_config: Option<LogStrageConfig>,
107    ) -> ActivityExecWorker {
108        let stdout_config = StdOutputConfigWithSender::new(
109            self.forward_stdout,
110            log_forwarder_sender,
111            concepts::storage::LogStreamType::StdOut,
112        );
113        let stderr_config = StdOutputConfigWithSender::new(
114            self.forward_stderr,
115            log_forwarder_sender,
116            concepts::storage::LogStreamType::StdErr,
117        );
118        ActivityExecWorker {
119            program: self.program,
120            user_ffqn: self.user_ffqn,
121            user_params: self.user_params,
122            user_return_type: self.user_return_type,
123            env_vars: self.env_vars,
124            max_output_bytes: self.max_output_bytes,
125            forward_stdout: stdout_config,
126            forward_stderr: stderr_config,
127            stdin_content: self.stdin_content,
128            cancel_registry,
129            user_exports_noext: self.user_wasm_component.exported_functions(false).to_vec(),
130        }
131    }
132}
133
134pub struct ActivityExecWorker {
135    program: ExecProgram,
136    #[allow(dead_code)]
137    user_ffqn: FunctionFqn,
138    user_params: Vec<ParameterType>,
139    user_return_type: ReturnTypeExtendable,
140    env_vars: Arc<[EnvVar]>,
141    max_output_bytes: u64,
142    forward_stdout: Option<StdOutputConfigWithSender>,
143    forward_stderr: Option<StdOutputConfigWithSender>,
144    stdin_content: Option<SecretString>,
145    cancel_registry: CancelRegistry,
146    user_exports_noext: Vec<FunctionMetadata>,
147}
148
149/// Read from `reader` in chunks, streaming each chunk to `forwarder`,
150/// while accumulating the full output (up to `capture_limit` bytes).
151/// Capturing can be turned off by setting `capture_limit` to zero.
152async fn read_and_stream(
153    reader: &mut (impl tokio::io::AsyncRead + Unpin),
154    capture_limit: u64,
155    forwarder: Option<&StdOutputConfigWithSender>,
156    ctx: &WorkerContext,
157) -> std::io::Result<(Vec<u8>, bool)> {
158    let mut buf = Vec::with_capacity(capture_limit.min(8192) as usize);
159    let mut chunk = [0u8; 4096];
160    let mut exceeded = false;
161    loop {
162        let n = reader.read(&mut chunk).await?;
163        if n == 0 {
164            break;
165        }
166        // Forward to log storage.
167        if let Some(fwd) = forwarder {
168            forward_output(fwd, &chunk[..n], ctx);
169        }
170        // Accumulate for result capture unless turned off.
171        if !exceeded && capture_limit > 0 {
172            let space = usize::try_from(capture_limit)
173                .expect("32 bit systems are unsupported")
174                .saturating_sub(buf.len());
175            if space > 0 {
176                let to_capture = n.min(space);
177                buf.extend_from_slice(&chunk[..to_capture]);
178            }
179            if buf.len() as u64 >= capture_limit && n > space {
180                exceeded = true;
181            }
182        }
183    }
184    if capture_limit == 0 {
185        assert!(!exceeded);
186    }
187    Ok((buf, exceeded))
188}
189
190#[async_trait]
191impl Worker for ActivityExecWorker {
192    fn exported_functions_noext(&self) -> &[FunctionMetadata] {
193        &self.user_exports_noext
194    }
195
196    async fn run(&self, ctx: WorkerContext) -> WorkerResult {
197        let version = ctx.version.clone();
198
199        let mut param_args: Vec<String> = Vec::new();
200
201        // Build the command depending on program source.
202        // For inline scripts, write to a temp file each execution.
203        // For cached files (from OCI), execute the immutable file directly.
204        let _temp_file_guard;
205        let mut cmd = match &self.program {
206            ExecProgram::Inline(content) => {
207                let mut builder = tempfile::Builder::new();
208                builder.prefix("obelisk-exec-");
209                #[cfg(unix)]
210                {
211                    use std::os::unix::fs::PermissionsExt;
212                    builder.permissions(std::fs::Permissions::from_mode(0o755));
213                }
214                let mut tmp = builder.tempfile().map_err(|e| {
215                    WorkerError::FatalError(
216                        FatalError::CannotInstantiate {
217                            reason: "failed to create temp file for inline script".to_string(),
218                            detail: Some(e.to_string()),
219                        },
220                        version.clone(),
221                    )
222                })?;
223                use std::io::Write;
224                tmp.write_all(content.as_bytes()).map_err(|e| {
225                    WorkerError::FatalError(
226                        FatalError::CannotInstantiate {
227                            reason: "failed to write inline script".to_string(),
228                            detail: Some(e.to_string()),
229                        },
230                        version.clone(),
231                    )
232                })?;
233                let temp_path = tmp.into_temp_path();
234                let cmd = tokio::process::Command::new(&temp_path);
235                _temp_file_guard = Some(temp_path);
236                cmd
237            }
238            ExecProgram::CachedFile(path) => {
239                _temp_file_guard = None;
240                tokio::process::Command::new(path)
241            }
242        };
243
244        {
245            // Serialize each user parameter as a JSON string for command-line args.
246            let json_params = ctx
247                .params
248                .as_json_values()
249                .expect("params come from database, not wasmtime");
250            assert_eq!(
251                self.user_params.len(),
252                json_params.len(),
253                "type checked in Params::from_json_values"
254            );
255            param_args.extend(json_params.iter().map(|v| {
256                serde_json::to_string(v).expect("serde_json::Value must be serializable")
257            }));
258        }
259        cmd.args(param_args);
260
261        // Clean environment + configured env vars.
262        cmd.env_clear();
263        for env_var in self.env_vars.iter() {
264            cmd.env(&env_var.key, &env_var.val);
265        }
266
267        // Process group and kill_on_drop.
268        #[cfg(unix)]
269        cmd.process_group(0);
270        cmd.kill_on_drop(true);
271
272        // Capture stdout/stderr, optionally pipe stdin.
273        cmd.stdout(std::process::Stdio::piped());
274        cmd.stderr(std::process::Stdio::piped());
275        if self.stdin_content.is_some() {
276            cmd.stdin(std::process::Stdio::piped());
277        }
278
279        // Spawn the child process.
280        trace!("Spawning {cmd:?}");
281        let mut child = cmd.spawn().map_err(|e| {
282            WorkerError::FatalError(
283                FatalError::CannotInstantiate {
284                    reason: "failed to spawn child process".to_string(),
285                    detail: Some(e.to_string()),
286                },
287                version.clone(),
288            )
289        })?;
290
291        // Write stdin content if configured (e.g. resolved secrets).
292        if let Some(ref stdin_content) = self.stdin_content {
293            use tokio::io::AsyncWriteExt;
294            let mut child_stdin = child.stdin.take().expect("stdin was piped");
295            child_stdin
296                .write_all(stdin_content.expose_secret().as_bytes())
297                .await
298                .map_err(|e| {
299                    WorkerError::FatalError(
300                        FatalError::CannotInstantiate {
301                            reason: "failed to write to child stdin".to_string(),
302                            detail: Some(e.to_string()),
303                        },
304                        version.clone(),
305                    )
306                })?;
307            // Drop stdin to signal EOF so the child can proceed.
308            drop(child_stdin);
309        }
310
311        let mut child_stdout = child.stdout.take().expect("stdout was piped");
312        let mut child_stderr = child.stderr.take().expect("stderr was piped");
313
314        // Register cancellation token.
315        let cancel_token = self
316            .cancel_registry
317            .obtain_cancellation_token(ctx.execution_id.clone());
318
319        // Skip stdout collection when return_type is `result` (unit ok and err variants).
320        let max_stdout_bytes = if self.user_return_type.type_wrapper_tl.is_result_of_units() {
321            0
322        } else {
323            self.max_output_bytes
324        };
325        let result = tokio::select! {
326            biased;
327            _signal = cancel_token => {
328                // Either paused or cancelled by CancelRegistry, or timed out by `expired_timers_watcher`
329                // and Sender removed from CancelRegistry using its watcher.
330                debug!("Activity run interrupted, DB must have been updated");
331                // Kill the child once the DB state has already been updated elsewhere.
332                let _ = child.kill().await;
333                return Ok(WorkerResultOk::DbUpdatedByWorkerOrWatcher);
334            }
335            result = async {
336                // Read stdout/stderr concurrently, streaming to log forwarder as chunks arrive.
337                let stdout_fut = read_and_stream(
338                    &mut child_stdout,
339                    max_stdout_bytes,
340                    self.forward_stdout.as_ref(),
341                    &ctx,
342                );
343                let stderr_fut = read_and_stream(
344                    &mut child_stderr,
345                    0, // stderr is only streamed to logs, not captured
346                    self.forward_stderr.as_ref(),
347                    &ctx,
348                );
349                let (stdout_result, stderr_result) = tokio::join!(stdout_fut, stderr_fut);
350                let (mut stdout_bytes, mut stdout_exceeded) = stdout_result?;
351                let _ = stderr_result?;
352                let exit_code = child.wait().await?.code().unwrap_or(-1);
353                // If the unit type was requested, return empty response.
354                if exit_code == 0 && self.user_return_type.type_wrapper_tl.ok.is_none()
355                    || exit_code != 0 && self.user_return_type.type_wrapper_tl.err.is_none()
356                {
357                    stdout_exceeded = false;
358                    stdout_bytes = Vec::new();
359                }
360                Ok::<_, std::io::Error>((stdout_bytes, stdout_exceeded, exit_code))
361            } => {
362                result.map_err(|e| {
363                    WorkerError::FatalError(
364                        FatalError::CannotInstantiate {
365                            reason: "I/O error during child process execution".to_string(),
366                            detail: Some(e.to_string()),
367                        },
368                        version.clone(),
369                    )
370                })?
371            }
372        };
373
374        let (stdout_bytes, stdout_exceeded, exit_code) = result;
375
376        // Check output size limit.
377        if stdout_exceeded {
378            return Err(WorkerError::FatalError(
379                FatalError::CannotInstantiate {
380                    reason: format!(
381                        "stdout exceeded max_output_bytes limit of {} bytes",
382                        self.max_output_bytes
383                    ),
384                    detail: None,
385                },
386                version,
387            ));
388        }
389
390        debug!(
391            exit_code,
392            stdout_len = stdout_bytes.len(),
393            "Child process finished"
394        );
395        let stdout = String::from_utf8_lossy(&stdout_bytes);
396        let parsed = if stdout.trim().is_empty() {
397            None
398        } else {
399            Some(serde_json::from_str::<serde_json::Value>(&stdout).map_err(|e| {
400                WorkerError::FatalError(
401                    FatalError::ResultParsingError(
402                        concepts::ResultParsingError::ResultParsingErrorFromVal(
403                            concepts::ResultParsingErrorFromVal::TypeCheckError(format!(
404                                "failed to parse stdout as JSON on exit {exit_code}: {e}, stdout: `{stdout}`"
405                            )),
406                        ),
407                    ),
408                    version.clone(),
409                )
410            })?)
411        };
412
413        let retval = if exit_code == 0 {
414            crate::js_worker_utils::map_ok_variant(parsed, &self.user_return_type, version.clone())?
415        } else {
416            crate::js_worker_utils::map_err_variant(
417                parsed,
418                &self.user_return_type,
419                version.clone(),
420            )?
421        };
422        Ok(WorkerResultOk::RunFinished(RunFinished {
423            retval,
424            version,
425            http_client_traces: None,
426        }))
427    }
428}
429
430fn forward_output(config: &StdOutputConfigWithSender, output: &[u8], ctx: &WorkerContext) {
431    if output.is_empty() {
432        return;
433    }
434    match config {
435        StdOutputConfigWithSender::Stdout => {
436            use std::io::Write;
437            let _ = std::io::stdout().write_all(output);
438        }
439        StdOutputConfigWithSender::Stderr => {
440            use std::io::Write;
441            let _ = std::io::stderr().write_all(output);
442        }
443        StdOutputConfigWithSender::Db {
444            sender,
445            forwarding_from,
446        } => {
447            let log_entry = concepts::storage::LogEntry::Stream {
448                created_at: chrono::Utc::now(),
449                payload: output.to_vec(),
450                stream_type: *forwarding_from,
451            };
452            let row = LogInfoAppendRow {
453                execution_id: ctx.execution_id.clone(),
454                run_id: ctx.locked_event.run_id,
455                log_entry,
456            };
457            if let Err(err) = sender.try_send(row) {
458                warn!("Failed to forward output to DB: {err}");
459            }
460        }
461    }
462}