Skip to main content

mlua_swarm/worker/
process_spawner.rs

1//! `ProcessSpawner` — a general-purpose `SpawnerAdapter` implementation
2//! that spawns an arbitrary binary (or a one-line shell command) and
3//! runs it as a worker. The thin path for wrapping an agent-block CLI,
4//! an LLM CLI, a random binary, or a shell script as a worker.
5//!
6//! Direct library integration with the `agent-block-core` SDK lives on
7//! a separate axis, in
8//! [`crate::worker::agent_block::AgentBlockInProcessSpawnerFactory`]: the SDK
9//! is embedded in-process, and `bus.emit("worker_result", ...)` is
10//! captured host-side. This spawner's selling point is "call anything
11//! over a shell"; it is not agent-block-specific, and the two paths
12//! have fully separated responsibilities.
13//!
14//! Naming convention: `ProcessSpawner` starts a shell process, and
15//! `AgentBlockInProcessSpawnerFactory` provides direct integration
16//! with the agent-block SDK. Older commits still reference an
17//! "AgentBlockSpawner" — that was renamed to `ProcessSpawner` in the current design
18//! (commit 8d1058f). See mini-app issue `96821965` for the full
19//! rationale.
20//!
21//! # Modes (two flavours)
22//!
23//! **plain mode (default):**
24//! 1. On `spawn`, launch a child process with
25//!    `Command::new(self.program)` + `args`.
26//! 2. Write the directive to the child's stdin (used as the prompt).
27//! 3. Buffer the child's stdout in full.
28//! 4. Try to parse stdout as JSON; on failure wrap it as
29//!    `{"raw": "<text>"}`.
30//! 5. `ok = true` on exit code 0, otherwise `ok = false`.
31//! 6. Emit the `WorkerResult` in parallel via
32//!    `engine.submit_output(Final)` (design intent).
33//!
34//! **streaming mode (`.stream_mode(StreamMode::...)`):**
35//! 1-2. Same as plain mode.
//! 3. Read the child's stdout incrementally, framed according to the
//!    selected `StreamMode` (NDJSON lines, SSE events, or
//!    length-prefixed binary frames).
38//! 4. Parse each chunk as an `OutputEvent`; skip failures.
39//! 5. `engine.submit_output` each successfully-parsed event
40//!    **incrementally**.
41//! 6. When `OutputEvent::Final` arrives, fold its `{content, ok}`
42//!    into the `WorkerResult`.
43//! 7. If EOF is hit without a `Final`, mark the outcome `ok = false`
44//!    (Blocked).
45//!
//! All three `StreamMode` variants — `NdjsonLines`, `SseEvents`, and
//! `LengthPrefixed` — have a reader in this module.
48//!
49//! Token metadata is also handed to the child as environment variables
50//! so a worker can re-pull if it needs to. `sig_hex` is deliberately
51//! not exported, to keep exposure minimal.
52
53use crate::core::ctx::Ctx;
54use crate::core::engine::Engine;
55use crate::types::{CapToken, TaskId, WorkerId};
56use crate::worker::adapter::{SpawnError, SpawnerAdapter, WorkerError, WorkerResult};
57use crate::worker::output::{ContentRef, OutputEvent};
58use crate::worker::{Worker, WorkerJoinHandler};
59use async_trait::async_trait;
60use serde_json::Value;
61use std::process::Stdio;
62use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
63use tokio::process::Command;
64use tokio::sync::oneshot;
65use tokio_util::sync::CancellationToken;
66
/// Wire protocol used to receive `OutputEvent`s from the child's
/// stdout when a spawner runs in streaming mode.
///
/// The spawner stores this as an `Option<StreamMode>`: `None` selects
/// plain mode (the default), which buffers stdout in full and folds it
/// into a single `Final`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamMode {
    /// One line per `OutputEvent` JSON (newline-delimited JSON).
    NdjsonLines,
    /// The `text/event-stream` form. Each event is a `data: <json>`
    /// line terminated by a blank line. `event:` / `id:` / `retry:`
    /// lines are ignored (MVP: only `data` lines are picked up).
    /// Multiple `data` lines are concatenated into a single JSON
    /// payload.
    SseEvents,
    /// Binary form: repeated `[u32 BE length][N bytes JSON payload]`.
    /// Handy for LLM tools and high-frequency streams that want to
    /// avoid text-framing overhead.
    LengthPrefixed,
}
85
86/// A `SpawnerAdapter` that runs a worker as an external OS process
87/// (a binary or a `sh -c` one-liner). Configured with the builder
88/// methods below, then registered like any other spawner.
89pub struct ProcessSpawner {
90    /// Binary (or `sh`, when built via [`ProcessSpawner::run`]) to
91    /// execute.
92    pub program: String,
93    /// Extra arguments passed to `program`, in order.
94    pub args: Vec<String>,
95    /// Whether to pipe the directive into the child's stdin — most LLM
96    /// CLIs read prompts that way (`--prompt -` and friends). When
97    /// `false`, the directive is appended to `args` instead.
98    pub use_stdin: bool,
99    /// `Some(mode)` — streaming mode. `None` — plain mode (the default).
100    pub stream_mode: Option<StreamMode>,
101}
102
103impl ProcessSpawner {
104    /// Builder entry point: spawn `program` with no args, stdin piping
105    /// on, and plain mode.
106    pub fn new(program: impl Into<String>) -> Self {
107        Self {
108            program: program.into(),
109            args: Vec::new(),
110            use_stdin: true,
111            stream_mode: None,
112        }
113    }
114
115    /// Appends a single argument.
116    pub fn arg(mut self, a: impl Into<String>) -> Self {
117        self.args.push(a.into());
118        self
119    }
120
121    /// Appends multiple arguments at once.
122    pub fn args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
123        self.args.extend(args.into_iter().map(|a| a.into()));
124        self
125    }
126
127    /// Sets whether the directive/prompt is piped to the child's stdin
128    /// (`true`, the default) or appended as a trailing arg (`false`).
129    pub fn use_stdin(mut self, v: bool) -> Self {
130        self.use_stdin = v;
131        self
132    }
133
134    /// Set the streaming mode. Default: `None` (plain mode).
135    pub fn stream_mode(mut self, mode: StreamMode) -> Self {
136        self.stream_mode = Some(mode);
137        self
138    }
139
140    /// Reset to plain mode explicitly — sets `stream_mode` to `None`.
141    pub fn plain(mut self) -> Self {
142        self.stream_mode = None;
143        self
144    }
145
146    /// Compatibility helper: `ndjson(true)` is equivalent to
147    /// `.stream_mode(StreamMode::NdjsonLines)`, and `ndjson(false)` to
148    /// `.plain()`. A deprecation candidate, kept around for now.
149    pub fn ndjson(mut self, v: bool) -> Self {
150        self.stream_mode = if v {
151            Some(StreamMode::NdjsonLines)
152        } else {
153            None
154        };
155        self
156    }
157
158    /// Convenience builder that runs a one-liner via `sh -c '<cmd>'`.
159    pub fn run(cmd: impl Into<String>) -> Self {
160        Self {
161            program: "sh".into(),
162            args: vec!["-c".into(), cmd.into()],
163            use_stdin: true,
164            stream_mode: None,
165        }
166    }
167
168    /// Builder that spawns an arbitrary binary directly, without going
169    /// through a shell.
170    pub fn cmd(program: impl Into<String>) -> Self {
171        Self {
172            program: program.into(),
173            args: Vec::new(),
174            use_stdin: true,
175            stream_mode: None,
176        }
177    }
178}
179
180#[async_trait]
181impl SpawnerAdapter for ProcessSpawner {
182    async fn spawn(
183        &self,
184        engine: &Engine,
185        ctx: &Ctx,
186        task_id: TaskId,
187        attempt: u32,
188        token: CapToken,
189    ) -> Result<Box<dyn Worker>, SpawnError> {
190        // design intent: `prompt` is obtained through
191        // `engine.fetch_prompt`, replacing the removed `directive`
192        // argument. `ProcessSpawner` snapshots it here and pushes it
193        // either into the child's stdin or the tail of `args`. If a
194        // child process wants to pull `fetch_prompt` itself, it can
195        // rebuild the token from the `MSE_TOKEN_*` env vars and call
196        // the engine — that lives in a separate spawner implementation.
197        let directive = engine
198            .fetch_prompt(&token, &task_id)
199            .await
200            .map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
201
202        let mut cmd = Command::new(&self.program);
203        cmd.args(&self.args)
204            .env("MSE_TOKEN_AGENT_ID", &token.agent_id)
205            .env("MSE_TOKEN_NONCE", &token.nonce)
206            .env("MSE_TASK_ID", &task_id.0)
207            .env("MSE_ATTEMPT", attempt.to_string())
208            .env("MSE_CTX_AGENT", &ctx.agent)
209            .stdin(Stdio::piped())
210            .stdout(Stdio::piped())
211            .stderr(Stdio::piped());
212
213        if !self.use_stdin {
214            cmd.arg(&directive);
215        }
216
217        let mut child = cmd
218            .spawn()
219            .map_err(|e| SpawnError::Internal(format!("spawn failed: {e}")))?;
220
221        if self.use_stdin {
222            if let Some(mut stdin) = child.stdin.take() {
223                stdin
224                    .write_all(directive.as_bytes())
225                    .await
226                    .map_err(|e| SpawnError::Internal(format!("stdin write: {e}")))?;
227                drop(stdin); // EOF for child
228            }
229        }
230
231        let cancel = CancellationToken::new();
232        let cancel_inner = cancel.clone();
233        let worker_id = WorkerId::new();
234        let (tx, rx) = oneshot::channel();
235        // design intent: hand `engine` / `token` to the spawn task so it can emit
236        // OutputEvent via submit_output (side-by-side with the WorkerResult
237        // oneshot path).
238        let engine_for_emit = engine.clone();
239        let token_for_emit = token.clone();
240        let task_id_for_emit = task_id.clone();
241        let stream_mode = self.stream_mode.clone();
242
243        tokio::spawn(async move {
244            let result: Result<WorkerResult, WorkerError> = if let Some(mode) = stream_mode {
245                // ── streaming mode: read stdout as a chunk stream per protocol,
246                // pushing each chunk to submit_output as an OutputEvent. When we
247                // see a Final, fold {value, ok} into WorkerResult.
248                run_streaming_mode(
249                    mode,
250                    child,
251                    &engine_for_emit,
252                    &token_for_emit,
253                    &task_id_for_emit,
254                    attempt,
255                    cancel_inner,
256                )
257                .await
258            } else {
259                // ── plain mode (default): buffer all stdout, JSON parse
260                // once, fold a single Final, then emit engine.submit_output(Final) in parallel.
261                let result = tokio::select! {
262                    output = child.wait_with_output() => {
263                        match output {
264                            Ok(out) => {
265                                let stdout = String::from_utf8_lossy(&out.stdout).to_string();
266                                let value: Value = serde_json::from_str(stdout.trim())
267                                    .unwrap_or_else(|_| serde_json::json!({
268                                        "raw": stdout.trim_end(),
269                                        "stderr": String::from_utf8_lossy(&out.stderr).to_string(),
270                                    }));
271                                Ok(WorkerResult { value, ok: out.status.success() })
272                            }
273                            Err(e) => Err(WorkerError::Failed(format!("wait_with_output: {e}"))),
274                        }
275                    }
276                    _ = cancel_inner.cancelled() => Err(WorkerError::Cancelled),
277                };
278                if let Ok(wr) = &result {
279                    let ev = OutputEvent::Final {
280                        content: ContentRef::Inline {
281                            value: wr.value.clone(),
282                        },
283                        ok: wr.ok,
284                    };
285                    let _ = engine_for_emit
286                        .submit_output(&token_for_emit, &task_id_for_emit, attempt, ev)
287                        .await;
288                }
289                result
290            };
291            // signal-only: the value travels through output_tail.
292            let signal: Result<(), WorkerError> = result.map(|_| ());
293            let _ = tx.send(signal);
294        });
295
296        Ok(Box::new(ProcessWorker {
297            handler: WorkerJoinHandler {
298                worker_id,
299                cancel,
300                completion: rx,
301            },
302        }))
303    }
304}
305
306/// Concrete Worker type for the Subprocess kind — the handle to a
307/// child OS process's `wait_with_output` / stream wait. Embeds a
308/// `WorkerJoinHandler` to carry the async signal.
309pub struct ProcessWorker {
310    /// The completion-signal handle for this child process's spawned
311    /// wait task.
312    pub handler: WorkerJoinHandler,
313}
314
315#[async_trait]
316impl Worker for ProcessWorker {
317    fn id(&self) -> &WorkerId {
318        &self.handler.worker_id
319    }
320    fn cancel_token(&self) -> CancellationToken {
321        self.handler.cancel.clone()
322    }
323    async fn join(self: Box<Self>) -> Result<(), WorkerError> {
324        self.handler.await_completion().await
325    }
326}
327
328/// Streaming-mode dispatcher. Picks one of the three reader functions
329/// per protocol. Owns the shared boilerplate — final tracking, child
330/// wait, synthetic-final emit, `WorkerResult` construction — so each
331/// reader only has to worry about parsing its protocol and calling
332/// `submit_output` per chunk.
333async fn run_streaming_mode(
334    mode: StreamMode,
335    mut child: tokio::process::Child,
336    engine: &Engine,
337    token: &CapToken,
338    task_id: &TaskId,
339    attempt: u32,
340    cancel: CancellationToken,
341) -> Result<WorkerResult, WorkerError> {
342    let stdout = child
343        .stdout
344        .take()
345        .ok_or_else(|| WorkerError::Failed("streaming: stdout pipe missing".into()))?;
346
347    let last_final = match mode {
348        StreamMode::NdjsonLines => {
349            read_ndjson(stdout, engine, token, task_id, attempt, cancel.clone()).await?
350        }
351        StreamMode::SseEvents => {
352            read_sse(stdout, engine, token, task_id, attempt, cancel.clone()).await?
353        }
354        StreamMode::LengthPrefixed => {
355            read_length_prefixed(stdout, engine, token, task_id, attempt, cancel.clone()).await?
356        }
357    };
358
359    let status = child
360        .wait()
361        .await
362        .map_err(|e| WorkerError::Failed(format!("streaming wait: {e}")))?;
363
364    match last_final {
365        Some((value, ok)) => Ok(WorkerResult {
366            value,
367            ok: ok && status.success(),
368        }),
369        None => {
370            // No Final present: push a synthesized Final so dispatch can pull it from output_tail.
371            let value = serde_json::json!({
372                "raw": "",
373                "note": "streaming mode: no Final event received",
374                "exit_success": status.success(),
375            });
376            let _ = engine
377                .submit_output(
378                    token,
379                    task_id,
380                    attempt,
381                    OutputEvent::Final {
382                        content: ContentRef::Inline {
383                            value: value.clone(),
384                        },
385                        ok: false,
386                    },
387                )
388                .await;
389            Ok(WorkerResult { value, ok: false })
390        }
391    }
392}
393
394/// Shared per-chunk parse + emit path. Called by every reader once it
395/// has recovered an `OutputEvent`.
396async fn forward_event(
397    engine: &Engine,
398    token: &CapToken,
399    task_id: &TaskId,
400    attempt: u32,
401    ev: OutputEvent,
402    last_final: &mut Option<(Value, bool)>,
403) {
404    if let OutputEvent::Final { content, ok } = &ev {
405        let value = match content {
406            ContentRef::Inline { value } => value.clone(),
407            ContentRef::FileRef {
408                path,
409                mime,
410                size_hint,
411            } => serde_json::json!({
412                "file_ref": path.to_string_lossy(),
413                "mime": mime,
414                "size_hint": size_hint,
415            }),
416        };
417        *last_final = Some((value, *ok));
418    }
419    let _ = engine.submit_output(token, task_id, attempt, ev).await;
420}
421
422/// NDJSON: one line per JSON `OutputEvent`. Unparseable lines are
423/// skipped.
424async fn read_ndjson(
425    stdout: tokio::process::ChildStdout,
426    engine: &Engine,
427    token: &CapToken,
428    task_id: &TaskId,
429    attempt: u32,
430    cancel: CancellationToken,
431) -> Result<Option<(Value, bool)>, WorkerError> {
432    let mut reader = BufReader::new(stdout).lines();
433    let mut last_final = None;
434    loop {
435        tokio::select! {
436            line_res = reader.next_line() => match line_res {
437                Ok(Some(line)) => {
438                    let trimmed = line.trim();
439                    if trimmed.is_empty() { continue; }
440                    if let Ok(ev) = serde_json::from_str::<OutputEvent>(trimmed) {
441                        forward_event(engine, token, task_id, attempt, ev, &mut last_final).await;
442                    }
443                }
444                Ok(None) => break,
445                Err(e) => return Err(WorkerError::Failed(format!("ndjson read: {e}"))),
446            },
447            _ = cancel.cancelled() => return Err(WorkerError::Cancelled),
448        }
449    }
450    Ok(last_final)
451}
452
453/// SSE: one event per `data: <json>` line followed by a blank line.
454/// `event:` / `id:` / `retry:` lines are ignored; multiple `data:`
455/// lines are LF-joined into a single JSON payload (a W3C-SSE-spec MVP).
456async fn read_sse(
457    stdout: tokio::process::ChildStdout,
458    engine: &Engine,
459    token: &CapToken,
460    task_id: &TaskId,
461    attempt: u32,
462    cancel: CancellationToken,
463) -> Result<Option<(Value, bool)>, WorkerError> {
464    let mut reader = BufReader::new(stdout).lines();
465    let mut last_final = None;
466    let mut data_buf = String::new();
467    loop {
468        tokio::select! {
469            line_res = reader.next_line() => match line_res {
470                Ok(Some(line)) => {
471                    if line.is_empty() {
472                        // Empty line = event terminator, so flush.
473                        if !data_buf.is_empty() {
474                            if let Ok(ev) = serde_json::from_str::<OutputEvent>(data_buf.trim()) {
475                                forward_event(engine, token, task_id, attempt, ev, &mut last_final).await;
476                            }
477                            data_buf.clear();
478                        }
479                    } else if let Some(rest) = line.strip_prefix("data:") {
480                        // SSE spec: optional space after colon
481                        let payload = rest.strip_prefix(' ').unwrap_or(rest);
482                        if !data_buf.is_empty() {
483                            data_buf.push('\n');
484                        }
485                        data_buf.push_str(payload);
486                    }
487                    // else: event: / id: / retry: / comment line → skip
488                }
489                Ok(None) => {
490                    // EOF: flush any leftover data_buf as the final event.
491                    if !data_buf.is_empty() {
492                        if let Ok(ev) = serde_json::from_str::<OutputEvent>(data_buf.trim()) {
493                            forward_event(engine, token, task_id, attempt, ev, &mut last_final).await;
494                        }
495                    }
496                    break;
497                }
498                Err(e) => return Err(WorkerError::Failed(format!("sse read: {e}"))),
499            },
500            _ = cancel.cancelled() => return Err(WorkerError::Cancelled),
501        }
502    }
503    Ok(last_final)
504}
505
506/// Length-prefixed: repeated `[u32 BE length][N bytes JSON payload]`
507/// binary frames.
508async fn read_length_prefixed(
509    mut stdout: tokio::process::ChildStdout,
510    engine: &Engine,
511    token: &CapToken,
512    task_id: &TaskId,
513    attempt: u32,
514    cancel: CancellationToken,
515) -> Result<Option<(Value, bool)>, WorkerError> {
516    use tokio::io::AsyncReadExt;
517    let mut last_final = None;
518    loop {
519        // Read the 4-byte length prefix (racing against cancel via select).
520        let mut len_buf = [0u8; 4];
521        let read_fut = stdout.read_exact(&mut len_buf);
522        let read_res = tokio::select! {
523            r = read_fut => r,
524            _ = cancel.cancelled() => return Err(WorkerError::Cancelled),
525        };
526        match read_res {
527            Ok(_) => {}
528            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, // clean EOF
529            Err(e) => return Err(WorkerError::Failed(format!("len read: {e}"))),
530        }
531        let len = u32::from_be_bytes(len_buf) as usize;
532        if len == 0 || len > 16 * 1024 * 1024 {
533            // 0 or > 16 MiB is treated as a frame error; break out.
534            break;
535        }
536        let mut payload = vec![0u8; len];
537        let read_fut = stdout.read_exact(&mut payload);
538        let read_res = tokio::select! {
539            r = read_fut => r,
540            _ = cancel.cancelled() => return Err(WorkerError::Cancelled),
541        };
542        if read_res.is_err() {
543            break;
544        }
545        if let Ok(ev) = serde_json::from_slice::<OutputEvent>(&payload) {
546            forward_event(engine, token, task_id, attempt, ev, &mut last_final).await;
547        }
548    }
549    Ok(last_final)
550}