Skip to main content

smooth_operator_server/
protocol.rs

1//! Wire protocol — server→client event envelopes built to match `spec/`.
2//!
3//! Every constructor here produces a `serde_json::Value` whose field names and
4//! nesting match the JSON Schemas in `smooth-operator/spec/events/*.json` exactly,
5//! so the generated TS/Go/.NET/Python clients deserialize them unmodified.
6//!
7//! All events are serialized as a flat envelope with a `type` discriminator
8//! plus the per-event fields documented in `envelope.schema.json`'s
9//! `EventEnvelope` (`requestId`, `status`, `data`, `node`, `token`, `error`,
10//! `timestamp`).
11
12use serde_json::{json, Value};
13
14/// Current Unix epoch milliseconds (for the `timestamp` field).
15#[must_use]
16pub fn now_ms() -> i64 {
17    chrono::Utc::now().timestamp_millis()
18}
19
20/// `pong` — reply to a `ping`. Carries the server timestamp both at the top
21/// level and inside `data` (per `pong.schema.json`).
22#[must_use]
23pub fn pong(request_id: Option<&str>) -> Value {
24    let ts = now_ms();
25    let mut ev = json!({
26        "type": "pong",
27        "timestamp": ts,
28        "data": { "timestamp": ts },
29    });
30    set_request_id(&mut ev, request_id);
31    ev
32}
33
34/// `immediate_response` — synchronous ack. For non-streaming actions this also
35/// carries the full response payload in `data`.
36#[must_use]
37pub fn immediate_response(
38    request_id: Option<&str>,
39    status: i64,
40    message: &str,
41    data: Value,
42) -> Value {
43    let mut ev = json!({
44        "type": "immediate_response",
45        "status": status,
46        "message": message,
47        "data": data,
48        "timestamp": now_ms(),
49    });
50    set_request_id(&mut ev, request_id);
51    ev
52}
53
54/// `stream_token` — a single streamed LLM token. The token is mirrored both at
55/// the envelope level (`token`) and inside `data` (per `stream-token.schema.json`).
56#[must_use]
57pub fn stream_token(request_id: &str, token: &str) -> Value {
58    json!({
59        "type": "stream_token",
60        "requestId": request_id,
61        "token": token,
62        "data": { "requestId": request_id, "token": token },
63        "timestamp": now_ms(),
64    })
65}
66
67/// `stream_chunk` — a per-node state snapshot. `node` is mirrored at the
68/// envelope level and inside `data` (per `stream-chunk.schema.json`). `state`
69/// only carries safe-to-expose fields.
70#[must_use]
71pub fn stream_chunk(request_id: &str, node: &str, state: Value) -> Value {
72    json!({
73        "type": "stream_chunk",
74        "requestId": request_id,
75        "node": node,
76        "data": { "requestId": request_id, "node": node, "state": state },
77        "timestamp": now_ms(),
78    })
79}
80
81/// `eventual_response` — the terminal event of a streaming turn. The payload is
82/// double-nested (`data.data`) per `eventual-response.schema.json`.
83///
84/// `citations` are the sources that grounded the answer. They're attached to
85/// the inner `data.data.citations` array only when non-empty — absent otherwise,
86/// keeping the event back-compatible with clients that predate citations.
87#[must_use]
88pub fn eventual_response(
89    request_id: &str,
90    status: i64,
91    message_id: &str,
92    response: Value,
93    needs_escalation: bool,
94    citations: &[smooth_operator::domain::Citation],
95) -> Value {
96    let mut inner = json!({
97        "messageId": message_id,
98        "response": response,
99        "needsEscalation": needs_escalation,
100    });
101    // Optional + back-compat: only emit `citations` when the turn had sources.
102    if !citations.is_empty() {
103        inner["citations"] = serde_json::to_value(citations).unwrap_or(Value::Null);
104    }
105    json!({
106        "type": "eventual_response",
107        "requestId": request_id,
108        "status": status,
109        "data": {
110            "requestId": request_id,
111            "status": status,
112            "data": inner,
113        },
114        "timestamp": now_ms(),
115    })
116}
117
118/// `write_confirmation_required` — emitted mid-turn when the agent calls a
119/// state-mutating tool that requires explicit human approval before it runs. The
120/// turn is **parked** (the agent loop blocks inside the core
121/// `ConfirmationHook::pre_call`, corresponding to
122/// `AgentEvent::HumanInputRequired { Confirm }`) until the client replies with a
123/// `confirm_tool_action` action carrying the same `requestId` and an `approved`
124/// boolean.
125///
126/// Wire shape matches `spec/events/write-confirmation-required.schema.json`
127/// exactly (the generated TS/Go/.NET/Python clients deserialize it unmodified):
128/// the `requestId` echoes the originating `send_message`, and the prompt detail
129/// is double-nested under `data.data.{ toolId, actionDescription }`. `tool_id` is
130/// an opaque correlation handle (the runner uses the tool name — a turn parks one
131/// tool at a time); `action_description` is the human-readable prompt the client
132/// renders in its confirmation dialog.
133#[must_use]
134pub fn write_confirmation_required(
135    request_id: &str,
136    tool_id: &str,
137    action_description: &str,
138) -> Value {
139    json!({
140        "type": "write_confirmation_required",
141        "requestId": request_id,
142        "data": {
143            "requestId": request_id,
144            "data": {
145                "toolId": tool_id,
146                "actionDescription": action_description,
147            },
148        },
149        "timestamp": now_ms(),
150    })
151}
152
153/// `error` — an unrecoverable error. The `{ code, message }` descriptor is
154/// duplicated at the envelope level and nested under `data.error` for wire
155/// backward-compatibility (per `error.schema.json`).
156#[must_use]
157pub fn error(request_id: Option<&str>, code: &str, message: &str) -> Value {
158    let err = json!({ "code": code, "message": message });
159    let mut data = json!({ "error": err });
160    if let Some(rid) = request_id {
161        data["requestId"] = json!(rid);
162    }
163    let mut ev = json!({
164        "type": "error",
165        "error": err,
166        "data": data,
167        "timestamp": now_ms(),
168    });
169    set_request_id(&mut ev, request_id);
170    ev
171}
172
173fn set_request_id(ev: &mut Value, request_id: Option<&str>) {
174    if let Some(rid) = request_id {
175        ev["requestId"] = json!(rid);
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182
183    #[test]
184    fn pong_carries_timestamp_both_places() {
185        let ev = pong(Some("r1"));
186        assert_eq!(ev["type"], "pong");
187        assert_eq!(ev["requestId"], "r1");
188        assert!(ev["timestamp"].is_i64());
189        assert_eq!(ev["timestamp"], ev["data"]["timestamp"]);
190    }
191
192    #[test]
193    fn stream_token_mirrors_token() {
194        let ev = stream_token("r1", "Hel");
195        assert_eq!(ev["type"], "stream_token");
196        assert_eq!(ev["token"], "Hel");
197        assert_eq!(ev["data"]["token"], "Hel");
198        assert_eq!(ev["data"]["requestId"], "r1");
199    }
200
201    #[test]
202    fn stream_chunk_mirrors_node() {
203        let ev = stream_chunk("r1", "knowledge_search", json!({ "rawResponse": "x" }));
204        assert_eq!(ev["type"], "stream_chunk");
205        assert_eq!(ev["node"], "knowledge_search");
206        assert_eq!(ev["data"]["node"], "knowledge_search");
207        assert_eq!(ev["data"]["state"]["rawResponse"], "x");
208    }
209
210    #[test]
211    fn eventual_response_double_nests_payload() {
212        let ev = eventual_response(
213            "r1",
214            200,
215            "m1",
216            json!({"responseParts": ["hi"]}),
217            false,
218            &[],
219        );
220        assert_eq!(ev["type"], "eventual_response");
221        assert_eq!(ev["status"], 200);
222        assert_eq!(ev["data"]["data"]["messageId"], "m1");
223        assert_eq!(ev["data"]["data"]["needsEscalation"], false);
224        assert_eq!(ev["data"]["data"]["response"]["responseParts"][0], "hi");
225    }
226
227    #[test]
228    fn eventual_response_omits_citations_when_empty() {
229        // Back-compat: no `citations` key at all when the turn had no sources.
230        let ev = eventual_response(
231            "r1",
232            200,
233            "m1",
234            json!({"responseParts": ["hi"]}),
235            false,
236            &[],
237        );
238        assert!(
239            ev["data"]["data"].get("citations").is_none(),
240            "citations must be absent when empty for back-compat"
241        );
242    }
243
244    #[test]
245    fn eventual_response_attaches_citations_when_present() {
246        let citations = vec![
247            smooth_operator::domain::Citation {
248                id: "doc-1".into(),
249                title: "acme/handbook@main#wildlife/quokka.md".into(),
250                url: Some("https://github.com/acme/handbook/blob/main/wildlife/quokka.md".into()),
251                snippet: "Quokkas are the friendliest marsupial.".into(),
252                score: 0.91,
253            },
254            smooth_operator::domain::Citation {
255                id: "doc-2".into(),
256                title: "policies/shipping.md".into(),
257                url: None,
258                snippet: "Standard shipping takes 5 to 7 business days.".into(),
259                score: 0.42,
260            },
261        ];
262        let ev = eventual_response(
263            "r1",
264            200,
265            "m1",
266            json!({"responseParts": ["hi"]}),
267            false,
268            &citations,
269        );
270        let cites = &ev["data"]["data"]["citations"];
271        assert!(cites.is_array(), "citations should be an array");
272        assert_eq!(cites.as_array().unwrap().len(), 2);
273        // GitHub-sourced citation carries id + url + snippet on the wire shape.
274        assert_eq!(cites[0]["id"], "doc-1");
275        assert_eq!(
276            cites[0]["url"],
277            "https://github.com/acme/handbook/blob/main/wildlife/quokka.md"
278        );
279        assert_eq!(
280            cites[0]["snippet"],
281            "Quokkas are the friendliest marsupial."
282        );
283        // score is an f32 widened to f64 on the wire, so compare with tolerance.
284        let score = cites[0]["score"].as_f64().expect("score is a number");
285        assert!(
286            (score - 0.91).abs() < 1e-4,
287            "score should round-trip ~0.91, got {score}"
288        );
289        // url is omitted (not null) for a source with no web location.
290        assert!(
291            cites[1].get("url").is_none(),
292            "a urless citation should omit `url`, not emit null"
293        );
294        assert_eq!(cites[1]["id"], "doc-2");
295    }
296
297    #[test]
298    fn write_confirmation_required_matches_spec_shape() {
299        let ev = write_confirmation_required(
300            "r1",
301            "delete_record",
302            "Tool 'delete_record' requires confirmation. Allow?",
303        );
304        // Per spec/events/write-confirmation-required.schema.json.
305        assert_eq!(ev["type"], "write_confirmation_required");
306        assert_eq!(ev["requestId"], "r1");
307        assert_eq!(ev["data"]["requestId"], "r1");
308        let inner = &ev["data"]["data"];
309        assert_eq!(inner["toolId"], "delete_record");
310        assert!(inner["actionDescription"]
311            .as_str()
312            .unwrap()
313            .contains("delete_record"));
314        assert!(ev["timestamp"].is_i64());
315    }
316
317    #[test]
318    fn error_duplicates_descriptor() {
319        let ev = error(Some("r1"), "VALIDATION_ERROR", "bad");
320        assert_eq!(ev["type"], "error");
321        assert_eq!(ev["error"]["code"], "VALIDATION_ERROR");
322        assert_eq!(ev["data"]["error"]["message"], "bad");
323        assert_eq!(ev["data"]["requestId"], "r1");
324    }
325
326    #[test]
327    fn immediate_response_carries_data() {
328        let ev = immediate_response(Some("r1"), 200, "ok", json!({"sessionId": "s1"}));
329        assert_eq!(ev["type"], "immediate_response");
330        assert_eq!(ev["status"], 200);
331        assert_eq!(ev["data"]["sessionId"], "s1");
332    }
333}