mlua_swarm_server/operator_ws/protocol.rs
1//! Wire format (= S↔C JSON message schema) for `WS /v1/operators/:sid/ws`.
2//!
3//! `ServerMsg` = 4 messages the server pushes to the client (Ask / HookBefore /
4//! HookAfter / Spawn).
5//! `ClientMsg` = 3 messages the client replies with (Answer / HookAck / SpawnAck).
6//! For the parent module's message-flow figure, see the doc of `mod.rs`.
7//!
8//! `PendingReply` is the intermediate representation delivered over the internal
9//! `oneshot` reply channel, used to resolve a `ClientMsg` (arriving from a client)
10//! against the session's pending `HashMap` keyed by `req_id`.
11//! See `session::WSOperatorSession::resolve_pending` for details.
12
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15
16// parent_req_id schema field carry: the engine middleware does not fire the
17// true nested-ask case (another ask running mid-ask), so this stays None.
18// The field is kept for schema compatibility; when a middleware extension
19// starts firing the true nested case, it can be reintroduced via task_local
20// or similar.
21pub(super) fn current_parent_req_id() -> Option<String> {
22 None
23}
24
25pub(super) fn default_ok_true() -> bool {
26 true
27}
28
29/// Server → client push messages on `WS /v1/operators/:sid/ws`. Each variant
30/// pairs with a `ClientMsg` reply carrying the same `req_id` (except
31/// `HookAfter`, which is fire-and-forget).
32#[derive(Debug, Serialize)]
33#[serde(tag = "type", rename_all = "snake_case")]
34pub enum ServerMsg {
35 /// `SeniorBridge.ask` request.
36 Ask {
37 /// Correlation key the client must echo back in `ClientMsg::Answer`.
38 req_id: String,
39 /// Reserved for nested-ask correlation; currently always `None`
40 /// (see [`current_parent_req_id`]).
41 #[serde(skip_serializing_if = "Option::is_none")]
42 parent_req_id: Option<String>,
43 /// Task the question originates from.
44 task_id: String,
45 /// Free-form question payload produced by the engine middleware.
46 question: Value,
47 },
48 /// `SpawnHook.before` request (= the client returns OK / NG via ack).
49 HookBefore {
50 /// Correlation key the client must echo back in `ClientMsg::HookAck`.
51 req_id: String,
52 /// Reserved for nested-ask correlation; currently always `None`.
53 #[serde(skip_serializing_if = "Option::is_none")]
54 parent_req_id: Option<String>,
55 /// Task whose spawn is being gated.
56 task_id: String,
57 /// Agent ref about to be spawned.
58 agent: String,
59 /// 1-based dispatch attempt counter for this agent step.
60 attempt: u32,
61 },
62 /// `SpawnHook.after` notification (= no client ack, fire-and-forget).
63 HookAfter {
64 /// Correlation key (informational only — no reply is expected).
65 req_id: String,
66 /// Reserved for nested-ask correlation; currently always `None`.
67 #[serde(skip_serializing_if = "Option::is_none")]
68 parent_req_id: Option<String>,
69 /// Task the spawn belonged to.
70 task_id: String,
71 /// Agent ref that was spawned.
72 agent: String,
73 /// 1-based dispatch attempt counter for this agent step.
74 attempt: u32,
75 /// Worker result payload observed after the spawn completed.
76 result: Value,
77 },
78 /// `Operator.execute` request (= delegates the whole spawn to an external
79 /// Operator, via `OperatorDelegateMiddleware`). The client replies with the
80 /// `WorkerResult`-equivalent (= value + ok) in `spawn_ack`.
81 ///
82 /// **Thin control channel** (the Spawn thin-control axis): the server sends only
83 /// the `capability_token`. `system_prompt` / `prompt` are NOT carried in the
84 /// WS payload. The MainAI (WS Client) forwards the token to the SubAgent,
85 /// and the SubAgent hits `/v1/worker/prompt` + `/v1/worker/result` itself
86 /// with `Authorization: Bearer <capability_token>` — fetching prompt /
87 /// system and posting the result (= heavy payloads go over HTTP; WS stays
88 /// purely thin control).
89 ///
90 /// `capability_token` is `CapToken::encode()` form (= URL-safe base64 of
91 /// serde_json): a session token with `Role::Worker` + `["*"]` scopes + 600s
92 /// TTL. The HMAC sig is verified server-side by `verify_token_for_task` —
93 /// a self-contained capability token (= no server lookup required).
94 ///
95 /// `directive` (= immediate instruction for the MainAI; fix for observation #7):
96 /// Under thin-push discipline, if the payload were only routing fields, the
97 /// MainAI (a large LLM) would fire the drift "I have a token → I should
98 /// fetch it myself" / "I got the prompt → I should embed it literally into
99 /// the SubAgent" 100% of the time (= bias accumulation across 50–100 parallel
100 /// agents dulls decisions). To structurally remove this drift, a literal
101 /// instruction text — "launch a SubAgent, hand it the token + endpoint, and
102 /// let the SubAgent do the fetch / execution / post" — is explicitly embedded
103 /// into the payload (= implicit convention → literal statement).
104 ///
105 /// This field carries **natural-language text intended for the MainAI to read**
106 /// (= not a JSON schema target for parsing). See
107 /// `operator_ws::session::default_spawn_directive()` for the server-side
108 /// default text.
109 Spawn {
110 /// Correlation key the client must echo back in `ClientMsg::SpawnAck`.
111 req_id: String,
112 /// Reserved for nested-ask correlation; currently always `None`.
113 #[serde(skip_serializing_if = "Option::is_none")]
114 parent_req_id: Option<String>,
115 /// Task the delegated spawn belongs to.
116 task_id: String,
117 /// Agent ref the Operator is asked to execute.
118 agent: String,
119 /// 1-based dispatch attempt counter for this agent step.
120 attempt: u32,
121 /// `CapToken::encode()` form Bearer credential for the worker HTTP
122 /// endpoints (see the variant doc above for the thin-control contract).
123 capability_token: String,
124 /// Short handle (= `wh-XXXXXXXX`, 12 chars). An alternate Bearer path
125 /// paired with `capability_token`. When `/v1/worker/submit` receives a
126 /// handle in Bearer, the server resolves nonce → `task_id` via the
127 /// `worker_handles` map (the short-handle switchover — removes
128 /// base64 copy-paste accidents). SubAgents (mse-worker) should use
129 /// **this field** instead of `capability_token` as the recommended path.
130 #[serde(skip_serializing_if = "Option::is_none")]
131 worker_handle: Option<String>,
132 /// Literal natural-language instruction for the MainAI (see the
133 /// variant doc above for why this is embedded in the payload).
134 directive: String,
135 },
136}
137
138/// Client → server reply messages on `WS /v1/operators/:sid/ws`. Each variant
139/// resolves the pending oneshot registered under its `req_id`
140/// (see [`super::session::WSOperatorSession::resolve_pending`]).
141#[derive(Debug, Deserialize)]
142#[serde(tag = "type", rename_all = "snake_case")]
143pub enum ClientMsg {
144 /// Reply to `ServerMsg::Ask` (`SeniorBridge.ask` result).
145 Answer {
146 /// Correlation key copied from the originating `ServerMsg::Ask`.
147 req_id: String,
148 /// Answer payload returned to the engine middleware.
149 value: Value,
150 },
151 /// Ack for `SpawnHook.before`. `ok=false` rejects the spawn
152 /// (= `MainAIMiddleware` converts it into `SpawnError::RejectedByMiddleware`).
153 /// `reason` propagates as `Err(reason)`.
154 HookAck {
155 /// Correlation key copied from the originating `ServerMsg::HookBefore`.
156 req_id: String,
157 /// `true` allows the spawn; `false` rejects it.
158 ok: bool,
159 /// Optional rejection reason surfaced to the engine when `ok=false`.
160 #[serde(default)]
161 reason: Option<String>,
162 },
163 /// Ack for `Operator.execute` (Spawn). `value = WorkerResult.value`,
164 /// `ok = WorkerResult.ok`. When `error` is `Some`, the `Operator` returns
165 /// it as `WorkerError`.
166 ///
167 /// After the thin-path switch (= the thin-control axis): if the MainAI returns this ack
168 /// **after** the SubAgent has hit HTTP `/v1/worker/result`, the server-side
169 /// dispatch path can complete with both the `Final` in `output_tail` and
170 /// this ack's `value` aligned. Sending an empty JSON `{}` for `value` makes
171 /// the `task.last_result` written by the HTTP path (= `post_result`)
172 /// canonical (= the ack-side `value` is duplicate / informational).
173 SpawnAck {
174 /// Correlation key copied from the originating `ServerMsg::Spawn`.
175 req_id: String,
176 /// `WorkerResult.value` equivalent; empty `{}` defers to the HTTP-path
177 /// result (see the variant doc above).
178 #[serde(default)]
179 value: Value,
180 /// `WorkerResult.ok` equivalent; defaults to `true` when omitted.
181 #[serde(default = "default_ok_true")]
182 ok: bool,
183 /// When `Some`, the Operator surfaces it as a `WorkerError`.
184 #[serde(default)]
185 error: Option<String>,
186 },
187}
188
189/// Intermediate representation for the session's `req_id` ↔ oneshot reply
190/// channel. The resolved form of `ClientMsg` looked up on the session side by
191/// `req_id` (= runtime-only, not wire format).
192pub(super) enum PendingReply {
193 /// Answer (return `Value` of `SeniorBridge.ask`).
194 Answer(Value),
195 /// `hook_ack` (OK / NG for `before`).
196 HookAck { ok: bool, reason: Option<String> },
197 /// `spawn_ack` (return of `Operator.execute` = `WorkerResult`-equivalent).
198 SpawnAck {
199 value: Value,
200 ok: bool,
201 error: Option<String>,
202 },
203}