Skip to main content

smooth_operator_server/
protocol.rs

//! Wire protocol — server→client event envelopes built to match `spec/`.
//!
//! Every constructor here produces a `serde_json::Value` whose field names and
//! nesting match the JSON Schemas in `smooth-operator/spec/events/*.json` exactly,
//! so the generated TS/Go/.NET/Python clients deserialize them unmodified.
//!
//! All events are serialized as a flat envelope with a `type` discriminator
//! plus the per-event fields documented in `envelope.schema.json`'s
//! `EventEnvelope` (`requestId`, `status`, `data`, `node`, `token`, `error`,
//! `timestamp`).
11
12use serde_json::{json, Value};
13
14/// Current Unix epoch milliseconds (for the `timestamp` field).
15#[must_use]
16pub fn now_ms() -> i64 {
17    chrono::Utc::now().timestamp_millis()
18}
19
20/// `pong` — reply to a `ping`. Carries the server timestamp both at the top
21/// level and inside `data` (per `pong.schema.json`).
22#[must_use]
23pub fn pong(request_id: Option<&str>) -> Value {
24    let ts = now_ms();
25    let mut ev = json!({
26        "type": "pong",
27        "timestamp": ts,
28        "data": { "timestamp": ts },
29    });
30    set_request_id(&mut ev, request_id);
31    ev
32}
33
34/// `immediate_response` — synchronous ack. For non-streaming actions this also
35/// carries the full response payload in `data`.
36#[must_use]
37pub fn immediate_response(
38    request_id: Option<&str>,
39    status: i64,
40    message: &str,
41    data: Value,
42) -> Value {
43    let mut ev = json!({
44        "type": "immediate_response",
45        "status": status,
46        "message": message,
47        "data": data,
48        "timestamp": now_ms(),
49    });
50    set_request_id(&mut ev, request_id);
51    ev
52}
53
54/// `stream_token` — a single streamed LLM token. The token is mirrored both at
55/// the envelope level (`token`) and inside `data` (per `stream-token.schema.json`).
56#[must_use]
57pub fn stream_token(request_id: &str, token: &str) -> Value {
58    json!({
59        "type": "stream_token",
60        "requestId": request_id,
61        "token": token,
62        "data": { "requestId": request_id, "token": token },
63        "timestamp": now_ms(),
64    })
65}
66
67/// `stream_chunk` — a per-node state snapshot. `node` is mirrored at the
68/// envelope level and inside `data` (per `stream-chunk.schema.json`). `state`
69/// only carries safe-to-expose fields.
70#[must_use]
71pub fn stream_chunk(request_id: &str, node: &str, state: Value) -> Value {
72    json!({
73        "type": "stream_chunk",
74        "requestId": request_id,
75        "node": node,
76        "data": { "requestId": request_id, "node": node, "state": state },
77        "timestamp": now_ms(),
78    })
79}
80
81/// `eventual_response` — the terminal event of a streaming turn. The payload is
82/// double-nested (`data.data`) per `eventual-response.schema.json`.
83///
84/// `citations` are the sources that grounded the answer. They're attached to
85/// the inner `data.data.citations` array only when non-empty — absent otherwise,
86/// keeping the event back-compatible with clients that predate citations.
87#[must_use]
88pub fn eventual_response(
89    request_id: &str,
90    status: i64,
91    message_id: &str,
92    response: Value,
93    needs_escalation: bool,
94    citations: &[smooth_operator::domain::Citation],
95) -> Value {
96    let mut inner = json!({
97        "messageId": message_id,
98        "response": response,
99        "needsEscalation": needs_escalation,
100    });
101    // Optional + back-compat: only emit `citations` when the turn had sources.
102    if !citations.is_empty() {
103        inner["citations"] = serde_json::to_value(citations).unwrap_or(Value::Null);
104    }
105    json!({
106        "type": "eventual_response",
107        "requestId": request_id,
108        "status": status,
109        "data": {
110            "requestId": request_id,
111            "status": status,
112            "data": inner,
113        },
114        "timestamp": now_ms(),
115    })
116}
117
118/// `error` — an unrecoverable error. The `{ code, message }` descriptor is
119/// duplicated at the envelope level and nested under `data.error` for wire
120/// backward-compatibility (per `error.schema.json`).
121#[must_use]
122pub fn error(request_id: Option<&str>, code: &str, message: &str) -> Value {
123    let err = json!({ "code": code, "message": message });
124    let mut data = json!({ "error": err });
125    if let Some(rid) = request_id {
126        data["requestId"] = json!(rid);
127    }
128    let mut ev = json!({
129        "type": "error",
130        "error": err,
131        "data": data,
132        "timestamp": now_ms(),
133    });
134    set_request_id(&mut ev, request_id);
135    ev
136}
137
138fn set_request_id(ev: &mut Value, request_id: Option<&str>) {
139    if let Some(rid) = request_id {
140        ev["requestId"] = json!(rid);
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use smooth_operator::domain::Citation;

    /// Minimal `response` payload shared by the `eventual_response` tests.
    fn sample_response() -> Value {
        json!({"responseParts": ["hi"]})
    }

    /// A successful, non-escalated terminal event for request `r1` / message
    /// `m1`, carrying the given citations.
    fn finished_turn(citations: &[Citation]) -> Value {
        eventual_response("r1", 200, "m1", sample_response(), false, citations)
    }

    #[test]
    fn pong_carries_timestamp_both_places() {
        let event = pong(Some("r1"));
        assert_eq!(event["type"], "pong");
        assert_eq!(event["requestId"], "r1");
        assert!(event["timestamp"].is_i64());
        assert_eq!(event["timestamp"], event["data"]["timestamp"]);
    }

    #[test]
    fn stream_token_mirrors_token() {
        let event = stream_token("r1", "Hel");
        assert_eq!(event["type"], "stream_token");
        assert_eq!(event["token"], "Hel");
        let payload = &event["data"];
        assert_eq!(payload["token"], "Hel");
        assert_eq!(payload["requestId"], "r1");
    }

    #[test]
    fn stream_chunk_mirrors_node() {
        let event = stream_chunk("r1", "knowledge_search", json!({ "rawResponse": "x" }));
        assert_eq!(event["type"], "stream_chunk");
        assert_eq!(event["node"], "knowledge_search");
        let payload = &event["data"];
        assert_eq!(payload["node"], "knowledge_search");
        assert_eq!(payload["state"]["rawResponse"], "x");
    }

    #[test]
    fn eventual_response_double_nests_payload() {
        let event = finished_turn(&[]);
        assert_eq!(event["type"], "eventual_response");
        assert_eq!(event["status"], 200);
        let inner = &event["data"]["data"];
        assert_eq!(inner["messageId"], "m1");
        assert_eq!(inner["needsEscalation"], false);
        assert_eq!(inner["response"]["responseParts"][0], "hi");
    }

    #[test]
    fn eventual_response_omits_citations_when_empty() {
        // Back-compat: no `citations` key at all when the turn had no sources.
        let event = finished_turn(&[]);
        let inner = &event["data"]["data"];
        assert!(
            inner.get("citations").is_none(),
            "citations must be absent when empty for back-compat"
        );
    }

    #[test]
    fn eventual_response_attaches_citations_when_present() {
        let with_url = Citation {
            id: "doc-1".into(),
            title: "acme/handbook@main#wildlife/quokka.md".into(),
            url: Some("https://github.com/acme/handbook/blob/main/wildlife/quokka.md".into()),
            snippet: "Quokkas are the friendliest marsupial.".into(),
            score: 0.91,
        };
        let without_url = Citation {
            id: "doc-2".into(),
            title: "policies/shipping.md".into(),
            url: None,
            snippet: "Standard shipping takes 5 to 7 business days.".into(),
            score: 0.42,
        };
        let event = finished_turn(&[with_url, without_url]);

        let cites = &event["data"]["data"]["citations"];
        assert!(cites.is_array(), "citations should be an array");
        assert_eq!(cites.as_array().unwrap().len(), 2);

        // GitHub-sourced citation carries id + url + snippet on the wire shape.
        let first = &cites[0];
        assert_eq!(first["id"], "doc-1");
        assert_eq!(
            first["url"],
            "https://github.com/acme/handbook/blob/main/wildlife/quokka.md"
        );
        assert_eq!(first["snippet"], "Quokkas are the friendliest marsupial.");
        // score is an f32 widened to f64 on the wire, so compare with tolerance.
        let score = first["score"].as_f64().expect("score is a number");
        assert!(
            (score - 0.91).abs() < 1e-4,
            "score should round-trip ~0.91, got {score}"
        );

        // url is omitted (not null) for a source with no web location.
        let second = &cites[1];
        assert!(
            second.get("url").is_none(),
            "a urless citation should omit `url`, not emit null"
        );
        assert_eq!(second["id"], "doc-2");
    }

    #[test]
    fn error_duplicates_descriptor() {
        let event = error(Some("r1"), "VALIDATION_ERROR", "bad");
        assert_eq!(event["type"], "error");
        assert_eq!(event["error"]["code"], "VALIDATION_ERROR");
        assert_eq!(event["data"]["error"]["message"], "bad");
        assert_eq!(event["data"]["requestId"], "r1");
    }

    #[test]
    fn immediate_response_carries_data() {
        let event = immediate_response(Some("r1"), 200, "ok", json!({"sessionId": "s1"}));
        assert_eq!(event["type"], "immediate_response");
        assert_eq!(event["status"], 200);
        assert_eq!(event["data"]["sessionId"], "s1");
    }
}