Skip to main content

mlua_swarm/
worker.rs

//! The `Worker` trait — the shared interface for the execution units
//! each spawner keeps internally.
//!
//! ## Roles — the boundary between the engine's view and the spawner's
//! internal view
//!
//! - **Engine view.** The engine talks to spawners only through
//!   `SpawnerAdapter`. It never sees concrete Worker types and does not
//!   depend on their shape; what it gets back from a spawn is an opaque
//!   `Box<dyn Worker>`.
//! - **Spawner's internal view.** Each `SpawnerAdapter::spawn` builds a
//!   concrete Worker internally (`ChildProcessWorker` / `ClosureWorker`
//!   / `OperatorWorker` and friends), type-erases it as
//!   `Box<dyn Worker>`, and returns that. This trait fixes the interface
//!   so the Worker shape does not drift between spawner implementations.
14//!
//! ## Worker lifetime semantics
//!
//! The current contract is "one spawn = one worker = one join". By the
//! time `spawn()` returns, the worker has already started its internal
//! tokio task (it is already running); the caller only needs to `join()`
//! to wait for the completion signal. The worker's output value is not
//! returned by `join`: read it from `engine.output_tail(task_id, attempt)`
//! as an `OutputEvent::Final`. The oneshot channel carries only the
//! completion signal, not the value.
24//!
//! Extending this to "one worker = N invocations" (calling the same
//! worker several times while the token's TTL is alive) is deferred work
//! along a separate axis. It was the original design intent (see
//! v6.md:174), but the shape was collapsed to keep the implementation
//! simple. If it becomes necessary, the path is: add
//! `async fn invoke(&mut self, token, prompt) -> WorkerResult` to the
//! trait and redefine `join` as "completion of the last invocation, plus
//! cleanup".
33//!
//! ## `WorkerJoinHandler` — the completion shape shared by the current
//! spawners
//!
//! All three current spawners (Shell / InProc / Operator) call
//! `tokio::spawn` internally and push their completion signal through a
//! oneshot channel. `WorkerJoinHandler` captures that shape as a
//! `(worker_id, cancel token, oneshot receiver)` triple. It does not
//! implement `Worker` itself: concrete Worker structs embed it and
//! delegate `Worker::join` to `WorkerJoinHandler::await_completion`, and
//! `MiddlewareWorker` does the same on the `wrap_join` middleware path.
//! Further specialisation is deferred.
43
44pub mod adapter;
45pub mod agent_block;
46pub mod baseline;
47pub mod output;
48pub mod process_spawner;
49
50use crate::types::WorkerId;
51use crate::worker::adapter::WorkerError;
52use async_trait::async_trait;
53use tokio::sync::oneshot;
54use tokio_util::sync::CancellationToken;
55
56/// Shared interface for the execution units spawners launch internally.
57///
58/// Every spawner implementation returns a concrete Worker struct that
59/// implements this trait (today that is `WorkerJoinHandler`) as a
60/// `Box<dyn Worker>`. The caller (the engine) only interacts with
61/// workers through three operations: `id()` / `cancel_token()` /
62/// `join()`.
63#[async_trait]
64pub trait Worker: Send {
65    /// This worker's identity — used for logging and to tie cancellation
66    /// back to the right worker.
67    fn id(&self) -> &WorkerId;
68
69    /// Token that carries the cancel signal. Clonable — this is the
70    /// path the engine uses to cancel from the outside.
71    fn cancel_token(&self) -> CancellationToken;
72
73    /// Await the completion signal. The worker is consumed — one
74    /// worker, one join. `Ok(())` means the worker ran to completion;
75    /// `Err` means it was cancelled, failed, or panicked internally.
76    /// Values do not come back through this trait; use
77    /// `engine.output_tail` for those.
78    async fn join(self: Box<Self>) -> Result<(), WorkerError>;
79}
80
81/// **Handler for a Worker's async completion signal.** A building
82/// block; it does not implement `Worker` itself. Holds the
83/// `(worker_id, cancel token, oneshot receiver)` triple and is embedded
84/// by every per-kind Worker (`AgentBlockWorker` / `LuaWorker` /
85/// `RustFnWorker` / `ProcessWorker` / `OperatorWorker`).
86///
87/// "The Worker that actually does the work" and "the mechanism that
88/// waits for its async completion" are two different concepts. This
89/// struct is dedicated to the latter; the former is expressed by
90/// per-kind Worker structs — one type per `AgentKind`, each hiding its
91/// kind-specific state (SDK quirks, VM state, child-process handles,
92/// etc.) inside itself.
93pub struct WorkerJoinHandler {
94    /// Identity of the worker this handler belongs to.
95    pub worker_id: WorkerId,
96    /// Cancellation token shared with the running task; cloned out via
97    /// `Worker::cancel_token`.
98    pub cancel: CancellationToken,
99    /// Receiver side of the oneshot channel the spawned task completes
100    /// through. Consumed by `await_completion`.
101    pub completion: oneshot::Receiver<Result<(), WorkerError>>,
102}
103
104impl WorkerJoinHandler {
105    /// Shared helper that receives the `join` async signal. This is the
106    /// canonical path called from every per-kind Worker's `Worker::join`
107    /// implementation.
108    pub async fn await_completion(self) -> Result<(), WorkerError> {
109        match self.completion.await {
110            Ok(r) => r,
111            Err(_) => Err(WorkerError::Failed(
112                "worker completion channel closed".into(),
113            )),
114        }
115    }
116}
117
118/// Generic Worker used only on the middleware (`wrap_join`) wrap path,
119/// so kind-agnostic post-processing wrap results can be returned as
120/// `Box<dyn Worker>`. Unlike a per-kind Worker, this does not represent
121/// "a specific kind's execution" — it is a thin wrapper that layers a
122/// post-processor on top of an existing Worker.
123///
124/// Named after its role: the "Worker for the middleware path" — the
125/// type boxed as the return value by `wrap_join` consumers (Audit /
126/// MainAI / Senior / LongHold / Lua after-hook, and so on).
127pub struct MiddlewareWorker {
128    /// The wrapped completion handle; `join` delegates to this.
129    pub handler: WorkerJoinHandler,
130}
131
132impl From<WorkerJoinHandler> for MiddlewareWorker {
133    fn from(handler: WorkerJoinHandler) -> Self {
134        Self { handler }
135    }
136}
137
138#[async_trait]
139impl Worker for MiddlewareWorker {
140    fn id(&self) -> &WorkerId {
141        &self.handler.worker_id
142    }
143
144    fn cancel_token(&self) -> CancellationToken {
145        self.handler.cancel.clone()
146    }
147
148    async fn join(self: Box<Self>) -> Result<(), WorkerError> {
149        self.handler.await_completion().await
150    }
151}
152
153/// Helper that wraps the inner Worker's `join()` completion signal in a
154/// post-processor and returns a fresh `Box<dyn Worker>`. All the
155/// middleware wrap paths (Audit / MainAI / Senior / LongHold / Lua
156/// after-hook, and so on) go through this helper for consistency.
157///
158/// The cancel token is inherited from the inner Worker verbatim, so
159/// cancelling from outside the engine still reaches the inner Worker.
160/// `worker_id` is also carried over from the inner Worker.
161pub fn wrap_join<F, Fut>(inner: Box<dyn Worker>, post: F) -> Box<dyn Worker>
162where
163    F: FnOnce(Result<(), WorkerError>) -> Fut + Send + 'static,
164    Fut: std::future::Future<Output = Result<(), WorkerError>> + Send,
165{
166    let worker_id = inner.id().clone();
167    let cancel = inner.cancel_token();
168    let (tx, rx) = oneshot::channel();
169    tokio::spawn(async move {
170        let r = inner.join().await;
171        let result = post(r).await;
172        let _ = tx.send(result);
173    });
174    Box::new(MiddlewareWorker {
175        handler: WorkerJoinHandler {
176            worker_id,
177            cancel,
178            completion: rx,
179        },
180    })
181}