Skip to main content

mlua_swarm_server/operator_ws/
protocol.rs

1//! Wire format (= S↔C JSON message schema) for `WS /v1/operators/:sid/ws`.
2//!
3//! `ServerMsg` = 4 messages the server pushes to the client (Ask / HookBefore /
4//! HookAfter / Spawn).
5//! `ClientMsg` = 3 messages the client replies with (Answer / HookAck / SpawnAck).
6//! For the parent module's message-flow figure, see the doc of `mod.rs`.
7//!
8//! `PendingReply` is the intermediate representation delivered over the internal
9//! `oneshot` reply channel, used to resolve a `ClientMsg` (arriving from a client)
10//! against the session's pending `HashMap` keyed by `req_id`.
11//! See `session::WSOperatorSession::resolve_pending` for details.
12
13use mlua_swarm::WorkerBinding;
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16
17// parent_req_id schema field carry: the engine middleware does not fire the
18// true nested-ask case (another ask running mid-ask), so this stays None.
19// The field is kept for schema compatibility; when a middleware extension
20// starts firing the true nested case, it can be reintroduced via task_local
21// or similar.
22pub(super) fn current_parent_req_id() -> Option<String> {
23    None
24}
25
26pub(super) fn default_ok_true() -> bool {
27    true
28}
29
30/// Server → client push messages on `WS /v1/operators/:sid/ws`. Each variant
31/// pairs with a `ClientMsg` reply carrying the same `req_id` (except
32/// `HookAfter`, which is fire-and-forget).
33#[derive(Debug, Serialize)]
34#[serde(tag = "type", rename_all = "snake_case")]
35pub enum ServerMsg {
36    /// `SeniorBridge.ask` request.
37    Ask {
38        /// Correlation key the client must echo back in `ClientMsg::Answer`.
39        req_id: String,
40        /// Reserved for nested-ask correlation; currently always `None`
41        /// (see [`current_parent_req_id`]).
42        #[serde(skip_serializing_if = "Option::is_none")]
43        parent_req_id: Option<String>,
44        /// Task the question originates from.
45        task_id: String,
46        /// Free-form question payload produced by the engine middleware.
47        question: Value,
48    },
49    /// `SpawnHook.before` request (= the client returns OK / NG via ack).
50    HookBefore {
51        /// Correlation key the client must echo back in `ClientMsg::HookAck`.
52        req_id: String,
53        /// Reserved for nested-ask correlation; currently always `None`.
54        #[serde(skip_serializing_if = "Option::is_none")]
55        parent_req_id: Option<String>,
56        /// Task whose spawn is being gated.
57        task_id: String,
58        /// Agent ref about to be spawned.
59        agent: String,
60        /// 1-based dispatch attempt counter for this agent step.
61        attempt: u32,
62    },
63    /// `SpawnHook.after` notification (= no client ack, fire-and-forget).
64    HookAfter {
65        /// Correlation key (informational only — no reply is expected).
66        req_id: String,
67        /// Reserved for nested-ask correlation; currently always `None`.
68        #[serde(skip_serializing_if = "Option::is_none")]
69        parent_req_id: Option<String>,
70        /// Task the spawn belonged to.
71        task_id: String,
72        /// Agent ref that was spawned.
73        agent: String,
74        /// 1-based dispatch attempt counter for this agent step.
75        attempt: u32,
76        /// Worker result payload observed after the spawn completed.
77        result: Value,
78    },
79    /// `Operator.execute` request (= delegates the whole spawn to an external
80    /// Operator, via `OperatorDelegateMiddleware`). The client replies with the
81    /// `WorkerResult`-equivalent (= value + ok) in `spawn_ack`.
82    ///
83    /// **Thin control channel** (the Spawn thin-control axis): the server sends only
84    /// the `capability_token`. `system_prompt` / `prompt` are NOT carried in the
85    /// WS payload. The MainAI (WS Client) forwards the token to the SubAgent,
86    /// and the SubAgent hits `/v1/worker/prompt` + `/v1/worker/result` itself
87    /// with `Authorization: Bearer <capability_token>` — fetching prompt /
88    /// system and posting the result (= heavy payloads go over HTTP; WS stays
89    /// purely thin control).
90    ///
91    /// `capability_token` is `CapToken::encode()` form (= URL-safe base64 of
92    /// serde_json): a session token with `Role::Worker` + `["*"]` scopes + 600s
93    /// TTL. The HMAC sig is verified server-side by `verify_token_for_task` —
94    /// a self-contained capability token (= no server lookup required).
95    ///
96    /// `directive` (= immediate instruction for the MainAI; fix for observation #7):
97    /// Under thin-push discipline, if the payload were only routing fields, the
98    /// MainAI (a large LLM) would fire the drift "I have a token → I should
99    /// fetch it myself" / "I got the prompt → I should embed it literally into
100    /// the SubAgent" 100% of the time (= bias accumulation across 50–100 parallel
101    /// agents dulls decisions). To structurally remove this drift, a literal
102    /// instruction text — "launch a SubAgent, hand it the token + endpoint, and
103    /// let the SubAgent do the fetch / execution / post" — is explicitly embedded
104    /// into the payload (= implicit convention → literal statement).
105    ///
106    /// This field carries **natural-language text intended for the MainAI to read**
107    /// (= not a JSON schema target for parsing). See
108    /// `operator_ws::session::default_spawn_directive()` for the server-side
109    /// default text.
110    Spawn {
111        /// Correlation key the client must echo back in `ClientMsg::SpawnAck`.
112        req_id: String,
113        /// Reserved for nested-ask correlation; currently always `None`.
114        #[serde(skip_serializing_if = "Option::is_none")]
115        parent_req_id: Option<String>,
116        /// Task the delegated spawn belongs to.
117        task_id: String,
118        /// Agent ref the Operator is asked to execute.
119        agent: String,
120        /// 1-based dispatch attempt counter for this agent step.
121        attempt: u32,
122        /// `CapToken::encode()` form Bearer credential for the worker HTTP
123        /// endpoints (see the variant doc above for the thin-control contract).
124        capability_token: String,
125        /// Short handle (= `wh-XXXXXXXX`, 12 chars). An alternate Bearer path
126        /// paired with `capability_token`. When `/v1/worker/submit` receives a
127        /// handle in Bearer, the server resolves nonce → `task_id` via the
128        /// `worker_handles` map (the short-handle switchover — removes
129        /// base64 copy-paste accidents). SubAgents (mse-worker) should use
130        /// **this field** instead of `capability_token` as the recommended path.
131        #[serde(skip_serializing_if = "Option::is_none")]
132        worker_handle: Option<String>,
133        /// Worker binding resolved from the Blueprint at compile time. `None`
134        /// never reaches the wire on the WS thin path (compile-time gate,
135        /// see `Operator::requires_worker_binding`), but the field stays
136        /// optional for forward compatibility.
137        #[serde(skip_serializing_if = "Option::is_none")]
138        worker: Option<WorkerBinding>,
139        /// Literal natural-language instruction for the MainAI (see the
140        /// variant doc above for why this is embedded in the payload).
141        directive: String,
142    },
143}
144
145/// Client → server reply messages on `WS /v1/operators/:sid/ws`. Each variant
146/// resolves the pending oneshot registered under its `req_id`
147/// (see [`super::session::WSOperatorSession::resolve_pending`]).
148#[derive(Debug, Deserialize)]
149#[serde(tag = "type", rename_all = "snake_case")]
150pub enum ClientMsg {
151    /// Reply to `ServerMsg::Ask` (`SeniorBridge.ask` result).
152    Answer {
153        /// Correlation key copied from the originating `ServerMsg::Ask`.
154        req_id: String,
155        /// Answer payload returned to the engine middleware.
156        value: Value,
157    },
158    /// Ack for `SpawnHook.before`. `ok=false` rejects the spawn
159    /// (= `MainAIMiddleware` converts it into `SpawnError::RejectedByMiddleware`).
160    /// `reason` propagates as `Err(reason)`.
161    HookAck {
162        /// Correlation key copied from the originating `ServerMsg::HookBefore`.
163        req_id: String,
164        /// `true` allows the spawn; `false` rejects it.
165        ok: bool,
166        /// Optional rejection reason surfaced to the engine when `ok=false`.
167        #[serde(default)]
168        reason: Option<String>,
169    },
170    /// Ack for `Operator.execute` (Spawn). `value = WorkerResult.value`,
171    /// `ok = WorkerResult.ok`. When `error` is `Some`, the `Operator` returns
172    /// it as `WorkerError`.
173    ///
174    /// After the thin-path switch (= the thin-control axis): if the MainAI returns this ack
175    /// **after** the SubAgent has hit HTTP `/v1/worker/result`, the server-side
176    /// dispatch path can complete with both the `Final` in `output_tail` and
177    /// this ack's `value` aligned. Sending an empty JSON `{}` for `value` makes
178    /// the `task.last_result` written by the HTTP path (= `post_result`)
179    /// canonical (= the ack-side `value` is duplicate / informational).
180    SpawnAck {
181        /// Correlation key copied from the originating `ServerMsg::Spawn`.
182        req_id: String,
183        /// `WorkerResult.value` equivalent; empty `{}` defers to the HTTP-path
184        /// result (see the variant doc above).
185        #[serde(default)]
186        value: Value,
187        /// `WorkerResult.ok` equivalent; defaults to `true` when omitted.
188        #[serde(default = "default_ok_true")]
189        ok: bool,
190        /// When `Some`, the Operator surfaces it as a `WorkerError`.
191        #[serde(default)]
192        error: Option<String>,
193    },
194}
195
196/// Intermediate representation for the session's `req_id` ↔ oneshot reply
197/// channel. The resolved form of `ClientMsg` looked up on the session side by
198/// `req_id` (= runtime-only, not wire format).
199pub(super) enum PendingReply {
200    /// Answer (return `Value` of `SeniorBridge.ask`).
201    Answer(Value),
202    /// `hook_ack` (OK / NG for `before`).
203    HookAck { ok: bool, reason: Option<String> },
204    /// `spawn_ack` (return of `Operator.execute` = `WorkerResult`-equivalent).
205    SpawnAck {
206        value: Value,
207        ok: bool,
208        error: Option<String>,
209    },
210}