Skip to main content

mlua_swarm/
operator.rs

1//! Operator abstraction.
2//!
3//! ## Roles
4//!
5//! - **Spawners** (`SpawnerAdapter`) do not know about `Operator` `kind`s.
6//!   Ordinary dispatches are handled by `ProcessSpawner` /
7//!   `InProcSpawner` / etc.
8//! - `OperatorSpawner` is the `SpawnerAdapter` that routes dispatches
9//!   through an operator. It holds an `Arc<dyn Operator>` and does one
10//!   thing: hand every spawn request to that operator's `execute`. It
11//!   still does not know the operator's `kind` (`MainAi` / `Human` /
12//!   `Automate` / `Composite`).
13//! - The `Operator` trait itself returns a `WorkerResult`, as a
14//!   synchronous backend. Implementations are free per kind — a `MainAi`
15//!   operator might round-trip through Claude via an HTTP callback, a
16//!   `Human` operator might prompt on a CLI, an `Automate` operator
17//!   might delegate to a different spawner, and so on.
18//!
19//! Which dispatches go through the `OperatorSpawner` is decided at the
20//! flow.ir layer (designer + hints + Swarm compiler). The algocline
21//! strategy side never says "hand this to the operator" — a firm
22//! separation of concerns.
23
24pub mod render;
25
26pub use render::{render_system, slots_from_prompt, RenderError};
27
28use crate::core::ctx::Ctx;
29use crate::core::engine::Engine;
30use crate::types::{CapToken, TaskId, WorkerId};
31use crate::worker::adapter::{SpawnError, SpawnerAdapter, WorkerError, WorkerResult};
32use crate::worker::output::{ContentRef, OutputEvent};
33use crate::worker::{Worker, WorkerJoinHandler};
34use async_trait::async_trait;
35use std::sync::Arc;
36use tokio::sync::oneshot;
37use tokio_util::sync::CancellationToken;
38
/// The `Operator` trait: takes a spawn request and returns a
/// `WorkerResult`. The backend for `OperatorSpawner`. Implementations
/// are free to differ per kind; the spawner just calls `execute` and
/// stays out of the internals.
///
/// Arguments — a two-slot payload plus `worker_token` (the thin path
/// was added later):
///
/// - `ctx`: the context of the spawn request, passed by reference.
/// - `system`: the agent persona — the rendered value of
///   `AgentDef.profile.system_prompt` after template expansion. `None`
///   means no profile. Expected to map straight onto the LLM API's
///   system message; direct-LLM operators consume this.
/// - `prompt`: task-specific intent — `TaskSpec.initial_directive`,
///   pulled server-side via `engine.fetch_prompt`. Expected to map
///   straight onto the LLM API's user message.
/// - `worker_token`: a capability token (`Role::Worker`, 600s TTL,
///   `scopes = ["*"]`). Thin-path operators (a WebSocket-backed
///   operator session, for instance) `encode()` this token and hand it
///   to the MainAI WebSocket client, so the SubAgent can hit
///   `/v1/worker/prompt` + `/v1/worker/result` with
///   `Authorization: Bearer <encoded>`. Direct-LLM operators may
///   ignore it.
///
/// The trait passes both slots so the same signature works for the
/// thin path and the direct path; the implementation picks which one
/// it takes (consume the server-rendered `system` directly, or forward
/// the token and let the client fetch).
#[async_trait]
pub trait Operator: Send + Sync {
    /// Executes one spawn request against this operator's backend.
    ///
    /// Returns the resulting `WorkerResult`, or a `WorkerError` if the
    /// backend failed. The trait-level documentation describes the
    /// meaning of each argument.
    async fn execute(
        &self,
        ctx: &Ctx,
        system: Option<String>,
        prompt: String,
        worker_token: CapToken,
    ) -> Result<WorkerResult, WorkerError>;
}
79
/// A `SpawnerAdapter` implementation that hands the dispatch off to an
/// `Arc<dyn Operator>`.
///
/// `OperatorSpawner` itself does not inspect the operator's `kind` —
/// `MainAi` / `Human` / `Automate` / `Composite` all go through the same
/// path, and the operator implementation absorbs the differences.
///
/// # Position — the AgentSpec-axis Operator path
///
/// Use this type on the path that **bakes a separate Operator backend
/// into every `AgentDef`**. For an `AgentKind::Operator` `AgentDef`, the
/// `OperatorSpawnerFactory` produces one with `OperatorSpawner::new`
/// and places it in `routes[agent_name]`. Agents flowing in through the
/// `agent.md` loader default to `kind = Operator`, so they land here.
///
/// The paired **Blueprint-global (session) axis** is
/// `crate::middleware::OperatorDelegateMiddleware` — a single operator
/// backend registered on the session and applied uniformly across every
/// agent. When both are effective, the delegate middleware sits at the
/// outer end of the stack and bypasses `inner.spawn`; this type is inert
/// and no double fire can occur. See the `OperatorSpawnerFactory` doc
/// for the exclusivity narrative.
pub struct OperatorSpawner {
    /// The backend that every spawn request is forwarded to via
    /// `Operator::execute`.
    operator: Arc<dyn Operator>,
    /// The compile-time-baked `AgentDef.profile.system_prompt` template
    /// — the agent's persona. When `Some`, `spawn` renders it against
    /// slots derived from the fetched prompt and passes the result to
    /// the operator as `system`; when `None`, the operator receives
    /// `system = None`. The task prompt itself is fetched through
    /// `engine.fetch_prompt` in both cases.
    system_prompt: Option<String>,
}
109
110impl OperatorSpawner {
111    /// Binds an operator backend plus an optional compile-time
112    /// `system_prompt` template (rendered per-spawn via `render_system`).
113    pub fn new(operator: Arc<dyn Operator>, system_prompt: Option<String>) -> Self {
114        Self {
115            operator,
116            system_prompt,
117        }
118    }
119}
120
121#[async_trait]
122impl SpawnerAdapter for OperatorSpawner {
123    async fn spawn(
124        &self,
125        engine: &Engine,
126        ctx: &Ctx,
127        task_id: TaskId,
128        attempt: u32,
129        token: CapToken,
130    ) -> Result<Box<dyn Worker>, SpawnError> {
131        // By convention the spawner pulls `prompt`
132        // through `fetch_prompt`. The `system_prompt` (from
133        // `AgentDef.profile`) travels on the other slot — sibling to the
134        // AgentBlock path's `BlockConfig.context` / `.prompt` split.
135        let prompt = engine
136            .fetch_prompt(&token, &task_id)
137            .await
138            .map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
139
140        // Render the `system_prompt` template.
141        // Expand the prompt into a slot map and hand the template to
142        // minijinja. The syntax used inside the agent.md body is
143        // Jinja2-compatible (`{{ directive }}` / `{% if intent %}` /
144        // `{{ x | upper }}`), with strict undefined variables and
145        // auto-escape disabled.
146        let system = match self.system_prompt.as_deref() {
147            Some(tmpl) => {
148                let slots = render::slots_from_prompt(&prompt);
149                let rendered = render::render_system(tmpl, &slots)
150                    .map_err(|e| SpawnError::Internal(format!("render system_prompt: {e}")))?;
151                Some(rendered)
152            }
153            None => None,
154        };
155
156        // Bake the rendered `system`
157        // into engine state so the SubAgent can fetch it alongside
158        // `prompt` on the `HTTP /v1/worker/prompt` path. Failures are
159        // fail-loud via `SpawnError::Internal` — no silent fallback.
160        engine
161            .bake_worker_system_prompt(&task_id, attempt, system.clone())
162            .await
163            .map_err(|e| SpawnError::Internal(format!("bake system_prompt: {e}")))?;
164
165        let op = self.operator.clone();
166        let engine_clone = engine.clone();
167        let token_clone = token.clone();
168        let token_for_op = token.clone();
169        let task_id_clone = task_id.clone();
170        let ctx_clone = ctx.clone();
171        let (tx, rx) = oneshot::channel();
172        let cancel = CancellationToken::new();
173        let cancel_inner = cancel.clone();
174        let worker_id = WorkerId::new();
175
176        tokio::spawn(async move {
177            let result: Result<WorkerResult, WorkerError> = tokio::select! {
178                r = op.execute(&ctx_clone, system, prompt, token_for_op) => r,
179                _ = cancel_inner.cancelled() => Err(WorkerError::Cancelled),
180            };
181            // Emit `WorkerResult` → `OutputEvent::Final` in
182            // parallel. If the SubAgent already
183            // pushed a `Final` via HTTP (`/v1/worker/result` or
184            // `/v1/worker/submit`), skip. The POSTed value is canonical
185            // — protocol.rs L107-110 design intent. Only operator
186            // implementations that do not POST (tests, inline
187            // operators) need this fallback emit.
188            if let Ok(wr) = &result {
189                let tail = engine_clone.output_tail(&task_id_clone, attempt).await;
190                let has_final = tail
191                    .iter()
192                    .any(|ev| matches!(ev, OutputEvent::Final { .. }));
193                if !has_final {
194                    let ev = OutputEvent::Final {
195                        content: ContentRef::Inline {
196                            value: wr.value.clone(),
197                        },
198                        ok: wr.ok,
199                    };
200                    let _ = engine_clone
201                        .submit_output(&token_clone, &task_id_clone, attempt, ev)
202                        .await;
203                }
204            }
205            let signal: Result<(), WorkerError> = result.map(|_| ());
206            let _ = tx.send(signal);
207        });
208
209        Ok(Box::new(OperatorWorker {
210            handler: WorkerJoinHandler {
211                worker_id,
212                cancel,
213                completion: rx,
214            },
215        }))
216    }
217}
218
/// Concrete `Worker` type for the Operator kind — the handle for one
/// in-flight `Operator::execute` call. It represents a task backed by
/// an operator (SDK, WebSocket bridge, direct LLM call, etc.) and
/// embeds a `WorkerJoinHandler` that carries the async signal.
pub struct OperatorWorker {
    /// The completion-signal handle for this operator call's spawned
    /// task: it holds the worker id, the cancellation token, and the
    /// receiving end of the completion channel.
    pub handler: WorkerJoinHandler,
}
228
229#[async_trait]
230impl Worker for OperatorWorker {
231    fn id(&self) -> &WorkerId {
232        &self.handler.worker_id
233    }
234    fn cancel_token(&self) -> CancellationToken {
235        self.handler.cancel.clone()
236    }
237    async fn join(self: Box<Self>) -> Result<(), WorkerError> {
238        self.handler.await_completion().await
239    }
240}