mlua_swarm/worker.rs
1//! The `Worker` trait — the shared interface for the execution units
2//! each spawner keeps internally.
3//!
4//! ## Roles — the boundary between the Engine view and the Spawner's
5//! internal view
6//!
//! - **Engine view.** Only `SpawnerAdapter` is visible. The engine does
//!   not know the concrete Worker types and does not care about their
//!   shape; it only sees the type-erased `Box<dyn Worker>` that `spawn`
//!   returns.
9//! - **Spawner's internal view.** Each `SpawnerAdapter::spawn` builds a
10//! concrete Worker internally (`ChildProcessWorker` / `ClosureWorker`
11//! / `OperatorWorker` and friends), type-erases it as
12//! `Box<dyn Worker>`, and returns that. This trait fixes the interface
13//! so the Worker shape does not drift between spawner implementations.
14//!
15//! ## Worker lifetime semantics
16//!
17//! The current contract is "one spawn = one worker = one join". At
18//! `spawn()` time the worker has already spun up an internal tokio task
19//! (it is already running); the caller just needs to `join()` and wait
20//! for the completion signal. The value comes from
21//! `engine.output_tail(task_id, attempt)` via `OutputEvent::Final`
22//! — the oneshot channel carries the signal, not the
23//! value.
24//!
//! Extending this to "one worker = N invocations" (calling the same
//! worker multiple times while the token's TTL is alive) is deferred work
//! on a separate axis. It was the original design intent (see
//! v6.md:174), but the shape was collapsed to keep the implementation
//! simple. The route when it is needed: add
//! `async fn invoke(&mut self, token, prompt) -> WorkerResult` to the
//! trait and redefine `join` as "the last invocation's completion plus
//! cleanup".
33//!
34//! ## `WorkerJoinHandler` — the canonical shape shared by the three
35//! spawners today
36//!
37//! All three current spawners (Shell / InProc / Operator) call
38//! `tokio::spawn` internally and push their completion signal through a
39//! oneshot channel. `WorkerJoinHandler` is the helper that wraps that
40//! shape into a `dyn Worker`. We share the helper until spawner
41//! implementations need to define their own Worker structs; further
42//! specialisation is a future carry.
43
44pub mod adapter;
45pub mod agent_block;
46pub mod baseline;
47pub mod output;
48pub mod process_spawner;
49
50use crate::types::WorkerId;
51use crate::worker::adapter::WorkerError;
52use async_trait::async_trait;
53use tokio::sync::oneshot;
54use tokio_util::sync::CancellationToken;
55
56/// Shared interface for the execution units spawners launch internally.
57///
58/// Every spawner implementation returns a concrete Worker struct that
59/// implements this trait (today that is `WorkerJoinHandler`) as a
60/// `Box<dyn Worker>`. The caller (the engine) only interacts with
61/// workers through three operations: `id()` / `cancel_token()` /
62/// `join()`.
63#[async_trait]
64pub trait Worker: Send {
65 /// This worker's identity — used for logging and to tie cancellation
66 /// back to the right worker.
67 fn id(&self) -> &WorkerId;
68
69 /// Token that carries the cancel signal. Clonable — this is the
70 /// path the engine uses to cancel from the outside.
71 fn cancel_token(&self) -> CancellationToken;
72
73 /// Await the completion signal. The worker is consumed — one
74 /// worker, one join. `Ok(())` means the worker ran to completion;
75 /// `Err` means it was cancelled, failed, or panicked internally.
76 /// Values do not come back through this trait; use
77 /// `engine.output_tail` for those.
78 async fn join(self: Box<Self>) -> Result<(), WorkerError>;
79}
80
81/// **Handler for a Worker's async completion signal.** A building
82/// block; it does not implement `Worker` itself. Holds the
83/// `(worker_id, cancel token, oneshot receiver)` triple and is embedded
84/// by every per-kind Worker (`AgentBlockWorker` / `LuaWorker` /
85/// `RustFnWorker` / `ProcessWorker` / `OperatorWorker`).
86///
87/// "The Worker that actually does the work" and "the mechanism that
88/// waits for its async completion" are two different concepts. This
89/// struct is dedicated to the latter; the former is expressed by
90/// per-kind Worker structs — one type per `AgentKind`, each hiding its
91/// kind-specific state (SDK quirks, VM state, child-process handles,
92/// etc.) inside itself.
93pub struct WorkerJoinHandler {
94 /// Identity of the worker this handler belongs to.
95 pub worker_id: WorkerId,
96 /// Cancellation token shared with the running task; cloned out via
97 /// `Worker::cancel_token`.
98 pub cancel: CancellationToken,
99 /// Receiver side of the oneshot channel the spawned task completes
100 /// through. Consumed by `await_completion`.
101 pub completion: oneshot::Receiver<Result<(), WorkerError>>,
102}
103
104impl WorkerJoinHandler {
105 /// Shared helper that receives the `join` async signal. This is the
106 /// canonical path called from every per-kind Worker's `Worker::join`
107 /// implementation.
108 pub async fn await_completion(self) -> Result<(), WorkerError> {
109 match self.completion.await {
110 Ok(r) => r,
111 Err(_) => Err(WorkerError::Failed(
112 "worker completion channel closed".into(),
113 )),
114 }
115 }
116}
117
118/// Generic Worker used only on the middleware (`wrap_join`) wrap path,
119/// so kind-agnostic post-processing wrap results can be returned as
120/// `Box<dyn Worker>`. Unlike a per-kind Worker, this does not represent
121/// "a specific kind's execution" — it is a thin wrapper that layers a
122/// post-processor on top of an existing Worker.
123///
124/// Named after its role: the "Worker for the middleware path" — the
125/// type boxed as the return value by `wrap_join` consumers (Audit /
126/// MainAI / Senior / LongHold / Lua after-hook, and so on).
127pub struct MiddlewareWorker {
128 /// The wrapped completion handle; `join` delegates to this.
129 pub handler: WorkerJoinHandler,
130}
131
132impl From<WorkerJoinHandler> for MiddlewareWorker {
133 fn from(handler: WorkerJoinHandler) -> Self {
134 Self { handler }
135 }
136}
137
138#[async_trait]
139impl Worker for MiddlewareWorker {
140 fn id(&self) -> &WorkerId {
141 &self.handler.worker_id
142 }
143
144 fn cancel_token(&self) -> CancellationToken {
145 self.handler.cancel.clone()
146 }
147
148 async fn join(self: Box<Self>) -> Result<(), WorkerError> {
149 self.handler.await_completion().await
150 }
151}
152
153/// Helper that wraps the inner Worker's `join()` completion signal in a
154/// post-processor and returns a fresh `Box<dyn Worker>`. All the
155/// middleware wrap paths (Audit / MainAI / Senior / LongHold / Lua
156/// after-hook, and so on) go through this helper for consistency.
157///
158/// The cancel token is inherited from the inner Worker verbatim, so
159/// cancelling from outside the engine still reaches the inner Worker.
160/// `worker_id` is also carried over from the inner Worker.
161pub fn wrap_join<F, Fut>(inner: Box<dyn Worker>, post: F) -> Box<dyn Worker>
162where
163 F: FnOnce(Result<(), WorkerError>) -> Fut + Send + 'static,
164 Fut: std::future::Future<Output = Result<(), WorkerError>> + Send,
165{
166 let worker_id = inner.id().clone();
167 let cancel = inner.cancel_token();
168 let (tx, rx) = oneshot::channel();
169 tokio::spawn(async move {
170 let r = inner.join().await;
171 let result = post(r).await;
172 let _ = tx.send(result);
173 });
174 Box::new(MiddlewareWorker {
175 handler: WorkerJoinHandler {
176 worker_id,
177 cancel,
178 completion: rx,
179 },
180 })
181}