Skip to main content

mlua_swarm/worker/
adapter.rs

1//! The second stage of the two-stage pipeline: `SpawnerAdapter`.
2//!
3//! From the engine's viewpoint there is only one trait,
4//! `SpawnerAdapter`; its `spawn` returns `Box<dyn Worker>` (see
5//! `crate::worker::Worker`). Worker shape is an implementation detail of
6//! each spawner; the engine only touches Workers through three
7//! operations — `id()` / `cancel_token()` / `join()`.
8//!
//! The old `WorkerAdapter` trait and `InProcWorker` struct — which
//! assumed a three-stage `Spawner.spawn → WorkerAdapter → invoke`
//! pipeline — have been removed. Nothing instantiated or dispatched
//! them (dead code), and the multi-invocation path had already been
//! collapsed in the implementation anyway.
//! The interface is now consolidated into the new `trait Worker` in
//! `src/worker.rs`.
16
17use crate::core::ctx::Ctx;
18use crate::core::engine::Engine;
19use crate::types::{CapToken, TaskId};
20use crate::worker::Worker;
21use async_trait::async_trait;
22use serde_json::Value;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use thiserror::Error;
28
29/// Errors that can occur while `SpawnerAdapter::spawn` is setting up a
30/// worker, before the worker itself starts running.
31#[derive(Debug, Error)]
32pub enum SpawnError {
33    /// No `WorkerFn` is registered for the requested agent name.
34    #[error("worker not registered: {0}")]
35    NotRegistered(String),
36    /// A middleware layer vetoed the spawn (e.g. capability check, rate
37    /// limit, policy gate).
38    #[error("spawn rejected by middleware: {0}")]
39    RejectedByMiddleware(String),
40    /// Any other setup failure (e.g. `fetch_prompt` failed).
41    #[error("internal: {0}")]
42    Internal(String),
43}
44
45/// Errors surfaced once a worker is running, via `Worker::join`.
46#[derive(Debug, Error)]
47pub enum WorkerError {
48    /// The worker fn itself returned an error.
49    #[error("worker fn returned error: {0}")]
50    Failed(String),
51    /// The worker was cancelled through its `CancellationToken`.
52    #[error("cancelled")]
53    Cancelled,
54}
55
56/// The value a `WorkerFn` hands back on success, folded into an
57/// `OutputEvent::Final` by the spawner.
58#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
59pub struct WorkerResult {
60    /// The worker fn's output payload.
61    pub value: Value,
62    /// Whether the agent itself considers this a successful result
63    /// (distinct from `Result::Err` — a worker fn can return `Ok(..)`
64    /// with `ok: false` to signal an agent-level failure).
65    pub ok: bool,
66}
67
68/// First stage of the two-stage pipeline: builds a `Box<dyn Worker>` for
69/// one attempt. Every concrete spawner (`InProcSpawner`, `ProcessSpawner`,
70/// the Operator spawner) implements this; the engine only ever holds a
71/// `Arc<dyn SpawnerAdapter>` and knows nothing about the Worker shape
72/// behind it.
73#[async_trait]
74pub trait SpawnerAdapter: Send + Sync {
75    /// Spawn one attempt as a worker. Returns `Box<dyn Worker>`.
76    ///
77    /// The `directive` argument was removed in design intent: prompts are
78    /// pulled on demand through
79    /// `engine.fetch_prompt(token, task_id, attempt)`. Spawners are free
80    /// to use whatever protocol they like internally — push, pull, or a
81    /// hybrid. `ProcessSpawner` runs `fetch_prompt` and pushes the
82    /// result into the child's stdin; `InProcSpawner` injects a prep
83    /// snapshot as `WorkerInvocation.prompt`; a child process could
84    /// even re-pull with the token itself.
85    async fn spawn(
86        &self,
87        engine: &Engine,
88        ctx: &Ctx,
89        task_id: TaskId,
90        attempt: u32,
91        token: CapToken,
92    ) -> Result<Box<dyn Worker>, SpawnError>;
93}
94
95// ─── InProcSpawner ────────────────────────────────────────────────────────
96
97/// Invocation context handed to a Worker fn. Bundles `token` +
98/// `task_id` + `prompt` + `sink`.
99///
100/// The `prompt` field was added in design intent, folding the old
101/// `Fn(inv, directive)` `directive` argument into the invocation. The
102/// spawner is expected to call
103/// `engine.fetch_prompt(token, task_id, attempt)` in its prep step and
104/// inject the snapshot into the invocation (push form). The `WorkerFn`
105/// side may still re-pull if it needs to — for example to fetch the
106/// prompt for a different attempt.
107///
108/// The `sink` field was added in design intent as the formal contract for
109/// the spawner's intake surface. A worker fn can stream intermediate
110/// events with things like
111/// `inv.sink.emit(OutputEvent::Progress { .. })`. Child-process
112/// spawners (`ProcessSpawner`, etc.) do not use `sink` — the child
113/// speaks the stdout protocol; `InProcSpawner` injects one. Even
114/// without `sink`, the `WorkerResult` returned by the fn is still
115/// folded into a `Final` event on the spawner side, running alongside
116/// the older return-value path.
117#[derive(Clone)]
118pub struct WorkerInvocation {
119    /// Capability token authorizing this attempt.
120    pub token: CapToken,
121    /// The task this invocation belongs to.
122    pub task_id: TaskId,
123    /// Attempt number within the task (used to key output events).
124    pub attempt: u32,
125    /// Registered agent name the `WorkerFn` was looked up under.
126    pub agent: String,
127    /// The prompt/prep snapshot pulled via `engine.fetch_prompt`,
128    /// injected here (push form) so the worker fn does not need to call
129    /// back into the engine for the common case.
130    pub prompt: String,
131    /// Intake: sink the worker fn uses to emit intermediate
132    /// `OutputEvent`s. Injected by `InProcSpawner`. `None` means the
133    /// sink path is not wired for this invocation.
134    pub sink: Option<std::sync::Arc<dyn crate::worker::output::OutputSink>>,
135    /// Upstream task cancel token — the clone of `cancel_inner`
136    /// generated by `InProcSpawner` for `JoinHandleWorker`. Worker fns
137    /// bridge this to their child futures or their SDK's
138    /// `shutdown_token`, propagating external cancellation all the way
139    /// down. `None` — like `sink` above — means the caller path is not
140    /// carrying the cancel channel.
141    pub cancel_token: Option<tokio_util::sync::CancellationToken>,
142}
143
144impl std::fmt::Debug for WorkerInvocation {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("WorkerInvocation")
147            .field("token", &self.token)
148            .field("task_id", &self.task_id)
149            .field("attempt", &self.attempt)
150            .field("agent", &self.agent)
151            .field("prompt", &self.prompt)
152            .field("sink", &self.sink.as_ref().map(|_| "<OutputSink>"))
153            .field(
154                "cancel_token",
155                &self.cancel_token.as_ref().map(|_| "<CancellationToken>"),
156            )
157            .finish()
158    }
159}
160
161/// A registered agent implementation: takes a `WorkerInvocation` and
162/// resolves to a `WorkerResult` (or a `WorkerError`). Boxed as a
163/// type-erased `Future` so heterogeneous agent implementations (async
164/// fns, closures capturing state, etc.) can share one registry entry
165/// type.
166pub type WorkerFn = Arc<
167    dyn Fn(
168            WorkerInvocation,
169        ) -> Pin<Box<dyn Future<Output = Result<WorkerResult, WorkerError>> + Send>>
170        + Send
171        + Sync,
172>;
173
174/// `agent`-string → `WorkerFn` registry. The generic parameter `W` pins
175/// the per-kind Worker concrete type at the type level, so AgentBlock /
176/// Lua / RustFn each produce their own Worker type through
177/// `InProcSpawner<W>` and the type binding is preserved right up until
178/// `SpawnerAdapter::spawn()` erases the return as `Box<dyn Worker>`.
179/// `W` must be constructible from `WorkerJoinHandler` via `From` — i.e.
180/// a newtype that embeds the async-signal handle.
181pub struct InProcSpawner<W = crate::worker::MiddlewareWorker> {
182    /// Agent name → implementation lookup table.
183    pub registry: HashMap<String, WorkerFn>,
184    _phantom: std::marker::PhantomData<W>,
185}
186
187// Inherent impl for the default W = MiddlewareWorker (so `InProcSpawner::new()`
188// in existing tests picks this default).
189impl InProcSpawner {
190    /// Creates an empty registry, defaulting the Worker type to
191    /// `MiddlewareWorker` (used by existing call sites and tests).
192    pub fn new() -> Self {
193        Self {
194            registry: HashMap::new(),
195            _phantom: std::marker::PhantomData,
196        }
197    }
198
199    /// Registers a `WorkerFn`-shaped async closure under `agent`,
200    /// overwriting any previous registration for the same name. Returns
201    /// `&mut Self` for chained registration calls.
202    pub fn register<F, Fut>(&mut self, agent: impl Into<String>, f: F) -> &mut Self
203    where
204        F: Fn(WorkerInvocation) -> Fut + Send + Sync + 'static,
205        Fut: Future<Output = Result<WorkerResult, WorkerError>> + Send + 'static,
206    {
207        let f = Arc::new(f);
208        let wrapped: WorkerFn = Arc::new(move |inv| {
209            let f = f.clone();
210            Box::pin(f(inv))
211        });
212        self.registry.insert(agent.into(), wrapped);
213        self
214    }
215}
216
217// Generic typed impl (the factory.build path that constructs a per-kind Worker).
218impl<W> InProcSpawner<W>
219where
220    W: Worker + From<crate::worker::WorkerJoinHandler> + Send + Sync + 'static,
221{
222    /// Creates an empty registry pinned to Worker type `W` (the
223    /// `factory.build` path uses this to get a per-kind Worker out of
224    /// `spawn()` instead of the default `MiddlewareWorker`).
225    pub fn typed() -> Self {
226        Self {
227            registry: HashMap::new(),
228            _phantom: std::marker::PhantomData,
229        }
230    }
231}
232
233impl Default for InProcSpawner {
234    fn default() -> Self {
235        Self::new()
236    }
237}
238
239#[async_trait]
240impl<W: Worker + From<crate::worker::WorkerJoinHandler> + Send + Sync + 'static> SpawnerAdapter
241    for InProcSpawner<W>
242{
243    async fn spawn(
244        &self,
245        engine: &Engine,
246        ctx: &Ctx,
247        task_id: TaskId,
248        attempt: u32,
249        token: CapToken,
250    ) -> Result<Box<dyn Worker>, SpawnError> {
251        let f = self
252            .registry
253            .get(&ctx.agent)
254            .cloned()
255            .ok_or_else(|| SpawnError::NotRegistered(ctx.agent.clone()))?;
256
257        // design intent: prompts are pulled via engine.fetch_prompt (the directive argument is retired)
258        let prompt = engine
259            .fetch_prompt(&token, &task_id)
260            .await
261            .map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
262
263        let (tx, rx) = tokio::sync::oneshot::channel();
264        let cancel = tokio_util::sync::CancellationToken::new();
265        let cancel_inner = cancel.clone();
266        let worker_id = crate::types::WorkerId::new();
267        // design intent: hand `engine` / `token` to the spawn task so it can emit
268        // OutputEvent::Final via submit_output (side-by-side with the
269        // WorkerResult oneshot path).
270        let engine_for_emit = engine.clone();
271        let token_for_emit = token.clone();
272        let task_id_for_emit = task_id.clone();
273        // Wire the receiving end by injecting an EngineSink into WorkerInvocation.sink.
274        let sink = std::sync::Arc::new(crate::worker::output::EngineSink::new(
275            engine.clone(),
276            token.clone(),
277            task_id.clone(),
278            attempt,
279        )) as std::sync::Arc<dyn crate::worker::output::OutputSink>;
280        let inv = WorkerInvocation {
281            token,
282            task_id,
283            attempt,
284            agent: ctx.agent.clone(),
285            prompt,
286            sink: Some(sink),
287            cancel_token: Some(cancel_inner.clone()),
288        };
289
290        tokio::spawn(async move {
291            let result = tokio::select! {
292                r = f(inv) => r,
293                _ = cancel_inner.cancelled() => Err(WorkerError::Cancelled),
294            };
295            // Fold WorkerResult into OutputEvent::Final. Contract: one Final per attempt.
296            if let Ok(wr) = &result {
297                let ev = crate::worker::output::OutputEvent::Final {
298                    content: crate::worker::output::ContentRef::Inline {
299                        value: wr.value.clone(),
300                    },
301                    ok: wr.ok,
302                };
303                let _ = engine_for_emit
304                    .submit_output(&token_for_emit, &task_id_for_emit, attempt, ev)
305                    .await;
306            }
307            let signal: Result<(), WorkerError> = result.map(|_| ());
308            let _ = tx.send(signal);
309        });
310
311        let handler = crate::worker::WorkerJoinHandler {
312            worker_id,
313            cancel,
314            completion: rx,
315        };
316        Ok(Box::new(W::from(handler)))
317    }
318}