mlua_swarm/worker/process_spawner.rs
1//! `ProcessSpawner` — a general-purpose `SpawnerAdapter` implementation
2//! that spawns an arbitrary binary (or a one-line shell command) and
3//! runs it as a worker. The thin path for wrapping an agent-block CLI,
4//! an LLM CLI, a random binary, or a shell script as a worker.
5//!
6//! Direct library integration with the `agent-block-core` SDK lives on
7//! a separate axis, in
8//! [`crate::worker::agent_block::AgentBlockInProcessSpawnerFactory`]: the SDK
9//! is embedded in-process, and `bus.emit("worker_result", ...)` is
10//! captured host-side. This spawner's selling point is "call anything
11//! over a shell"; it is not agent-block-specific, and the two paths
12//! have fully separated responsibilities.
13//!
14//! Naming convention: `ProcessSpawner` starts a shell process, and
15//! `AgentBlockInProcessSpawnerFactory` provides direct integration
16//! with the agent-block SDK. Older commits still reference an
17//! "AgentBlockSpawner" — that was renamed to `ProcessSpawner` in the current design
18//! (commit 8d1058f). See mini-app issue `96821965` for the full
19//! rationale.
20//!
21//! # Modes (two flavours)
22//!
23//! **plain mode (default):**
24//! 1. On `spawn`, launch a child process with
25//! `Command::new(self.program)` + `args`.
26//! 2. Write the directive to the child's stdin (used as the prompt).
27//! 3. Buffer the child's stdout in full.
//! 4. Try to parse stdout as JSON; on failure wrap it as
//!    `{"raw": "<stdout>", "stderr": "<stderr>"}`.
30//! 5. `ok = true` on exit code 0, otherwise `ok = false`.
//! 6. Alongside the `WorkerResult`, emit the same value as an
//!    `OutputEvent::Final` via `engine.submit_output` (design intent).
33//!
34//! **streaming mode (`.stream_mode(StreamMode::...)`):**
35//! 1-2. Same as plain mode.
//! 3. Read the child's stdout incrementally according to the selected
//!    protocol (lines for NDJSON/SSE, binary frames for `LengthPrefixed`).
38//! 4. Parse each chunk as an `OutputEvent`; skip failures.
39//! 5. `engine.submit_output` each successfully-parsed event
40//! **incrementally**.
//! 6. When `OutputEvent::Final` arrives, fold its `{content, ok}`
//!    into the `WorkerResult` (the last `Final` wins; a non-zero exit
//!    status forces `ok = false`).
//! 7. If EOF is hit without a `Final`, submit a synthesized `Final` and
//!    mark the outcome `ok = false` (Blocked).
45//!
//! Three protocols are implemented: `StreamMode::NdjsonLines`,
//! `StreamMode::SseEvents`, and `StreamMode::LengthPrefixed`.
48//!
49//! Token metadata is also handed to the child as environment variables
50//! so a worker can re-pull if it needs to. `sig_hex` is deliberately
51//! not exported, to keep exposure minimal.
52
53use crate::core::ctx::Ctx;
54use crate::core::engine::Engine;
55use crate::types::{CapToken, TaskId, WorkerId};
56use crate::worker::adapter::{SpawnError, SpawnerAdapter, WorkerError, WorkerResult};
57use crate::worker::output::{ContentRef, OutputEvent};
58use crate::worker::{Worker, WorkerJoinHandler};
59use async_trait::async_trait;
60use serde_json::Value;
61use std::process::Stdio;
62use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
63use tokio::process::Command;
64use tokio::sync::oneshot;
65use tokio_util::sync::CancellationToken;
66
67/// Wire protocol used to receive `OutputEvent`s from the child's
68/// stdout. `None` means plain mode — the default — which buffers stdout
69/// in full and folds it into a single `Final`.
70#[derive(Debug, Clone)]
71pub enum StreamMode {
72 /// One line per `OutputEvent` JSON (newline-delimited JSON).
73 NdjsonLines,
74 /// The `text/event-stream` form. Each event is a `data: <json>`
75 /// line terminated by a blank line. `event:` / `id:` / `retry:`
76 /// lines are ignored (MVP: only `data` lines are picked up).
77 /// Multiple `data` lines are concatenated into a single JSON
78 /// payload.
79 SseEvents,
80 /// Binary form: repeated `[u32 BE length][N bytes JSON payload]`.
81 /// Handy for LLM tools and high-frequency streams that want to
82 /// avoid text-framing overhead.
83 LengthPrefixed,
84}
85
86/// A `SpawnerAdapter` that runs a worker as an external OS process
87/// (a binary or a `sh -c` one-liner). Configured with the builder
88/// methods below, then registered like any other spawner.
89pub struct ProcessSpawner {
90 /// Binary (or `sh`, when built via [`ProcessSpawner::run`]) to
91 /// execute.
92 pub program: String,
93 /// Extra arguments passed to `program`, in order.
94 pub args: Vec<String>,
95 /// Whether to pipe the directive into the child's stdin — most LLM
96 /// CLIs read prompts that way (`--prompt -` and friends). When
97 /// `false`, the directive is appended to `args` instead.
98 pub use_stdin: bool,
99 /// `Some(mode)` — streaming mode. `None` — plain mode (the default).
100 pub stream_mode: Option<StreamMode>,
101}
102
103impl ProcessSpawner {
104 /// Builder entry point: spawn `program` with no args, stdin piping
105 /// on, and plain mode.
106 pub fn new(program: impl Into<String>) -> Self {
107 Self {
108 program: program.into(),
109 args: Vec::new(),
110 use_stdin: true,
111 stream_mode: None,
112 }
113 }
114
115 /// Appends a single argument.
116 pub fn arg(mut self, a: impl Into<String>) -> Self {
117 self.args.push(a.into());
118 self
119 }
120
121 /// Appends multiple arguments at once.
122 pub fn args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
123 self.args.extend(args.into_iter().map(|a| a.into()));
124 self
125 }
126
127 /// Sets whether the directive/prompt is piped to the child's stdin
128 /// (`true`, the default) or appended as a trailing arg (`false`).
129 pub fn use_stdin(mut self, v: bool) -> Self {
130 self.use_stdin = v;
131 self
132 }
133
134 /// Set the streaming mode. Default: `None` (plain mode).
135 pub fn stream_mode(mut self, mode: StreamMode) -> Self {
136 self.stream_mode = Some(mode);
137 self
138 }
139
140 /// Reset to plain mode explicitly — sets `stream_mode` to `None`.
141 pub fn plain(mut self) -> Self {
142 self.stream_mode = None;
143 self
144 }
145
146 /// Compatibility helper: `ndjson(true)` is equivalent to
147 /// `.stream_mode(StreamMode::NdjsonLines)`, and `ndjson(false)` to
148 /// `.plain()`. A deprecation candidate, kept around for now.
149 pub fn ndjson(mut self, v: bool) -> Self {
150 self.stream_mode = if v {
151 Some(StreamMode::NdjsonLines)
152 } else {
153 None
154 };
155 self
156 }
157
158 /// Convenience builder that runs a one-liner via `sh -c '<cmd>'`.
159 pub fn run(cmd: impl Into<String>) -> Self {
160 Self {
161 program: "sh".into(),
162 args: vec!["-c".into(), cmd.into()],
163 use_stdin: true,
164 stream_mode: None,
165 }
166 }
167
168 /// Builder that spawns an arbitrary binary directly, without going
169 /// through a shell.
170 pub fn cmd(program: impl Into<String>) -> Self {
171 Self {
172 program: program.into(),
173 args: Vec::new(),
174 use_stdin: true,
175 stream_mode: None,
176 }
177 }
178}
179
180#[async_trait]
181impl SpawnerAdapter for ProcessSpawner {
182 async fn spawn(
183 &self,
184 engine: &Engine,
185 ctx: &Ctx,
186 task_id: TaskId,
187 attempt: u32,
188 token: CapToken,
189 ) -> Result<Box<dyn Worker>, SpawnError> {
190 // design intent: `prompt` is obtained through
191 // `engine.fetch_prompt`, replacing the removed `directive`
192 // argument. `ProcessSpawner` snapshots it here and pushes it
193 // either into the child's stdin or the tail of `args`. If a
194 // child process wants to pull `fetch_prompt` itself, it can
195 // rebuild the token from the `MSE_TOKEN_*` env vars and call
196 // the engine — that lives in a separate spawner implementation.
197 let directive = engine
198 .fetch_prompt(&token, &task_id)
199 .await
200 .map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
201
202 let mut cmd = Command::new(&self.program);
203 cmd.args(&self.args)
204 .env("MSE_TOKEN_AGENT_ID", &token.agent_id)
205 .env("MSE_TOKEN_NONCE", &token.nonce)
206 .env("MSE_TASK_ID", &task_id.0)
207 .env("MSE_ATTEMPT", attempt.to_string())
208 .env("MSE_CTX_AGENT", &ctx.agent)
209 .stdin(Stdio::piped())
210 .stdout(Stdio::piped())
211 .stderr(Stdio::piped());
212
213 if !self.use_stdin {
214 cmd.arg(&directive);
215 }
216
217 let mut child = cmd
218 .spawn()
219 .map_err(|e| SpawnError::Internal(format!("spawn failed: {e}")))?;
220
221 if self.use_stdin {
222 if let Some(mut stdin) = child.stdin.take() {
223 stdin
224 .write_all(directive.as_bytes())
225 .await
226 .map_err(|e| SpawnError::Internal(format!("stdin write: {e}")))?;
227 drop(stdin); // EOF for child
228 }
229 }
230
231 let cancel = CancellationToken::new();
232 let cancel_inner = cancel.clone();
233 let worker_id = WorkerId::new();
234 let (tx, rx) = oneshot::channel();
235 // design intent: hand `engine` / `token` to the spawn task so it can emit
236 // OutputEvent via submit_output (side-by-side with the WorkerResult
237 // oneshot path).
238 let engine_for_emit = engine.clone();
239 let token_for_emit = token.clone();
240 let task_id_for_emit = task_id.clone();
241 let stream_mode = self.stream_mode.clone();
242
243 tokio::spawn(async move {
244 let result: Result<WorkerResult, WorkerError> = if let Some(mode) = stream_mode {
245 // ── streaming mode: read stdout as a chunk stream per protocol,
246 // pushing each chunk to submit_output as an OutputEvent. When we
247 // see a Final, fold {value, ok} into WorkerResult.
248 run_streaming_mode(
249 mode,
250 child,
251 &engine_for_emit,
252 &token_for_emit,
253 &task_id_for_emit,
254 attempt,
255 cancel_inner,
256 )
257 .await
258 } else {
259 // ── plain mode (default): buffer all stdout, JSON parse
260 // once, fold a single Final, then emit engine.submit_output(Final) in parallel.
261 let result = tokio::select! {
262 output = child.wait_with_output() => {
263 match output {
264 Ok(out) => {
265 let stdout = String::from_utf8_lossy(&out.stdout).to_string();
266 let value: Value = serde_json::from_str(stdout.trim())
267 .unwrap_or_else(|_| serde_json::json!({
268 "raw": stdout.trim_end(),
269 "stderr": String::from_utf8_lossy(&out.stderr).to_string(),
270 }));
271 Ok(WorkerResult { value, ok: out.status.success() })
272 }
273 Err(e) => Err(WorkerError::Failed(format!("wait_with_output: {e}"))),
274 }
275 }
276 _ = cancel_inner.cancelled() => Err(WorkerError::Cancelled),
277 };
278 if let Ok(wr) = &result {
279 let ev = OutputEvent::Final {
280 content: ContentRef::Inline {
281 value: wr.value.clone(),
282 },
283 ok: wr.ok,
284 };
285 let _ = engine_for_emit
286 .submit_output(&token_for_emit, &task_id_for_emit, attempt, ev)
287 .await;
288 }
289 result
290 };
291 // signal-only: the value travels through output_tail.
292 let signal: Result<(), WorkerError> = result.map(|_| ());
293 let _ = tx.send(signal);
294 });
295
296 Ok(Box::new(ProcessWorker {
297 handler: WorkerJoinHandler {
298 worker_id,
299 cancel,
300 completion: rx,
301 },
302 }))
303 }
304}
305
306/// Concrete Worker type for the Subprocess kind — the handle to a
307/// child OS process's `wait_with_output` / stream wait. Embeds a
308/// `WorkerJoinHandler` to carry the async signal.
309pub struct ProcessWorker {
310 /// The completion-signal handle for this child process's spawned
311 /// wait task.
312 pub handler: WorkerJoinHandler,
313}
314
315#[async_trait]
316impl Worker for ProcessWorker {
317 fn id(&self) -> &WorkerId {
318 &self.handler.worker_id
319 }
320 fn cancel_token(&self) -> CancellationToken {
321 self.handler.cancel.clone()
322 }
323 async fn join(self: Box<Self>) -> Result<(), WorkerError> {
324 self.handler.await_completion().await
325 }
326}
327
328/// Streaming-mode dispatcher. Picks one of the three reader functions
329/// per protocol. Owns the shared boilerplate — final tracking, child
330/// wait, synthetic-final emit, `WorkerResult` construction — so each
331/// reader only has to worry about parsing its protocol and calling
332/// `submit_output` per chunk.
333async fn run_streaming_mode(
334 mode: StreamMode,
335 mut child: tokio::process::Child,
336 engine: &Engine,
337 token: &CapToken,
338 task_id: &TaskId,
339 attempt: u32,
340 cancel: CancellationToken,
341) -> Result<WorkerResult, WorkerError> {
342 let stdout = child
343 .stdout
344 .take()
345 .ok_or_else(|| WorkerError::Failed("streaming: stdout pipe missing".into()))?;
346
347 let last_final = match mode {
348 StreamMode::NdjsonLines => {
349 read_ndjson(stdout, engine, token, task_id, attempt, cancel.clone()).await?
350 }
351 StreamMode::SseEvents => {
352 read_sse(stdout, engine, token, task_id, attempt, cancel.clone()).await?
353 }
354 StreamMode::LengthPrefixed => {
355 read_length_prefixed(stdout, engine, token, task_id, attempt, cancel.clone()).await?
356 }
357 };
358
359 let status = child
360 .wait()
361 .await
362 .map_err(|e| WorkerError::Failed(format!("streaming wait: {e}")))?;
363
364 match last_final {
365 Some((value, ok)) => Ok(WorkerResult {
366 value,
367 ok: ok && status.success(),
368 }),
369 None => {
370 // No Final present: push a synthesized Final so dispatch can pull it from output_tail.
371 let value = serde_json::json!({
372 "raw": "",
373 "note": "streaming mode: no Final event received",
374 "exit_success": status.success(),
375 });
376 let _ = engine
377 .submit_output(
378 token,
379 task_id,
380 attempt,
381 OutputEvent::Final {
382 content: ContentRef::Inline {
383 value: value.clone(),
384 },
385 ok: false,
386 },
387 )
388 .await;
389 Ok(WorkerResult { value, ok: false })
390 }
391 }
392}
393
394/// Shared per-chunk parse + emit path. Called by every reader once it
395/// has recovered an `OutputEvent`.
396async fn forward_event(
397 engine: &Engine,
398 token: &CapToken,
399 task_id: &TaskId,
400 attempt: u32,
401 ev: OutputEvent,
402 last_final: &mut Option<(Value, bool)>,
403) {
404 if let OutputEvent::Final { content, ok } = &ev {
405 let value = match content {
406 ContentRef::Inline { value } => value.clone(),
407 ContentRef::FileRef {
408 path,
409 mime,
410 size_hint,
411 } => serde_json::json!({
412 "file_ref": path.to_string_lossy(),
413 "mime": mime,
414 "size_hint": size_hint,
415 }),
416 };
417 *last_final = Some((value, *ok));
418 }
419 let _ = engine.submit_output(token, task_id, attempt, ev).await;
420}
421
422/// NDJSON: one line per JSON `OutputEvent`. Unparseable lines are
423/// skipped.
424async fn read_ndjson(
425 stdout: tokio::process::ChildStdout,
426 engine: &Engine,
427 token: &CapToken,
428 task_id: &TaskId,
429 attempt: u32,
430 cancel: CancellationToken,
431) -> Result<Option<(Value, bool)>, WorkerError> {
432 let mut reader = BufReader::new(stdout).lines();
433 let mut last_final = None;
434 loop {
435 tokio::select! {
436 line_res = reader.next_line() => match line_res {
437 Ok(Some(line)) => {
438 let trimmed = line.trim();
439 if trimmed.is_empty() { continue; }
440 if let Ok(ev) = serde_json::from_str::<OutputEvent>(trimmed) {
441 forward_event(engine, token, task_id, attempt, ev, &mut last_final).await;
442 }
443 }
444 Ok(None) => break,
445 Err(e) => return Err(WorkerError::Failed(format!("ndjson read: {e}"))),
446 },
447 _ = cancel.cancelled() => return Err(WorkerError::Cancelled),
448 }
449 }
450 Ok(last_final)
451}
452
453/// SSE: one event per `data: <json>` line followed by a blank line.
454/// `event:` / `id:` / `retry:` lines are ignored; multiple `data:`
455/// lines are LF-joined into a single JSON payload (a W3C-SSE-spec MVP).
456async fn read_sse(
457 stdout: tokio::process::ChildStdout,
458 engine: &Engine,
459 token: &CapToken,
460 task_id: &TaskId,
461 attempt: u32,
462 cancel: CancellationToken,
463) -> Result<Option<(Value, bool)>, WorkerError> {
464 let mut reader = BufReader::new(stdout).lines();
465 let mut last_final = None;
466 let mut data_buf = String::new();
467 loop {
468 tokio::select! {
469 line_res = reader.next_line() => match line_res {
470 Ok(Some(line)) => {
471 if line.is_empty() {
472 // Empty line = event terminator, so flush.
473 if !data_buf.is_empty() {
474 if let Ok(ev) = serde_json::from_str::<OutputEvent>(data_buf.trim()) {
475 forward_event(engine, token, task_id, attempt, ev, &mut last_final).await;
476 }
477 data_buf.clear();
478 }
479 } else if let Some(rest) = line.strip_prefix("data:") {
480 // SSE spec: optional space after colon
481 let payload = rest.strip_prefix(' ').unwrap_or(rest);
482 if !data_buf.is_empty() {
483 data_buf.push('\n');
484 }
485 data_buf.push_str(payload);
486 }
487 // else: event: / id: / retry: / comment line → skip
488 }
489 Ok(None) => {
490 // EOF: flush any leftover data_buf as the final event.
491 if !data_buf.is_empty() {
492 if let Ok(ev) = serde_json::from_str::<OutputEvent>(data_buf.trim()) {
493 forward_event(engine, token, task_id, attempt, ev, &mut last_final).await;
494 }
495 }
496 break;
497 }
498 Err(e) => return Err(WorkerError::Failed(format!("sse read: {e}"))),
499 },
500 _ = cancel.cancelled() => return Err(WorkerError::Cancelled),
501 }
502 }
503 Ok(last_final)
504}
505
506/// Length-prefixed: repeated `[u32 BE length][N bytes JSON payload]`
507/// binary frames.
508async fn read_length_prefixed(
509 mut stdout: tokio::process::ChildStdout,
510 engine: &Engine,
511 token: &CapToken,
512 task_id: &TaskId,
513 attempt: u32,
514 cancel: CancellationToken,
515) -> Result<Option<(Value, bool)>, WorkerError> {
516 use tokio::io::AsyncReadExt;
517 let mut last_final = None;
518 loop {
519 // Read the 4-byte length prefix (racing against cancel via select).
520 let mut len_buf = [0u8; 4];
521 let read_fut = stdout.read_exact(&mut len_buf);
522 let read_res = tokio::select! {
523 r = read_fut => r,
524 _ = cancel.cancelled() => return Err(WorkerError::Cancelled),
525 };
526 match read_res {
527 Ok(_) => {}
528 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, // clean EOF
529 Err(e) => return Err(WorkerError::Failed(format!("len read: {e}"))),
530 }
531 let len = u32::from_be_bytes(len_buf) as usize;
532 if len == 0 || len > 16 * 1024 * 1024 {
533 // 0 or > 16 MiB is treated as a frame error; break out.
534 break;
535 }
536 let mut payload = vec![0u8; len];
537 let read_fut = stdout.read_exact(&mut payload);
538 let read_res = tokio::select! {
539 r = read_fut => r,
540 _ = cancel.cancelled() => return Err(WorkerError::Cancelled),
541 };
542 if read_res.is_err() {
543 break;
544 }
545 if let Ok(ev) = serde_json::from_slice::<OutputEvent>(&payload) {
546 forward_event(engine, token, task_id, attempt, ev, &mut last_final).await;
547 }
548 }
549 Ok(last_final)
550}