Skip to main content

mlua_swarm_server/operator_ws/
protocol.rs

//! Wire format (= S↔C JSON message schema) for `WS /v1/operators/:sid/ws`.
//!
//! `ServerMsg` = the 4 messages the server pushes to the client (Ask / HookBefore /
//! HookAfter / Spawn).
//! `ClientMsg` = the 3 messages the client replies with (Answer / HookAck / SpawnAck).
//! For the message-flow diagram, see the parent module's doc in `mod.rs`.
//!
//! `PendingReply` is the runtime-only intermediate form delivered over the internal
//! `oneshot` reply channel: when a `ClientMsg` arrives from the client, the session
//! finds the matching entry in its pending `HashMap` (keyed by `req_id`) and resolves
//! it with a `PendingReply`.
//! See `session::WSOperatorSession::resolve_pending` for details.
12
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15
16// parent_req_id schema field carry: the engine middleware does not fire the
17// true nested-ask case (another ask running mid-ask), so this stays None.
18// The field is kept for schema compatibility; when a middleware extension
19// starts firing the true nested case, it can be reintroduced via task_local
20// or similar.
21pub(super) fn current_parent_req_id() -> Option<String> {
22    None
23}
24
25pub(super) fn default_ok_true() -> bool {
26    true
27}
28
29/// Server → client push messages on `WS /v1/operators/:sid/ws`. Each variant
30/// pairs with a `ClientMsg` reply carrying the same `req_id` (except
31/// `HookAfter`, which is fire-and-forget).
32#[derive(Debug, Serialize)]
33#[serde(tag = "type", rename_all = "snake_case")]
34pub enum ServerMsg {
35    /// `SeniorBridge.ask` request.
36    Ask {
37        /// Correlation key the client must echo back in `ClientMsg::Answer`.
38        req_id: String,
39        /// Reserved for nested-ask correlation; currently always `None`
40        /// (see [`current_parent_req_id`]).
41        #[serde(skip_serializing_if = "Option::is_none")]
42        parent_req_id: Option<String>,
43        /// Task the question originates from.
44        task_id: String,
45        /// Free-form question payload produced by the engine middleware.
46        question: Value,
47    },
48    /// `SpawnHook.before` request (= the client returns OK / NG via ack).
49    HookBefore {
50        /// Correlation key the client must echo back in `ClientMsg::HookAck`.
51        req_id: String,
52        /// Reserved for nested-ask correlation; currently always `None`.
53        #[serde(skip_serializing_if = "Option::is_none")]
54        parent_req_id: Option<String>,
55        /// Task whose spawn is being gated.
56        task_id: String,
57        /// Agent ref about to be spawned.
58        agent: String,
59        /// 1-based dispatch attempt counter for this agent step.
60        attempt: u32,
61    },
62    /// `SpawnHook.after` notification (= no client ack, fire-and-forget).
63    HookAfter {
64        /// Correlation key (informational only — no reply is expected).
65        req_id: String,
66        /// Reserved for nested-ask correlation; currently always `None`.
67        #[serde(skip_serializing_if = "Option::is_none")]
68        parent_req_id: Option<String>,
69        /// Task the spawn belonged to.
70        task_id: String,
71        /// Agent ref that was spawned.
72        agent: String,
73        /// 1-based dispatch attempt counter for this agent step.
74        attempt: u32,
75        /// Worker result payload observed after the spawn completed.
76        result: Value,
77    },
78    /// `Operator.execute` request (= delegates the whole spawn to an external
79    /// Operator, via `OperatorDelegateMiddleware`). The client replies with the
80    /// `WorkerResult`-equivalent (= value + ok) in `spawn_ack`.
81    ///
82    /// **Thin control channel** (the Spawn thin-control axis): the server sends only
83    /// the `capability_token`. `system_prompt` / `prompt` are NOT carried in the
84    /// WS payload. The MainAI (WS Client) forwards the token to the SubAgent,
85    /// and the SubAgent hits `/v1/worker/prompt` + `/v1/worker/result` itself
86    /// with `Authorization: Bearer <capability_token>` — fetching prompt /
87    /// system and posting the result (= heavy payloads go over HTTP; WS stays
88    /// purely thin control).
89    ///
90    /// `capability_token` is `CapToken::encode()` form (= URL-safe base64 of
91    /// serde_json): a session token with `Role::Worker` + `["*"]` scopes + 600s
92    /// TTL. The HMAC sig is verified server-side by `verify_token_for_task` —
93    /// a self-contained capability token (= no server lookup required).
94    ///
95    /// `directive` (= immediate instruction for the MainAI; fix for observation #7):
96    /// Under thin-push discipline, if the payload were only routing fields, the
97    /// MainAI (a large LLM) would fire the drift "I have a token → I should
98    /// fetch it myself" / "I got the prompt → I should embed it literally into
99    /// the SubAgent" 100% of the time (= bias accumulation across 50–100 parallel
100    /// agents dulls decisions). To structurally remove this drift, a literal
101    /// instruction text — "launch a SubAgent, hand it the token + endpoint, and
102    /// let the SubAgent do the fetch / execution / post" — is explicitly embedded
103    /// into the payload (= implicit convention → literal statement).
104    ///
105    /// This field carries **natural-language text intended for the MainAI to read**
106    /// (= not a JSON schema target for parsing). See
107    /// `operator_ws::session::default_spawn_directive()` for the server-side
108    /// default text.
109    Spawn {
110        /// Correlation key the client must echo back in `ClientMsg::SpawnAck`.
111        req_id: String,
112        /// Reserved for nested-ask correlation; currently always `None`.
113        #[serde(skip_serializing_if = "Option::is_none")]
114        parent_req_id: Option<String>,
115        /// Task the delegated spawn belongs to.
116        task_id: String,
117        /// Agent ref the Operator is asked to execute.
118        agent: String,
119        /// 1-based dispatch attempt counter for this agent step.
120        attempt: u32,
121        /// `CapToken::encode()` form Bearer credential for the worker HTTP
122        /// endpoints (see the variant doc above for the thin-control contract).
123        capability_token: String,
124        /// Short handle (= `wh-XXXXXXXX`, 12 chars). An alternate Bearer path
125        /// paired with `capability_token`. When `/v1/worker/submit` receives a
126        /// handle in Bearer, the server resolves nonce → `task_id` via the
127        /// `worker_handles` map (the short-handle switchover — removes
128        /// base64 copy-paste accidents). SubAgents (mse-worker) should use
129        /// **this field** instead of `capability_token` as the recommended path.
130        #[serde(skip_serializing_if = "Option::is_none")]
131        worker_handle: Option<String>,
132        /// Literal natural-language instruction for the MainAI (see the
133        /// variant doc above for why this is embedded in the payload).
134        directive: String,
135    },
136}
137
138/// Client → server reply messages on `WS /v1/operators/:sid/ws`. Each variant
139/// resolves the pending oneshot registered under its `req_id`
140/// (see [`super::session::WSOperatorSession::resolve_pending`]).
141#[derive(Debug, Deserialize)]
142#[serde(tag = "type", rename_all = "snake_case")]
143pub enum ClientMsg {
144    /// Reply to `ServerMsg::Ask` (`SeniorBridge.ask` result).
145    Answer {
146        /// Correlation key copied from the originating `ServerMsg::Ask`.
147        req_id: String,
148        /// Answer payload returned to the engine middleware.
149        value: Value,
150    },
151    /// Ack for `SpawnHook.before`. `ok=false` rejects the spawn
152    /// (= `MainAIMiddleware` converts it into `SpawnError::RejectedByMiddleware`).
153    /// `reason` propagates as `Err(reason)`.
154    HookAck {
155        /// Correlation key copied from the originating `ServerMsg::HookBefore`.
156        req_id: String,
157        /// `true` allows the spawn; `false` rejects it.
158        ok: bool,
159        /// Optional rejection reason surfaced to the engine when `ok=false`.
160        #[serde(default)]
161        reason: Option<String>,
162    },
163    /// Ack for `Operator.execute` (Spawn). `value = WorkerResult.value`,
164    /// `ok = WorkerResult.ok`. When `error` is `Some`, the `Operator` returns
165    /// it as `WorkerError`.
166    ///
167    /// After the thin-path switch (= the thin-control axis): if the MainAI returns this ack
168    /// **after** the SubAgent has hit HTTP `/v1/worker/result`, the server-side
169    /// dispatch path can complete with both the `Final` in `output_tail` and
170    /// this ack's `value` aligned. Sending an empty JSON `{}` for `value` makes
171    /// the `task.last_result` written by the HTTP path (= `post_result`)
172    /// canonical (= the ack-side `value` is duplicate / informational).
173    SpawnAck {
174        /// Correlation key copied from the originating `ServerMsg::Spawn`.
175        req_id: String,
176        /// `WorkerResult.value` equivalent; empty `{}` defers to the HTTP-path
177        /// result (see the variant doc above).
178        #[serde(default)]
179        value: Value,
180        /// `WorkerResult.ok` equivalent; defaults to `true` when omitted.
181        #[serde(default = "default_ok_true")]
182        ok: bool,
183        /// When `Some`, the Operator surfaces it as a `WorkerError`.
184        #[serde(default)]
185        error: Option<String>,
186    },
187}
188
189/// Intermediate representation for the session's `req_id` ↔ oneshot reply
190/// channel. The resolved form of `ClientMsg` looked up on the session side by
191/// `req_id` (= runtime-only, not wire format).
192pub(super) enum PendingReply {
193    /// Answer (return `Value` of `SeniorBridge.ask`).
194    Answer(Value),
195    /// `hook_ack` (OK / NG for `before`).
196    HookAck { ok: bool, reason: Option<String> },
197    /// `spawn_ack` (return of `Operator.execute` = `WorkerResult`-equivalent).
198    SpawnAck {
199        value: Value,
200        ok: bool,
201        error: Option<String>,
202    },
203}