mlua_swarm/worker/adapter.rs
1//! The second stage of the two-stage pipeline: `SpawnerAdapter`.
2//!
3//! From the engine's viewpoint there is only one trait,
4//! `SpawnerAdapter`; its `spawn` returns `Box<dyn Worker>` (see
5//! `crate::worker::Worker`). Worker shape is an implementation detail of
6//! each spawner; the engine only touches Workers through three
7//! operations — `id()` / `cancel_token()` / `join()`.
8//!
//! The old `WorkerAdapter` trait and `InProcWorker` struct — which
//! assumed a three-stage `Spawner.spawn → WorkerAdapter → invoke`
//! pipeline — have been removed. Nothing instantiated or dispatched
//! them (they were dead code), and the implementation had already
//! collapsed the multi-invocation path they were meant to support.
14//! The interface is now consolidated into the new `trait Worker` in
15//! `src/worker.rs`.
16
17use crate::core::ctx::Ctx;
18use crate::core::engine::Engine;
19use crate::types::{CapToken, TaskId};
20use crate::worker::Worker;
21use async_trait::async_trait;
22use serde_json::Value;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use thiserror::Error;
28
29/// Errors that can occur while `SpawnerAdapter::spawn` is setting up a
30/// worker, before the worker itself starts running.
31#[derive(Debug, Error)]
32pub enum SpawnError {
33 /// No `WorkerFn` is registered for the requested agent name.
34 #[error("worker not registered: {0}")]
35 NotRegistered(String),
36 /// A middleware layer vetoed the spawn (e.g. capability check, rate
37 /// limit, policy gate).
38 #[error("spawn rejected by middleware: {0}")]
39 RejectedByMiddleware(String),
40 /// Any other setup failure (e.g. `fetch_prompt` failed).
41 #[error("internal: {0}")]
42 Internal(String),
43}
44
45/// Errors surfaced once a worker is running, via `Worker::join`.
46#[derive(Debug, Error)]
47pub enum WorkerError {
48 /// The worker fn itself returned an error.
49 #[error("worker fn returned error: {0}")]
50 Failed(String),
51 /// The worker was cancelled through its `CancellationToken`.
52 #[error("cancelled")]
53 Cancelled,
54}
55
56/// The value a `WorkerFn` hands back on success, folded into an
57/// `OutputEvent::Final` by the spawner.
58#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
59pub struct WorkerResult {
60 /// The worker fn's output payload.
61 pub value: Value,
62 /// Whether the agent itself considers this a successful result
63 /// (distinct from `Result::Err` — a worker fn can return `Ok(..)`
64 /// with `ok: false` to signal an agent-level failure).
65 pub ok: bool,
66}
67
68/// First stage of the two-stage pipeline: builds a `Box<dyn Worker>` for
69/// one attempt. Every concrete spawner (`InProcSpawner`, `ProcessSpawner`,
70/// the Operator spawner) implements this; the engine only ever holds a
71/// `Arc<dyn SpawnerAdapter>` and knows nothing about the Worker shape
72/// behind it.
73#[async_trait]
74pub trait SpawnerAdapter: Send + Sync {
75 /// Spawn one attempt as a worker. Returns `Box<dyn Worker>`.
76 ///
77 /// The `directive` argument was removed in design intent: prompts are
78 /// pulled on demand through
79 /// `engine.fetch_prompt(token, task_id, attempt)`. Spawners are free
80 /// to use whatever protocol they like internally — push, pull, or a
81 /// hybrid. `ProcessSpawner` runs `fetch_prompt` and pushes the
82 /// result into the child's stdin; `InProcSpawner` injects a prep
83 /// snapshot as `WorkerInvocation.prompt`; a child process could
84 /// even re-pull with the token itself.
85 async fn spawn(
86 &self,
87 engine: &Engine,
88 ctx: &Ctx,
89 task_id: TaskId,
90 attempt: u32,
91 token: CapToken,
92 ) -> Result<Box<dyn Worker>, SpawnError>;
93}
94
95// ─── InProcSpawner ────────────────────────────────────────────────────────
96
97/// Invocation context handed to a Worker fn. Bundles `token` +
98/// `task_id` + `prompt` + `sink`.
99///
100/// The `prompt` field was added in design intent, folding the old
101/// `Fn(inv, directive)` `directive` argument into the invocation. The
102/// spawner is expected to call
103/// `engine.fetch_prompt(token, task_id, attempt)` in its prep step and
104/// inject the snapshot into the invocation (push form). The `WorkerFn`
105/// side may still re-pull if it needs to — for example to fetch the
106/// prompt for a different attempt.
107///
108/// The `sink` field was added in design intent as the formal contract for
109/// the spawner's intake surface. A worker fn can stream intermediate
110/// events with things like
111/// `inv.sink.emit(OutputEvent::Progress { .. })`. Child-process
112/// spawners (`ProcessSpawner`, etc.) do not use `sink` — the child
113/// speaks the stdout protocol; `InProcSpawner` injects one. Even
114/// without `sink`, the `WorkerResult` returned by the fn is still
115/// folded into a `Final` event on the spawner side, running alongside
116/// the older return-value path.
117#[derive(Clone)]
118pub struct WorkerInvocation {
119 /// Capability token authorizing this attempt.
120 pub token: CapToken,
121 /// The task this invocation belongs to.
122 pub task_id: TaskId,
123 /// Attempt number within the task (used to key output events).
124 pub attempt: u32,
125 /// Registered agent name the `WorkerFn` was looked up under.
126 pub agent: String,
127 /// The prompt/prep snapshot pulled via `engine.fetch_prompt`,
128 /// injected here (push form) so the worker fn does not need to call
129 /// back into the engine for the common case.
130 pub prompt: String,
131 /// Intake: sink the worker fn uses to emit intermediate
132 /// `OutputEvent`s. Injected by `InProcSpawner`. `None` means the
133 /// sink path is not wired for this invocation.
134 pub sink: Option<std::sync::Arc<dyn crate::worker::output::OutputSink>>,
135 /// Upstream task cancel token — the clone of `cancel_inner`
136 /// generated by `InProcSpawner` for `JoinHandleWorker`. Worker fns
137 /// bridge this to their child futures or their SDK's
138 /// `shutdown_token`, propagating external cancellation all the way
139 /// down. `None` — like `sink` above — means the caller path is not
140 /// carrying the cancel channel.
141 pub cancel_token: Option<tokio_util::sync::CancellationToken>,
142}
143
144impl std::fmt::Debug for WorkerInvocation {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.debug_struct("WorkerInvocation")
147 .field("token", &self.token)
148 .field("task_id", &self.task_id)
149 .field("attempt", &self.attempt)
150 .field("agent", &self.agent)
151 .field("prompt", &self.prompt)
152 .field("sink", &self.sink.as_ref().map(|_| "<OutputSink>"))
153 .field(
154 "cancel_token",
155 &self.cancel_token.as_ref().map(|_| "<CancellationToken>"),
156 )
157 .finish()
158 }
159}
160
161/// A registered agent implementation: takes a `WorkerInvocation` and
162/// resolves to a `WorkerResult` (or a `WorkerError`). Boxed as a
163/// type-erased `Future` so heterogeneous agent implementations (async
164/// fns, closures capturing state, etc.) can share one registry entry
165/// type.
166pub type WorkerFn = Arc<
167 dyn Fn(
168 WorkerInvocation,
169 ) -> Pin<Box<dyn Future<Output = Result<WorkerResult, WorkerError>> + Send>>
170 + Send
171 + Sync,
172>;
173
174/// `agent`-string → `WorkerFn` registry. The generic parameter `W` pins
175/// the per-kind Worker concrete type at the type level, so AgentBlock /
176/// Lua / RustFn each produce their own Worker type through
177/// `InProcSpawner<W>` and the type binding is preserved right up until
178/// `SpawnerAdapter::spawn()` erases the return as `Box<dyn Worker>`.
179/// `W` must be constructible from `WorkerJoinHandler` via `From` — i.e.
180/// a newtype that embeds the async-signal handle.
181pub struct InProcSpawner<W = crate::worker::MiddlewareWorker> {
182 /// Agent name → implementation lookup table.
183 pub registry: HashMap<String, WorkerFn>,
184 _phantom: std::marker::PhantomData<W>,
185}
186
187// Inherent impl for the default W = MiddlewareWorker (so `InProcSpawner::new()`
188// in existing tests picks this default).
189impl InProcSpawner {
190 /// Creates an empty registry, defaulting the Worker type to
191 /// `MiddlewareWorker` (used by existing call sites and tests).
192 pub fn new() -> Self {
193 Self {
194 registry: HashMap::new(),
195 _phantom: std::marker::PhantomData,
196 }
197 }
198
199 /// Registers a `WorkerFn`-shaped async closure under `agent`,
200 /// overwriting any previous registration for the same name. Returns
201 /// `&mut Self` for chained registration calls.
202 pub fn register<F, Fut>(&mut self, agent: impl Into<String>, f: F) -> &mut Self
203 where
204 F: Fn(WorkerInvocation) -> Fut + Send + Sync + 'static,
205 Fut: Future<Output = Result<WorkerResult, WorkerError>> + Send + 'static,
206 {
207 let f = Arc::new(f);
208 let wrapped: WorkerFn = Arc::new(move |inv| {
209 let f = f.clone();
210 Box::pin(f(inv))
211 });
212 self.registry.insert(agent.into(), wrapped);
213 self
214 }
215}
216
217// Generic typed impl (the factory.build path that constructs a per-kind Worker).
218impl<W> InProcSpawner<W>
219where
220 W: Worker + From<crate::worker::WorkerJoinHandler> + Send + Sync + 'static,
221{
222 /// Creates an empty registry pinned to Worker type `W` (the
223 /// `factory.build` path uses this to get a per-kind Worker out of
224 /// `spawn()` instead of the default `MiddlewareWorker`).
225 pub fn typed() -> Self {
226 Self {
227 registry: HashMap::new(),
228 _phantom: std::marker::PhantomData,
229 }
230 }
231}
232
233impl Default for InProcSpawner {
234 fn default() -> Self {
235 Self::new()
236 }
237}
238
239#[async_trait]
240impl<W: Worker + From<crate::worker::WorkerJoinHandler> + Send + Sync + 'static> SpawnerAdapter
241 for InProcSpawner<W>
242{
243 async fn spawn(
244 &self,
245 engine: &Engine,
246 ctx: &Ctx,
247 task_id: TaskId,
248 attempt: u32,
249 token: CapToken,
250 ) -> Result<Box<dyn Worker>, SpawnError> {
251 let f = self
252 .registry
253 .get(&ctx.agent)
254 .cloned()
255 .ok_or_else(|| SpawnError::NotRegistered(ctx.agent.clone()))?;
256
257 // design intent: prompts are pulled via engine.fetch_prompt (the directive argument is retired)
258 let prompt = engine
259 .fetch_prompt(&token, &task_id)
260 .await
261 .map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
262
263 let (tx, rx) = tokio::sync::oneshot::channel();
264 let cancel = tokio_util::sync::CancellationToken::new();
265 let cancel_inner = cancel.clone();
266 let worker_id = crate::types::WorkerId::new();
267 // design intent: hand `engine` / `token` to the spawn task so it can emit
268 // OutputEvent::Final via submit_output (side-by-side with the
269 // WorkerResult oneshot path).
270 let engine_for_emit = engine.clone();
271 let token_for_emit = token.clone();
272 let task_id_for_emit = task_id.clone();
273 // Wire the receiving end by injecting an EngineSink into WorkerInvocation.sink.
274 let sink = std::sync::Arc::new(crate::worker::output::EngineSink::new(
275 engine.clone(),
276 token.clone(),
277 task_id.clone(),
278 attempt,
279 )) as std::sync::Arc<dyn crate::worker::output::OutputSink>;
280 let inv = WorkerInvocation {
281 token,
282 task_id,
283 attempt,
284 agent: ctx.agent.clone(),
285 prompt,
286 sink: Some(sink),
287 cancel_token: Some(cancel_inner.clone()),
288 };
289
290 tokio::spawn(async move {
291 let result = tokio::select! {
292 r = f(inv) => r,
293 _ = cancel_inner.cancelled() => Err(WorkerError::Cancelled),
294 };
295 // Fold WorkerResult into OutputEvent::Final. Contract: one Final per attempt.
296 if let Ok(wr) = &result {
297 let ev = crate::worker::output::OutputEvent::Final {
298 content: crate::worker::output::ContentRef::Inline {
299 value: wr.value.clone(),
300 },
301 ok: wr.ok,
302 };
303 let _ = engine_for_emit
304 .submit_output(&token_for_emit, &task_id_for_emit, attempt, ev)
305 .await;
306 }
307 let signal: Result<(), WorkerError> = result.map(|_| ());
308 let _ = tx.send(signal);
309 });
310
311 let handler = crate::worker::WorkerJoinHandler {
312 worker_id,
313 cancel,
314 completion: rx,
315 };
316 Ok(Box::new(W::from(handler)))
317 }
318}