Skip to main content

smooth_operator_server/
protocol.rs

//! Wire protocol — server→client event envelopes built to match `spec/`.
//!
//! Every constructor here produces a `serde_json::Value` whose field names and
//! nesting match the JSON Schemas in `smooth-operator/spec/events/*.json` exactly,
//! so the generated TS/Go/.NET/Python clients deserialize them unmodified.
//!
//! All events are serialized as a flat top-level envelope with a `type`
//! discriminator plus the per-event fields documented in `envelope.schema.json`'s
//! `EventEnvelope` (`requestId`, `status`, `data`, `node`, `token`, `error`,
//! `timestamp`); the event-specific payload is nested under `data`.
11
12use serde_json::{json, Value};
13
14/// Current Unix epoch milliseconds (for the `timestamp` field).
15#[must_use]
16pub fn now_ms() -> i64 {
17    chrono::Utc::now().timestamp_millis()
18}
19
20/// `pong` — reply to a `ping`. Carries the server timestamp both at the top
21/// level and inside `data` (per `pong.schema.json`).
22#[must_use]
23pub fn pong(request_id: Option<&str>) -> Value {
24    let ts = now_ms();
25    let mut ev = json!({
26        "type": "pong",
27        "timestamp": ts,
28        "data": { "timestamp": ts },
29    });
30    set_request_id(&mut ev, request_id);
31    ev
32}
33
34/// `immediate_response` — synchronous ack. For non-streaming actions this also
35/// carries the full response payload in `data`.
36#[must_use]
37pub fn immediate_response(
38    request_id: Option<&str>,
39    status: i64,
40    message: &str,
41    data: Value,
42) -> Value {
43    let mut ev = json!({
44        "type": "immediate_response",
45        "status": status,
46        "message": message,
47        "data": data,
48        "timestamp": now_ms(),
49    });
50    set_request_id(&mut ev, request_id);
51    ev
52}
53
54/// `stream_token` — a single streamed LLM token. The token is mirrored both at
55/// the envelope level (`token`) and inside `data` (per `stream-token.schema.json`).
56#[must_use]
57pub fn stream_token(request_id: &str, token: &str) -> Value {
58    json!({
59        "type": "stream_token",
60        "requestId": request_id,
61        "token": token,
62        "data": { "requestId": request_id, "token": token },
63        "timestamp": now_ms(),
64    })
65}
66
67/// `stream_reasoning` — a single streamed *reasoning* token from a reasoning
68/// model's separate thinking channel. Shaped exactly like `stream_token`, but
69/// on a distinct `type` so clients render it as "thinking" and never fold it
70/// into the answer. Clients that don't know the type simply ignore it (the
71/// answer still streams via `stream_token`).
72#[must_use]
73pub fn stream_reasoning(request_id: &str, token: &str) -> Value {
74    json!({
75        "type": "stream_reasoning",
76        "requestId": request_id,
77        "token": token,
78        "data": { "requestId": request_id, "token": token },
79        "timestamp": now_ms(),
80    })
81}
82
83/// `stream_chunk` — a per-node state snapshot. `node` is mirrored at the
84/// envelope level and inside `data` (per `stream-chunk.schema.json`). `state`
85/// only carries safe-to-expose fields.
86#[must_use]
87pub fn stream_chunk(request_id: &str, node: &str, state: Value) -> Value {
88    json!({
89        "type": "stream_chunk",
90        "requestId": request_id,
91        "node": node,
92        "data": { "requestId": request_id, "node": node, "state": state },
93        "timestamp": now_ms(),
94    })
95}
96
/// Token counts and cost for a single turn, summed over every LLM call the turn
/// made.
///
/// The values are taken from the engine's terminal
/// [`AgentEvent::Completed`](smooth_operator_core::AgentEvent::Completed) and
/// reported on `eventual_response`, letting clients keep a running session cost.
/// The type is `Copy`, so it is passed by value from the runner through the
/// handler into the protocol layer.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct TurnUsage {
    /// Total USD cost of the turn, as priced by the gateway.
    pub cost_usd: f64,
    /// Total prompt (input) tokens consumed during the turn.
    pub prompt_tokens: u64,
    /// Total completion (output) tokens produced during the turn.
    pub completion_tokens: u64,
}
111
112/// `eventual_response` — the terminal event of a streaming turn. The payload is
113/// double-nested (`data.data`) per `eventual-response.schema.json`.
114///
115/// `citations` are the sources that grounded the answer. They're attached to
116/// the inner `data.data.citations` array only when non-empty — absent otherwise,
117/// keeping the event back-compatible with clients that predate citations.
118///
119/// `usage`, when `Some`, attaches the turn's token-accounting + cost as a sibling
120/// `data.data.usage` object (`{ costUsd, promptTokens, completionTokens }`) so a
121/// client can accumulate live session cost. Absent when the engine reported no
122/// usage (e.g. an offline mock turn), keeping the event back-compatible with
123/// clients that predate cost reporting.
124#[must_use]
125pub fn eventual_response(
126    request_id: &str,
127    status: i64,
128    message_id: &str,
129    response: Value,
130    needs_escalation: bool,
131    citations: &[smooth_operator::domain::Citation],
132    usage: Option<TurnUsage>,
133) -> Value {
134    let mut inner = json!({
135        "messageId": message_id,
136        "response": response,
137        "needsEscalation": needs_escalation,
138    });
139    // Optional + back-compat: only emit `citations` when the turn had sources.
140    if !citations.is_empty() {
141        inner["citations"] = serde_json::to_value(citations).unwrap_or(Value::Null);
142    }
143    // Optional + back-compat: only emit `usage` when the engine reported it.
144    if let Some(usage) = usage {
145        inner["usage"] = json!({
146            "costUsd": usage.cost_usd,
147            "promptTokens": usage.prompt_tokens,
148            "completionTokens": usage.completion_tokens,
149        });
150    }
151    json!({
152        "type": "eventual_response",
153        "requestId": request_id,
154        "status": status,
155        "data": {
156            "requestId": request_id,
157            "status": status,
158            "data": inner,
159        },
160        "timestamp": now_ms(),
161    })
162}
163
164/// `write_confirmation_required` — emitted mid-turn when the agent calls a
165/// state-mutating tool that requires explicit human approval before it runs. The
166/// turn is **parked** (the agent loop blocks inside the core
167/// `ConfirmationHook::pre_call`, corresponding to
168/// `AgentEvent::HumanInputRequired { Confirm }`) until the client replies with a
169/// `confirm_tool_action` action carrying the same `requestId` and an `approved`
170/// boolean.
171///
172/// Wire shape matches `spec/events/write-confirmation-required.schema.json`
173/// exactly (the generated TS/Go/.NET/Python clients deserialize it unmodified):
174/// the `requestId` echoes the originating `send_message`, and the prompt detail
175/// is double-nested under `data.data.{ toolId, actionDescription }`. `tool_id` is
176/// an opaque correlation handle (the runner uses the tool name — a turn parks one
177/// tool at a time); `action_description` is the human-readable prompt the client
178/// renders in its confirmation dialog.
179#[must_use]
180pub fn write_confirmation_required(
181    request_id: &str,
182    tool_id: &str,
183    action_description: &str,
184) -> Value {
185    json!({
186        "type": "write_confirmation_required",
187        "requestId": request_id,
188        "data": {
189            "requestId": request_id,
190            "data": {
191                "toolId": tool_id,
192                "actionDescription": action_description,
193            },
194        },
195        "timestamp": now_ms(),
196    })
197}
198
199/// `otp_verification_required` — emitted after a turn's auth gate refused an
200/// `end_user` tool on an unverified session and the host has an OTP service
201/// installed. Tells the client to collect a one-time code. Wire shape matches
202/// `spec/events/otp-verification-required.schema.json` (double-nested
203/// `data.data`). `available_channels` are the delivery channels the server can
204/// offer given the session's known contacts (`email` / `sms`).
205#[must_use]
206pub fn otp_verification_required(
207    request_id: &str,
208    tool_id: &str,
209    action_description: &str,
210    available_channels: &[&str],
211    auth_level: &str,
212) -> Value {
213    json!({
214        "type": "otp_verification_required",
215        "requestId": request_id,
216        "data": {
217            "requestId": request_id,
218            "data": {
219                "toolId": tool_id,
220                "actionDescription": action_description,
221                "availableChannels": available_channels,
222                "authLevel": auth_level,
223            },
224        },
225        "timestamp": now_ms(),
226    })
227}
228
229/// `otp_sent` — acknowledgement that a code was dispatched to the caller. Wire
230/// shape matches `spec/events/otp-sent.schema.json`. `masked_destination` is a
231/// partially masked address safe to display (e.g. `j***@example.com`).
232#[must_use]
233pub fn otp_sent(request_id: &str, channel: &str, masked_destination: &str) -> Value {
234    json!({
235        "type": "otp_sent",
236        "requestId": request_id,
237        "data": {
238            "requestId": request_id,
239            "data": {
240                "channel": channel,
241                "maskedDestination": masked_destination,
242            },
243        },
244        "timestamp": now_ms(),
245    })
246}
247
248/// `otp_verified` — emitted when a `verify_otp` attempt succeeds. The session is
249/// now identity-verified; the client re-sends its message to run the gated tool.
250/// Wire shape matches `spec/events/otp-verified.schema.json`.
251#[must_use]
252pub fn otp_verified(request_id: &str, message: &str) -> Value {
253    json!({
254        "type": "otp_verified",
255        "requestId": request_id,
256        "data": {
257            "requestId": request_id,
258            "data": { "message": message },
259        },
260        "timestamp": now_ms(),
261    })
262}
263
264/// `otp_invalid` — emitted when a `verify_otp` attempt is rejected. `error` is an
265/// optional machine-readable reason (`INVALID_CODE` / `MAX_ATTEMPTS` /
266/// `NOT_FOUND` / `EXPIRED`); `attempts_remaining` of 0 means the code is locked
267/// and the client must restart the flow. Wire shape matches
268/// `spec/events/otp-invalid.schema.json`.
269#[must_use]
270pub fn otp_invalid(
271    request_id: &str,
272    error: Option<&str>,
273    attempts_remaining: u32,
274    message: &str,
275) -> Value {
276    let mut inner = json!({
277        "attemptsRemaining": attempts_remaining,
278        "message": message,
279    });
280    // Optional per spec: only emit `error` when the host determined a cause.
281    if let Some(err) = error {
282        inner["error"] = json!(err);
283    }
284    json!({
285        "type": "otp_invalid",
286        "requestId": request_id,
287        "data": {
288            "requestId": request_id,
289            "data": inner,
290        },
291        "timestamp": now_ms(),
292    })
293}
294
295/// `error` — an unrecoverable error. The `{ code, message }` descriptor is
296/// duplicated at the envelope level and nested under `data.error` for wire
297/// backward-compatibility (per `error.schema.json`).
298#[must_use]
299pub fn error(request_id: Option<&str>, code: &str, message: &str) -> Value {
300    let err = json!({ "code": code, "message": message });
301    let mut data = json!({ "error": err });
302    if let Some(rid) = request_id {
303        data["requestId"] = json!(rid);
304    }
305    let mut ev = json!({
306        "type": "error",
307        "error": err,
308        "data": data,
309        "timestamp": now_ms(),
310    });
311    set_request_id(&mut ev, request_id);
312    ev
313}
314
315fn set_request_id(ev: &mut Value, request_id: Option<&str>) {
316    if let Some(rid) = request_id {
317        ev["requestId"] = json!(rid);
318    }
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pong_carries_timestamp_both_places() {
        let ev = pong(Some("r1"));
        assert_eq!(ev["type"], "pong");
        assert_eq!(ev["requestId"], "r1");
        assert!(ev["timestamp"].is_i64());
        assert_eq!(ev["timestamp"], ev["data"]["timestamp"]);
    }

    #[test]
    fn pong_omits_request_id_when_none() {
        // The requestId is optional on pong; it must be absent, not null.
        let ev = pong(None);
        assert_eq!(ev["type"], "pong");
        assert!(
            ev.get("requestId").is_none(),
            "requestId must be absent when not supplied"
        );
        assert_eq!(ev["timestamp"], ev["data"]["timestamp"]);
    }

    #[test]
    fn stream_token_mirrors_token() {
        let ev = stream_token("r1", "Hel");
        assert_eq!(ev["type"], "stream_token");
        assert_eq!(ev["token"], "Hel");
        assert_eq!(ev["data"]["token"], "Hel");
        assert_eq!(ev["data"]["requestId"], "r1");
    }

    #[test]
    fn stream_reasoning_is_distinct_type_but_mirrors_token() {
        let ev = stream_reasoning("r1", "let me think");
        // Distinct type so clients never fold it into the answer…
        assert_eq!(ev["type"], "stream_reasoning");
        // …but shaped exactly like stream_token so they render it the same way.
        assert_eq!(ev["token"], "let me think");
        assert_eq!(ev["data"]["token"], "let me think");
        assert_eq!(ev["data"]["requestId"], "r1");
    }

    #[test]
    fn stream_chunk_mirrors_node() {
        let ev = stream_chunk("r1", "knowledge_search", json!({ "rawResponse": "x" }));
        assert_eq!(ev["type"], "stream_chunk");
        assert_eq!(ev["node"], "knowledge_search");
        assert_eq!(ev["data"]["node"], "knowledge_search");
        assert_eq!(ev["data"]["state"]["rawResponse"], "x");
    }

    #[test]
    fn eventual_response_double_nests_payload() {
        let ev = eventual_response(
            "r1",
            200,
            "m1",
            json!({"responseParts": ["hi"]}),
            false,
            &[],
            None,
        );
        assert_eq!(ev["type"], "eventual_response");
        assert_eq!(ev["status"], 200);
        assert_eq!(ev["data"]["data"]["messageId"], "m1");
        assert_eq!(ev["data"]["data"]["needsEscalation"], false);
        assert_eq!(ev["data"]["data"]["response"]["responseParts"][0], "hi");
    }

    #[test]
    fn eventual_response_mirrors_request_id_and_status() {
        let ev = eventual_response("r1", 202, "m1", json!({}), true, &[], None);
        assert_eq!(ev["requestId"], "r1");
        assert_eq!(ev["data"]["requestId"], "r1");
        assert_eq!(ev["status"], 202);
        assert_eq!(ev["data"]["status"], 202);
        assert_eq!(ev["data"]["data"]["needsEscalation"], true);
        assert!(ev["timestamp"].is_i64());
    }

    #[test]
    fn eventual_response_omits_citations_when_empty() {
        // Back-compat: no `citations` key at all when the turn had no sources.
        let ev = eventual_response(
            "r1",
            200,
            "m1",
            json!({"responseParts": ["hi"]}),
            false,
            &[],
            None,
        );
        assert!(
            ev["data"]["data"].get("citations").is_none(),
            "citations must be absent when empty for back-compat"
        );
    }

    #[test]
    fn eventual_response_omits_usage_when_none() {
        // Back-compat: no `usage` key at all when the engine reported no cost.
        let ev = eventual_response(
            "r1",
            200,
            "m1",
            json!({"responseParts": ["hi"]}),
            false,
            &[],
            None,
        );
        assert!(
            ev["data"]["data"].get("usage").is_none(),
            "usage must be absent when None for back-compat"
        );
    }

    #[test]
    fn eventual_response_attaches_usage_when_present() {
        let usage = TurnUsage {
            cost_usd: 0.0123,
            prompt_tokens: 1500,
            completion_tokens: 42,
        };
        let ev = eventual_response(
            "r1",
            200,
            "m1",
            json!({"responseParts": ["hi"]}),
            false,
            &[],
            Some(usage),
        );
        let u = &ev["data"]["data"]["usage"];
        assert!(
            u.is_object(),
            "usage should be a sibling object under data.data"
        );
        let cost = u["costUsd"].as_f64().expect("costUsd is a number");
        assert!((cost - 0.0123).abs() < 1e-9, "costUsd should round-trip");
        assert_eq!(u["promptTokens"], 1500);
        assert_eq!(u["completionTokens"], 42);
    }

    #[test]
    fn eventual_response_attaches_citations_when_present() {
        let citations = vec![
            smooth_operator::domain::Citation {
                id: "doc-1".into(),
                title: "acme/handbook@main#wildlife/quokka.md".into(),
                url: Some("https://github.com/acme/handbook/blob/main/wildlife/quokka.md".into()),
                snippet: "Quokkas are the friendliest marsupial.".into(),
                score: 0.91,
            },
            smooth_operator::domain::Citation {
                id: "doc-2".into(),
                title: "policies/shipping.md".into(),
                url: None,
                snippet: "Standard shipping takes 5 to 7 business days.".into(),
                score: 0.42,
            },
        ];
        let ev = eventual_response(
            "r1",
            200,
            "m1",
            json!({"responseParts": ["hi"]}),
            false,
            &citations,
            None,
        );
        let cites = &ev["data"]["data"]["citations"];
        assert!(cites.is_array(), "citations should be an array");
        assert_eq!(cites.as_array().unwrap().len(), 2);
        // GitHub-sourced citation carries id + url + snippet on the wire shape.
        assert_eq!(cites[0]["id"], "doc-1");
        assert_eq!(
            cites[0]["url"],
            "https://github.com/acme/handbook/blob/main/wildlife/quokka.md"
        );
        assert_eq!(
            cites[0]["snippet"],
            "Quokkas are the friendliest marsupial."
        );
        // score is an f32 widened to f64 on the wire, so compare with tolerance.
        let score = cites[0]["score"].as_f64().expect("score is a number");
        assert!(
            (score - 0.91).abs() < 1e-4,
            "score should round-trip ~0.91, got {score}"
        );
        // url is omitted (not null) for a source with no web location.
        assert!(
            cites[1].get("url").is_none(),
            "a urless citation should omit `url`, not emit null"
        );
        assert_eq!(cites[1]["id"], "doc-2");
    }

    #[test]
    fn write_confirmation_required_matches_spec_shape() {
        let ev = write_confirmation_required(
            "r1",
            "delete_record",
            "Tool 'delete_record' requires confirmation. Allow?",
        );
        // Per spec/events/write-confirmation-required.schema.json.
        assert_eq!(ev["type"], "write_confirmation_required");
        assert_eq!(ev["requestId"], "r1");
        assert_eq!(ev["data"]["requestId"], "r1");
        let inner = &ev["data"]["data"];
        assert_eq!(inner["toolId"], "delete_record");
        assert!(inner["actionDescription"]
            .as_str()
            .unwrap()
            .contains("delete_record"));
        assert!(ev["timestamp"].is_i64());
    }

    #[test]
    fn otp_verification_required_matches_spec_shape() {
        let ev = otp_verification_required(
            "r1",
            "pay_invoice",
            "Verify your identity to use pay_invoice.",
            &["email"],
            "end_user",
        );
        assert_eq!(ev["type"], "otp_verification_required");
        assert_eq!(ev["requestId"], "r1");
        assert_eq!(ev["data"]["requestId"], "r1");
        let inner = &ev["data"]["data"];
        assert_eq!(inner["toolId"], "pay_invoice");
        assert_eq!(inner["authLevel"], "end_user");
        assert_eq!(inner["availableChannels"][0], "email");
        assert!(inner["actionDescription"]
            .as_str()
            .unwrap()
            .contains("pay_invoice"));
        assert!(ev["timestamp"].is_i64());
    }

    #[test]
    fn otp_sent_matches_spec_shape() {
        let ev = otp_sent("r1", "email", "j***@example.com");
        assert_eq!(ev["type"], "otp_sent");
        assert_eq!(ev["requestId"], "r1");
        assert_eq!(ev["data"]["data"]["channel"], "email");
        assert_eq!(ev["data"]["data"]["maskedDestination"], "j***@example.com");
    }

    #[test]
    fn otp_verified_matches_spec_shape() {
        let ev = otp_verified("r1", "Identity verified successfully.");
        assert_eq!(ev["type"], "otp_verified");
        assert_eq!(ev["data"]["requestId"], "r1");
        assert_eq!(
            ev["data"]["data"]["message"],
            "Identity verified successfully."
        );
    }

    #[test]
    fn otp_invalid_carries_error_and_attempts() {
        let ev = otp_invalid(
            "r1",
            Some("INVALID_CODE"),
            2,
            "Invalid code. 2 attempt(s) remaining.",
        );
        assert_eq!(ev["type"], "otp_invalid");
        let inner = &ev["data"]["data"];
        assert_eq!(inner["error"], "INVALID_CODE");
        assert_eq!(inner["attemptsRemaining"], 2);
        assert!(inner["message"].as_str().unwrap().contains("remaining"));
    }

    #[test]
    fn otp_invalid_omits_error_when_none() {
        // Optional per spec: no `error` key when the host couldn't determine a cause.
        let ev = otp_invalid("r1", None, 0, "Verification failed.");
        assert!(
            ev["data"]["data"].get("error").is_none(),
            "error must be absent when None"
        );
        assert_eq!(ev["data"]["data"]["attemptsRemaining"], 0);
    }

    #[test]
    fn error_duplicates_descriptor() {
        let ev = error(Some("r1"), "VALIDATION_ERROR", "bad");
        assert_eq!(ev["type"], "error");
        assert_eq!(ev["requestId"], "r1");
        assert_eq!(ev["error"]["code"], "VALIDATION_ERROR");
        assert_eq!(ev["data"]["error"]["message"], "bad");
        assert_eq!(ev["data"]["requestId"], "r1");
    }

    #[test]
    fn error_without_request_id_omits_it_everywhere() {
        let ev = error(None, "INTERNAL", "boom");
        assert!(ev.get("requestId").is_none(), "top-level requestId must be absent");
        assert!(
            ev["data"].get("requestId").is_none(),
            "data.requestId must be absent"
        );
        assert_eq!(ev["error"], ev["data"]["error"]);
        assert_eq!(ev["error"]["message"], "boom");
    }

    #[test]
    fn immediate_response_carries_data() {
        let ev = immediate_response(Some("r1"), 200, "ok", json!({"sessionId": "s1"}));
        assert_eq!(ev["type"], "immediate_response");
        assert_eq!(ev["status"], 200);
        assert_eq!(ev["data"]["sessionId"], "s1");
    }

    #[test]
    fn immediate_response_omits_request_id_when_none() {
        let ev = immediate_response(None, 400, "bad request", Value::Null);
        assert!(ev.get("requestId").is_none(), "requestId must be absent");
        assert_eq!(ev["message"], "bad request");
        assert_eq!(ev["status"], 400);
        assert!(ev["data"].is_null());
    }
}