// mlua_swarm_server/operator_ws/protocol.rs
//! Wire format (= server↔client JSON message schema) for
//! `WS /v1/operators/:sid/ws`.
//!
//! `ServerMsg` = the 4 messages the server pushes to the client (Ask /
//! HookBefore / HookAfter / Spawn).
//! `ClientMsg` = the 3 messages the client replies with (Answer / HookAck /
//! SpawnAck).
//! For the message-flow figure, see the parent module's doc in `mod.rs`.
//!
//! `PendingReply` is the intermediate representation delivered over the
//! internal `oneshot` reply channel. It is what a `ClientMsg` arriving from a
//! client is turned into when it is resolved against the session's pending
//! `HashMap`, which is keyed by `req_id`.
//! See `session::WSOperatorSession::resolve_pending` for details.

use mlua_swarm::WorkerBinding;
use serde::{Deserialize, Serialize};
use serde_json::Value;

17// parent_req_id schema field carry: the engine middleware does not fire the
18// true nested-ask case (another ask running mid-ask), so this stays None.
19// The field is kept for schema compatibility; when a middleware extension
20// starts firing the true nested case, it can be reintroduced via task_local
21// or similar.
22pub(super) fn current_parent_req_id() -> Option<String> {
23 None
24}
25
26pub(super) fn default_ok_true() -> bool {
27 true
28}
29
30/// Server → client push messages on `WS /v1/operators/:sid/ws`. Each variant
31/// pairs with a `ClientMsg` reply carrying the same `req_id` (except
32/// `HookAfter`, which is fire-and-forget).
33#[derive(Debug, Serialize)]
34#[serde(tag = "type", rename_all = "snake_case")]
35pub enum ServerMsg {
36 /// `SeniorBridge.ask` request.
37 Ask {
38 /// Correlation key the client must echo back in `ClientMsg::Answer`.
39 req_id: String,
40 /// Reserved for nested-ask correlation; currently always `None`
41 /// (see [`current_parent_req_id`]).
42 #[serde(skip_serializing_if = "Option::is_none")]
43 parent_req_id: Option<String>,
44 /// Task the question originates from.
45 task_id: String,
46 /// Free-form question payload produced by the engine middleware.
47 question: Value,
48 },
49 /// `SpawnHook.before` request (= the client returns OK / NG via ack).
50 HookBefore {
51 /// Correlation key the client must echo back in `ClientMsg::HookAck`.
52 req_id: String,
53 /// Reserved for nested-ask correlation; currently always `None`.
54 #[serde(skip_serializing_if = "Option::is_none")]
55 parent_req_id: Option<String>,
56 /// Task whose spawn is being gated.
57 task_id: String,
58 /// Agent ref about to be spawned.
59 agent: String,
60 /// 1-based dispatch attempt counter for this agent step.
61 attempt: u32,
62 },
63 /// `SpawnHook.after` notification (= no client ack, fire-and-forget).
64 HookAfter {
65 /// Correlation key (informational only — no reply is expected).
66 req_id: String,
67 /// Reserved for nested-ask correlation; currently always `None`.
68 #[serde(skip_serializing_if = "Option::is_none")]
69 parent_req_id: Option<String>,
70 /// Task the spawn belonged to.
71 task_id: String,
72 /// Agent ref that was spawned.
73 agent: String,
74 /// 1-based dispatch attempt counter for this agent step.
75 attempt: u32,
76 /// Worker result payload observed after the spawn completed.
77 result: Value,
78 },
79 /// `Operator.execute` request (= delegates the whole spawn to an external
80 /// Operator, via `OperatorDelegateMiddleware`). The client replies with the
81 /// `WorkerResult`-equivalent (= value + ok) in `spawn_ack`.
82 ///
83 /// **Thin control channel** (the Spawn thin-control axis): the server sends only
84 /// the `capability_token`. `system_prompt` / `prompt` are NOT carried in the
85 /// WS payload. The MainAI (WS Client) forwards the token to the SubAgent,
86 /// and the SubAgent hits `/v1/worker/prompt` + `/v1/worker/result` itself
87 /// with `Authorization: Bearer <capability_token>` — fetching prompt /
88 /// system and posting the result (= heavy payloads go over HTTP; WS stays
89 /// purely thin control).
90 ///
91 /// `capability_token` is `CapToken::encode()` form (= URL-safe base64 of
92 /// serde_json): a session token with `Role::Worker` + `["*"]` scopes + 600s
93 /// TTL. The HMAC sig is verified server-side by `verify_token_for_task` —
94 /// a self-contained capability token (= no server lookup required).
95 ///
96 /// `directive` (= immediate instruction for the MainAI; fix for observation #7):
97 /// Under thin-push discipline, if the payload were only routing fields, the
98 /// MainAI (a large LLM) would fire the drift "I have a token → I should
99 /// fetch it myself" / "I got the prompt → I should embed it literally into
100 /// the SubAgent" 100% of the time (= bias accumulation across 50–100 parallel
101 /// agents dulls decisions). To structurally remove this drift, a literal
102 /// instruction text — "launch a SubAgent, hand it the token + endpoint, and
103 /// let the SubAgent do the fetch / execution / post" — is explicitly embedded
104 /// into the payload (= implicit convention → literal statement).
105 ///
106 /// This field carries **natural-language text intended for the MainAI to read**
107 /// (= not a JSON schema target for parsing). See
108 /// `operator_ws::session::default_spawn_directive()` for the server-side
109 /// default text.
110 Spawn {
111 /// Correlation key the client must echo back in `ClientMsg::SpawnAck`.
112 req_id: String,
113 /// Reserved for nested-ask correlation; currently always `None`.
114 #[serde(skip_serializing_if = "Option::is_none")]
115 parent_req_id: Option<String>,
116 /// Task the delegated spawn belongs to.
117 task_id: String,
118 /// Agent ref the Operator is asked to execute.
119 agent: String,
120 /// 1-based dispatch attempt counter for this agent step.
121 attempt: u32,
122 /// `CapToken::encode()` form Bearer credential for the worker HTTP
123 /// endpoints (see the variant doc above for the thin-control contract).
124 capability_token: String,
125 /// Short handle (= `wh-XXXXXXXX`, 12 chars). An alternate Bearer path
126 /// paired with `capability_token`. When `/v1/worker/submit` receives a
127 /// handle in Bearer, the server resolves nonce → `task_id` via the
128 /// `worker_handles` map (the short-handle switchover — removes
129 /// base64 copy-paste accidents). SubAgents (mse-worker) should use
130 /// **this field** instead of `capability_token` as the recommended path.
131 #[serde(skip_serializing_if = "Option::is_none")]
132 worker_handle: Option<String>,
133 /// Worker binding resolved from the Blueprint at compile time. `None`
134 /// never reaches the wire on the WS thin path (compile-time gate,
135 /// see `Operator::requires_worker_binding`), but the field stays
136 /// optional for forward compatibility.
137 #[serde(skip_serializing_if = "Option::is_none")]
138 worker: Option<WorkerBinding>,
139 /// Literal natural-language instruction for the MainAI (see the
140 /// variant doc above for why this is embedded in the payload).
141 directive: String,
142 },
143}
144
145/// Client → server reply messages on `WS /v1/operators/:sid/ws`. Each variant
146/// resolves the pending oneshot registered under its `req_id`
147/// (see [`super::session::WSOperatorSession::resolve_pending`]).
148#[derive(Debug, Deserialize)]
149#[serde(tag = "type", rename_all = "snake_case")]
150pub enum ClientMsg {
151 /// Reply to `ServerMsg::Ask` (`SeniorBridge.ask` result).
152 Answer {
153 /// Correlation key copied from the originating `ServerMsg::Ask`.
154 req_id: String,
155 /// Answer payload returned to the engine middleware.
156 value: Value,
157 },
158 /// Ack for `SpawnHook.before`. `ok=false` rejects the spawn
159 /// (= `MainAIMiddleware` converts it into `SpawnError::RejectedByMiddleware`).
160 /// `reason` propagates as `Err(reason)`.
161 HookAck {
162 /// Correlation key copied from the originating `ServerMsg::HookBefore`.
163 req_id: String,
164 /// `true` allows the spawn; `false` rejects it.
165 ok: bool,
166 /// Optional rejection reason surfaced to the engine when `ok=false`.
167 #[serde(default)]
168 reason: Option<String>,
169 },
170 /// Ack for `Operator.execute` (Spawn). `value = WorkerResult.value`,
171 /// `ok = WorkerResult.ok`. When `error` is `Some`, the `Operator` returns
172 /// it as `WorkerError`.
173 ///
174 /// After the thin-path switch (= the thin-control axis): if the MainAI returns this ack
175 /// **after** the SubAgent has hit HTTP `/v1/worker/result`, the server-side
176 /// dispatch path can complete with both the `Final` in `output_tail` and
177 /// this ack's `value` aligned. Sending an empty JSON `{}` for `value` makes
178 /// the `task.last_result` written by the HTTP path (= `post_result`)
179 /// canonical (= the ack-side `value` is duplicate / informational).
180 SpawnAck {
181 /// Correlation key copied from the originating `ServerMsg::Spawn`.
182 req_id: String,
183 /// `WorkerResult.value` equivalent; empty `{}` defers to the HTTP-path
184 /// result (see the variant doc above).
185 #[serde(default)]
186 value: Value,
187 /// `WorkerResult.ok` equivalent; defaults to `true` when omitted.
188 #[serde(default = "default_ok_true")]
189 ok: bool,
190 /// When `Some`, the Operator surfaces it as a `WorkerError`.
191 #[serde(default)]
192 error: Option<String>,
193 },
194}
195
196/// Intermediate representation for the session's `req_id` ↔ oneshot reply
197/// channel. The resolved form of `ClientMsg` looked up on the session side by
198/// `req_id` (= runtime-only, not wire format).
199pub(super) enum PendingReply {
200 /// Answer (return `Value` of `SeniorBridge.ask`).
201 Answer(Value),
202 /// `hook_ack` (OK / NG for `before`).
203 HookAck { ok: bool, reason: Option<String> },
204 /// `spawn_ack` (return of `Operator.execute` = `WorkerResult`-equivalent).
205 SpawnAck {
206 value: Value,
207 ok: bool,
208 error: Option<String>,
209 },
210}