mlua_swarm/operator.rs
1//! Operator abstraction.
2//!
3//! ## Roles
4//!
5//! - **Spawners** (`SpawnerAdapter`) do not know about `Operator` `kind`s.
6//! Ordinary dispatches are handled by `ProcessSpawner` /
7//! `InProcSpawner` / etc.
8//! - `OperatorSpawner` is the `SpawnerAdapter` that routes dispatches
9//! through an operator. It holds an `Arc<dyn Operator>` and does one
10//! thing: hand every spawn request to that operator's `execute`. It
11//! still does not know the operator's `kind` (`MainAi` / `Human` /
12//! `Automate` / `Composite`).
//! - The `Operator` trait itself is a request/response backend: its
//!   `async fn execute` resolves to a `WorkerResult` (or a
//!   `WorkerError`). Implementations are free per kind — a `MainAi`
//!   operator might round-trip through Claude via an HTTP callback, a
//!   `Human` operator might prompt on a CLI, an `Automate` operator
//!   might delegate to a different spawner, and so on.
18//!
19//! Which dispatches go through the `OperatorSpawner` is decided at the
20//! flow.ir layer (designer + hints + Swarm compiler). The algocline
21//! strategy side never says "hand this to the operator" — a firm
22//! separation of concerns.
23
24pub mod render;
25
26pub use render::{render_system, slots_from_prompt, RenderError};
27
28use crate::core::ctx::Ctx;
29use crate::core::engine::Engine;
30use crate::types::{CapToken, TaskId, WorkerId};
31use crate::worker::adapter::{SpawnError, SpawnerAdapter, WorkerError, WorkerResult};
32use crate::worker::output::{ContentRef, OutputEvent};
33use crate::worker::{Worker, WorkerJoinHandler};
34use async_trait::async_trait;
35use std::sync::Arc;
36use tokio::sync::oneshot;
37use tokio_util::sync::CancellationToken;
38
39/// Worker binding baked from `AgentDef.profile` at compile time — which
40/// worker variant the operator backend must run, plus the tool surface
41/// the Blueprint declared for this agent.
42///
43/// `variant` is mse domain vocabulary; backend-specific terms (e.g. the
44/// Claude Code Agent tool's `subagent_type` parameter) belong to the
45/// rendering boundary (`operator_ws::session` directive render), not here.
46#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
47pub struct WorkerBinding {
48 /// Worker variant name (for the Claude Code backend this maps onto
49 /// the Agent tool `subagent_type` at directive-render time).
50 #[serde(alias = "subagent_type")]
51 pub variant: String,
52 /// Tool list declared in `AgentDef.profile.tools` (informational
53 /// for the MainAI / observability; the SubAgent's own frontmatter
54 /// is what actually grants tools).
55 pub tools: Vec<String>,
56}
57
58/// The `Operator` trait: takes a spawn request and returns a
59/// `WorkerResult`. The backend for `OperatorSpawner`. Implementations
60/// are free to differ per kind; the spawner just calls `execute` and
61/// stays out of the internals.
62///
63/// Arguments — a two-slot payload plus `worker_token` (the thin path
64/// was added later) plus `worker` (the Blueprint-baked binding, added
65/// later still):
66///
67/// - `system`: the agent persona — the rendered value of
68/// `AgentDef.profile.system_prompt` after template expansion. `None`
69/// means no profile. Expected to map straight onto the LLM API's
70/// system message; direct-LLM operators consume this.
71/// - `prompt`: task-specific intent — `TaskSpec.initial_directive`,
72/// pulled server-side via `engine.fetch_prompt`. Expected to map
73/// straight onto the LLM API's user message.
74/// - `worker`: the compile-time-baked [`WorkerBinding`] (subagent type +
75/// declared tools) resolved from `AgentDef.profile.worker_binding`.
76/// `None` for agents whose profile has no `worker_binding` set.
77/// Backends that require one (see [`Operator::requires_worker_binding`])
78/// must fail loud rather than silently degrade when this is `None`.
79/// - `worker_token`: a capability token (`Role::Worker`, 600s TTL,
80/// `scopes = ["*"]`). Thin-path operators (a `a WebSocket-backed operator session`,
81/// for instance) `encode()` this token and hand it to the MainAI
82/// WebSocket client, so the SubAgent can hit `/v1/worker/prompt` +
83/// `/v1/worker/result` with `Authorization: Bearer <encoded>`.
84/// Direct-LLM operators may ignore it.
85///
86/// The trait passes both slots so the same signature works for the
87/// thin path and the direct path; the implementation picks which one
88/// it takes (consume the server-rendered `system` directly, or forward
89/// the token and let the client fetch).
90#[async_trait]
91pub trait Operator: Send + Sync {
92 /// Executes one spawn request against this operator's backend and
93 /// returns the resulting `WorkerResult` (or a `WorkerError` if the
94 /// backend failed). See the trait doc above for the meaning of each
95 /// argument.
96 async fn execute(
97 &self,
98 ctx: &Ctx,
99 system: Option<String>,
100 prompt: String,
101 worker: Option<WorkerBinding>,
102 worker_token: CapToken,
103 ) -> Result<WorkerResult, WorkerError>;
104
105 /// Whether this operator backend requires a non-`None` `worker`
106 /// binding to execute at all. `false` by default (direct-LLM
107 /// operators consume `system` / `prompt` directly and have no
108 /// SubAgent to dispatch). WS thin-path operators override this to
109 /// `true` — the compiler uses it to fail loud at `compile()` time
110 /// when `AgentDef.profile.worker_binding` is absent, rather than
111 /// silently degrading at dispatch time.
112 fn requires_worker_binding(&self) -> bool {
113 false
114 }
115}
116
117/// A `SpawnerAdapter` implementation that hands the dispatch off to an
118/// `Arc<dyn Operator>`.
119///
120/// `OperatorSpawner` itself does not inspect the operator's `kind` —
121/// `MainAi` / `Human` / `Automate` / `Composite` all go through the same
122/// path, and the operator implementation absorbs the differences.
123///
124/// # Position — the AgentSpec-axis Operator path
125///
126/// Use this type on the path that **bakes a separate Operator backend
127/// into every `AgentDef`**. For an `AgentKind::Operator` `AgentDef`, the
128/// `OperatorSpawnerFactory` produces one with
129/// `OperatorSpawner::new(op, system_prompt, worker_binding)` and places it
130/// in `routes[agent_name]`. Agents flowing in through the `agent.md`
131/// loader default to `kind = Operator`, so they land here.
132///
133/// The paired **Blueprint-global (session) axis** is
134/// `crate::middleware::OperatorDelegateMiddleware` — a single operator
135/// backend registered on the session and applied uniformly across every
136/// agent. When both are effective, the delegate middleware sits at the
137/// outer end of the stack and bypasses `inner.spawn`; this type is inert
138/// and no double fire can occur. See the `OperatorSpawnerFactory` doc
139/// for the exclusivity narrative.
140pub struct OperatorSpawner {
141 operator: Arc<dyn Operator>,
142 /// The compile-time-baked `AgentDef.profile.system_prompt` — the
143 /// agent's persona. If `Some`, it takes priority at spawn time; if
144 /// `None`, we fall back to `fetch_prompt` (`initial_directive`).
145 system_prompt: Option<String>,
146 /// The compile-time-baked worker binding — resolved from
147 /// `AgentDef.profile.worker_binding` by `OperatorSpawnerFactory`.
148 /// Passed straight through to `Operator::execute` on every spawn.
149 worker_binding: Option<WorkerBinding>,
150}
151
152impl OperatorSpawner {
153 /// Binds an operator backend plus an optional compile-time
154 /// `system_prompt` template (rendered per-spawn via `render_system`)
155 /// and an optional compile-time-baked `worker_binding`.
156 pub fn new(
157 operator: Arc<dyn Operator>,
158 system_prompt: Option<String>,
159 worker_binding: Option<WorkerBinding>,
160 ) -> Self {
161 Self {
162 operator,
163 system_prompt,
164 worker_binding,
165 }
166 }
167}
168
169#[async_trait]
170impl SpawnerAdapter for OperatorSpawner {
171 async fn spawn(
172 &self,
173 engine: &Engine,
174 ctx: &Ctx,
175 task_id: TaskId,
176 attempt: u32,
177 token: CapToken,
178 ) -> Result<Box<dyn Worker>, SpawnError> {
179 // By convention the spawner pulls `prompt`
180 // through `fetch_prompt`. The `system_prompt` (from
181 // `AgentDef.profile`) travels on the other slot — sibling to the
182 // AgentBlock path's `BlockConfig.context` / `.prompt` split.
183 let prompt = engine
184 .fetch_prompt(&token, &task_id)
185 .await
186 .map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
187
188 // Render the `system_prompt` template.
189 // Expand the prompt into a slot map and hand the template to
190 // minijinja. The syntax used inside the agent.md body is
191 // Jinja2-compatible (`{{ directive }}` / `{% if intent %}` /
192 // `{{ x | upper }}`), with strict undefined variables and
193 // auto-escape disabled.
194 let system = match self.system_prompt.as_deref() {
195 Some(tmpl) => {
196 let slots = render::slots_from_prompt(&prompt);
197 let rendered = render::render_system(tmpl, &slots)
198 .map_err(|e| SpawnError::Internal(format!("render system_prompt: {e}")))?;
199 Some(rendered)
200 }
201 None => None,
202 };
203
204 // Bake the rendered `system`
205 // into engine state so the SubAgent can fetch it alongside
206 // `prompt` on the `HTTP /v1/worker/prompt` path. Failures are
207 // fail-loud via `SpawnError::Internal` — no silent fallback.
208 engine
209 .bake_worker_system_prompt(&task_id, attempt, system.clone())
210 .await
211 .map_err(|e| SpawnError::Internal(format!("bake system_prompt: {e}")))?;
212
213 let op = self.operator.clone();
214 let engine_clone = engine.clone();
215 let token_clone = token.clone();
216 let token_for_op = token.clone();
217 let task_id_clone = task_id.clone();
218 let ctx_clone = ctx.clone();
219 let worker_binding = self.worker_binding.clone();
220 let (tx, rx) = oneshot::channel();
221 let cancel = CancellationToken::new();
222 let cancel_inner = cancel.clone();
223 let worker_id = WorkerId::new();
224
225 tokio::spawn(async move {
226 let result: Result<WorkerResult, WorkerError> = tokio::select! {
227 r = op.execute(&ctx_clone, system, prompt, worker_binding, token_for_op) => r,
228 _ = cancel_inner.cancelled() => Err(WorkerError::Cancelled),
229 };
230 // Emit `WorkerResult` → `OutputEvent::Final` in
231 // parallel. If the SubAgent already
232 // pushed a `Final` via HTTP (`/v1/worker/result` or
233 // `/v1/worker/submit`), skip. The POSTed value is canonical
234 // — protocol.rs L107-110 design intent. Only operator
235 // implementations that do not POST (tests, inline
236 // operators) need this fallback emit.
237 if let Ok(wr) = &result {
238 let tail = engine_clone.output_tail(&task_id_clone, attempt).await;
239 let has_final = tail
240 .iter()
241 .any(|ev| matches!(ev, OutputEvent::Final { .. }));
242 if !has_final {
243 let ev = OutputEvent::Final {
244 content: ContentRef::Inline {
245 value: wr.value.clone(),
246 },
247 ok: wr.ok,
248 };
249 let _ = engine_clone
250 .submit_output(&token_clone, &task_id_clone, attempt, ev)
251 .await;
252 }
253 }
254 let signal: Result<(), WorkerError> = result.map(|_| ());
255 let _ = tx.send(signal);
256 });
257
258 Ok(Box::new(OperatorWorker {
259 handler: WorkerJoinHandler {
260 worker_id,
261 cancel,
262 completion: rx,
263 },
264 }))
265 }
266}
267
268/// Concrete Worker type for the Operator kind — wraps the async
269/// `Operator::execute` call. This represents the handle for a task
270/// backed by an operator (SDK, WebSocket bridge, direct LLM call, etc.)
271/// and embeds a `WorkerJoinHandler` that carries the async signal.
272pub struct OperatorWorker {
273 /// The completion-signal handle for this operator call's spawned
274 /// task.
275 pub handler: WorkerJoinHandler,
276}
277
278#[async_trait]
279impl Worker for OperatorWorker {
280 fn id(&self) -> &WorkerId {
281 &self.handler.worker_id
282 }
283 fn cancel_token(&self) -> CancellationToken {
284 self.handler.cancel.clone()
285 }
286 async fn join(self: Box<Self>) -> Result<(), WorkerError> {
287 self.handler.await_completion().await
288 }
289}