Skip to main content

coralstack_cmd_ipc/
message.rs

1//! Wire protocol for the command registry.
2//!
3//! This module defines the seven [`Message`] variants exchanged between
4//! registries. The JSON representation is byte-identical to the TypeScript
5//! implementation's `CommandMessage` union (see
6//! `packages/cmd-ipc/src/registry/command-message-schemas.ts`) so that a
7//! Rust process and a Node.js process can talk to each other over any
8//! channel that carries JSON.
9
10use std::collections::BTreeMap;
11
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use serde_json::Value;
14use uuid::Uuid;
15
16use crate::error::{ExecuteErrorCode, RegisterErrorCode};
17
18/// Transport-level envelope for cross-cutting context that travels
19/// alongside (not inside) a message's business payload.
20///
21/// Modelled on HTTP headers: transport concerns only. Do NOT put
22/// business data, secrets, or domain identifiers here.
23///
24/// Reserved keys in v1 are the W3C Trace Context headers
25/// (`traceparent`, `tracestate`, `baggage`); wire-compatible with every
26/// OpenTelemetry SDK. All other keys land in `extra` via
27/// `#[serde(flatten)]` and round-trip untouched.
28#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
29#[serde(default)]
30pub struct MessageMeta {
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub traceparent: Option<String>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub tracestate: Option<String>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub baggage: Option<String>,
37
38    /// Free-form extension fields. Applications using this library
39    /// serialize their own keys here and are responsible for their own
40    /// typing.
41    #[serde(flatten)]
42    pub extra: BTreeMap<String, Value>,
43}
44
45/// A unique message identifier.
46///
47/// Matches the TypeScript type alias `MessageID = string` — UUIDs are
48/// serialized as hyphenated strings.
49pub type MessageId = Uuid;
50
51/// Description of a command as advertised across a channel.
52///
53/// Mirrors `CommandDefinitionBaseSchema` in the TypeScript protocol.
54#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
55pub struct CommandDef {
56    pub id: String,
57    #[serde(default, skip_serializing_if = "Option::is_none")]
58    pub description: Option<String>,
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub schema: Option<CommandSchema>,
61}
62
63/// A request/response JSON-Schema pair attached to a [`CommandDef`].
64///
65/// Both slots are optional; absent means "no payload expected" (the
66/// TypeScript equivalent is `v.void()`). When populated the value is a
67/// raw JSON Schema; the registry normalizes incoming schemas so wire
68/// representations remain language-agnostic.
69#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
70pub struct CommandSchema {
71    #[serde(default, skip_serializing_if = "Option::is_none")]
72    pub request: Option<Value>,
73    #[serde(default, skip_serializing_if = "Option::is_none")]
74    pub response: Option<Value>,
75}
76
77impl CommandSchema {
78    /// Empty schema — both slots unset. Equivalent to advertising a
79    /// command with `request: None, response: None` (the void/void
80    /// shape). Use when the command takes no payload and returns none.
81    pub fn empty() -> Self {
82        Self {
83            request: None,
84            response: None,
85        }
86    }
87
88    /// Maximally permissive schema — both request and response declared
89    /// as open objects with `additionalProperties: true`. Useful for
90    /// runtime plugins whose payload shape isn't known at advertise
91    /// time (e.g. Flow's QuickJS `SourceChannel` when a plugin exports
92    /// an `any → any` function).
93    ///
94    /// Prefer a real schema when you can produce one — consumers use it
95    /// for validation, MCP tool schemas, and generated TS clients.
96    pub fn permissive() -> Self {
97        Self {
98            request: Some(serde_json::json!({
99                "type": "object",
100                "additionalProperties": true,
101            })),
102            response: Some(serde_json::json!({
103                "type": "object",
104                "additionalProperties": true,
105            })),
106        }
107    }
108
109    /// Builder: set only the request schema (leaves response unset).
110    pub fn with_request(mut self, schema: Value) -> Self {
111        self.request = Some(schema);
112        self
113    }
114
115    /// Builder: set only the response schema (leaves request unset).
116    pub fn with_response(mut self, schema: Value) -> Self {
117        self.response = Some(schema);
118        self
119    }
120}
121
122/// Body of an execute-command error response.
123#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
124pub struct ExecuteError {
125    pub code: ExecuteErrorCode,
126    pub message: String,
127}
128
129/// Zero-sized marker that (de)serializes as the JSON literal `true`.
130///
131/// Used to distinguish the `ok: true` / `ok: false` variants of
132/// [`RegisterResult`] and [`ExecuteResult`] while keeping them as proper
133/// Rust enums rather than structs with nullable fields.
134#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
135pub struct True;
136
137impl Serialize for True {
138    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
139        s.serialize_bool(true)
140    }
141}
142
143impl<'de> Deserialize<'de> for True {
144    fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
145        match bool::deserialize(d)? {
146            true => Ok(True),
147            false => Err(serde::de::Error::custom("expected literal `true`")),
148        }
149    }
150}
151
152/// Zero-sized marker that (de)serializes as the JSON literal `false`.
153#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
154pub struct False;
155
156impl Serialize for False {
157    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
158        s.serialize_bool(false)
159    }
160}
161
162impl<'de> Deserialize<'de> for False {
163    fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
164        match bool::deserialize(d)? {
165            false => Ok(False),
166            true => Err(serde::de::Error::custom("expected literal `false`")),
167        }
168    }
169}
170
171/// Body of a `register.command.response` message.
172#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
173#[serde(untagged)]
174pub enum RegisterResult {
175    Ok { ok: True },
176    Err { ok: False, error: RegisterErrorCode },
177}
178
179/// Body of an `execute.command.response` message.
180#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
181#[serde(untagged)]
182pub enum ExecuteResult {
183    Ok {
184        ok: True,
185        #[serde(default, skip_serializing_if = "Option::is_none")]
186        result: Option<Value>,
187    },
188    Err {
189        ok: False,
190        error: ExecuteError,
191    },
192}
193
194/// Discriminated union of every message type carried on a channel.
195///
196/// The `type` tag strings are the same dotted identifiers used by the
197/// TypeScript `MessageType` enum.
198//
199// Every variant carries the same base fields (`id` and the optional
200// transport-level `_meta` envelope) plus its own variant-specific data.
201// New cross-cutting message fields should be added in `MessageMeta` (or as
202// new common fields here on every variant) rather than baked into each
203// variant's payload.
204#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
205#[serde(tag = "type")]
206pub enum Message {
207    #[serde(rename = "register.command.request")]
208    RegisterCommandRequest {
209        id: MessageId,
210        #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
211        meta: Option<MessageMeta>,
212        command: CommandDef,
213    },
214
215    #[serde(rename = "register.command.response")]
216    RegisterCommandResponse {
217        id: MessageId,
218        #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
219        meta: Option<MessageMeta>,
220        thid: MessageId,
221        response: RegisterResult,
222    },
223
224    #[serde(rename = "list.commands.request")]
225    ListCommandsRequest {
226        id: MessageId,
227        #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
228        meta: Option<MessageMeta>,
229    },
230
231    #[serde(rename = "list.commands.response")]
232    ListCommandsResponse {
233        id: MessageId,
234        #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
235        meta: Option<MessageMeta>,
236        thid: MessageId,
237        commands: Vec<CommandDef>,
238    },
239
240    #[serde(rename = "execute.command.request")]
241    ExecuteCommandRequest {
242        id: MessageId,
243        #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
244        meta: Option<MessageMeta>,
245        #[serde(rename = "commandId")]
246        command_id: String,
247        #[serde(default, skip_serializing_if = "Option::is_none")]
248        request: Option<Value>,
249    },
250
251    #[serde(rename = "execute.command.response")]
252    ExecuteCommandResponse {
253        id: MessageId,
254        #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
255        meta: Option<MessageMeta>,
256        thid: MessageId,
257        response: ExecuteResult,
258    },
259
260    #[serde(rename = "event")]
261    Event {
262        id: MessageId,
263        #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
264        meta: Option<MessageMeta>,
265        #[serde(rename = "eventId")]
266        event_id: String,
267        #[serde(default, skip_serializing_if = "Option::is_none")]
268        payload: Option<Value>,
269    },
270}
271
272impl Message {
273    /// Returns the message's `id` field.
274    pub fn id(&self) -> MessageId {
275        match self {
276            Self::RegisterCommandRequest { id, .. }
277            | Self::RegisterCommandResponse { id, .. }
278            | Self::ListCommandsRequest { id, .. }
279            | Self::ListCommandsResponse { id, .. }
280            | Self::ExecuteCommandRequest { id, .. }
281            | Self::ExecuteCommandResponse { id, .. }
282            | Self::Event { id, .. } => *id,
283        }
284    }
285
286    /// Returns the `thid` field for messages that carry one.
287    pub fn thid(&self) -> Option<MessageId> {
288        match self {
289            Self::RegisterCommandResponse { thid, .. }
290            | Self::ListCommandsResponse { thid, .. }
291            | Self::ExecuteCommandResponse { thid, .. } => Some(*thid),
292            _ => None,
293        }
294    }
295
296    /// Returns the optional transport-level `_meta` envelope.
297    pub fn meta(&self) -> Option<&MessageMeta> {
298        match self {
299            Self::RegisterCommandRequest { meta, .. }
300            | Self::RegisterCommandResponse { meta, .. }
301            | Self::ListCommandsRequest { meta, .. }
302            | Self::ListCommandsResponse { meta, .. }
303            | Self::ExecuteCommandRequest { meta, .. }
304            | Self::ExecuteCommandResponse { meta, .. }
305            | Self::Event { meta, .. } => meta.as_ref(),
306        }
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313    use serde_json::json;
314
315    fn uid(s: &str) -> Uuid {
316        Uuid::parse_str(s).unwrap()
317    }
318
319    /// Every fixture below is paired JSON that the TypeScript library
320    /// produces (or accepts). Round-tripping through `Message` must leave
321    /// the semantic JSON tree unchanged.
322    fn check_roundtrip(msg: &Message, expected: Value) {
323        let serialized = serde_json::to_value(msg).unwrap();
324        assert_eq!(
325            serialized, expected,
326            "serialized form does not match fixture"
327        );
328        let parsed: Message = serde_json::from_value(expected.clone()).unwrap();
329        assert_eq!(&parsed, msg, "fixture did not deserialize back to input");
330    }
331
332    #[test]
333    fn register_command_request_roundtrip() {
334        let msg = Message::RegisterCommandRequest {
335            id: uid("11111111-1111-1111-1111-111111111111"),
336            meta: None,
337            command: CommandDef {
338                id: "math.add".to_string(),
339                description: Some("Adds two numbers".to_string()),
340                schema: Some(CommandSchema {
341                    request: Some(json!({
342                        "type": "object",
343                        "properties": { "a": { "type": "number" }, "b": { "type": "number" } },
344                        "required": ["a", "b"]
345                    })),
346                    response: Some(json!({ "type": "number" })),
347                }),
348            },
349        };
350        check_roundtrip(
351            &msg,
352            json!({
353                "type": "register.command.request",
354                "id": "11111111-1111-1111-1111-111111111111",
355                "command": {
356                    "id": "math.add",
357                    "description": "Adds two numbers",
358                    "schema": {
359                        "request": {
360                            "type": "object",
361                            "properties": { "a": { "type": "number" }, "b": { "type": "number" } },
362                            "required": ["a", "b"]
363                        },
364                        "response": { "type": "number" }
365                    }
366                }
367            }),
368        );
369    }
370
371    #[test]
372    fn register_command_response_ok_roundtrip() {
373        let msg = Message::RegisterCommandResponse {
374            id: uid("22222222-2222-2222-2222-222222222222"),
375            meta: None,
376            thid: uid("11111111-1111-1111-1111-111111111111"),
377            response: RegisterResult::Ok { ok: True },
378        };
379        check_roundtrip(
380            &msg,
381            json!({
382                "type": "register.command.response",
383                "id": "22222222-2222-2222-2222-222222222222",
384                "thid": "11111111-1111-1111-1111-111111111111",
385                "response": { "ok": true }
386            }),
387        );
388    }
389
390    #[test]
391    fn register_command_response_err_roundtrip() {
392        let msg = Message::RegisterCommandResponse {
393            id: uid("22222222-2222-2222-2222-222222222222"),
394            meta: None,
395            thid: uid("11111111-1111-1111-1111-111111111111"),
396            response: RegisterResult::Err {
397                ok: False,
398                error: RegisterErrorCode::DuplicateCommand,
399            },
400        };
401        check_roundtrip(
402            &msg,
403            json!({
404                "type": "register.command.response",
405                "id": "22222222-2222-2222-2222-222222222222",
406                "thid": "11111111-1111-1111-1111-111111111111",
407                "response": { "ok": false, "error": "duplicate_command" }
408            }),
409        );
410    }
411
412    #[test]
413    fn list_commands_request_roundtrip() {
414        let msg = Message::ListCommandsRequest {
415            id: uid("33333333-3333-3333-3333-333333333333"),
416            meta: None,
417        };
418        check_roundtrip(
419            &msg,
420            json!({
421                "type": "list.commands.request",
422                "id": "33333333-3333-3333-3333-333333333333"
423            }),
424        );
425    }
426
427    #[test]
428    fn list_commands_response_roundtrip() {
429        let msg = Message::ListCommandsResponse {
430            id: uid("44444444-4444-4444-4444-444444444444"),
431            meta: None,
432            thid: uid("33333333-3333-3333-3333-333333333333"),
433            commands: vec![CommandDef {
434                id: "user.create".to_string(),
435                description: None,
436                schema: None,
437            }],
438        };
439        check_roundtrip(
440            &msg,
441            json!({
442                "type": "list.commands.response",
443                "id": "44444444-4444-4444-4444-444444444444",
444                "thid": "33333333-3333-3333-3333-333333333333",
445                "commands": [{ "id": "user.create" }]
446            }),
447        );
448    }
449
450    #[test]
451    fn execute_command_request_roundtrip() {
452        let msg = Message::ExecuteCommandRequest {
453            id: uid("55555555-5555-5555-5555-555555555555"),
454            meta: None,
455            command_id: "math.add".to_string(),
456            request: Some(json!({ "a": 1, "b": 2 })),
457        };
458        check_roundtrip(
459            &msg,
460            json!({
461                "type": "execute.command.request",
462                "id": "55555555-5555-5555-5555-555555555555",
463                "commandId": "math.add",
464                "request": { "a": 1, "b": 2 }
465            }),
466        );
467    }
468
469    #[test]
470    fn execute_command_request_no_payload() {
471        let msg = Message::ExecuteCommandRequest {
472            id: uid("55555555-5555-5555-5555-555555555555"),
473            meta: None,
474            command_id: "system.ping".to_string(),
475            request: None,
476        };
477        check_roundtrip(
478            &msg,
479            json!({
480                "type": "execute.command.request",
481                "id": "55555555-5555-5555-5555-555555555555",
482                "commandId": "system.ping"
483            }),
484        );
485    }
486
487    #[test]
488    fn execute_command_response_ok_roundtrip() {
489        let msg = Message::ExecuteCommandResponse {
490            id: uid("66666666-6666-6666-6666-666666666666"),
491            meta: None,
492            thid: uid("55555555-5555-5555-5555-555555555555"),
493            response: ExecuteResult::Ok {
494                ok: True,
495                result: Some(json!(3)),
496            },
497        };
498        check_roundtrip(
499            &msg,
500            json!({
501                "type": "execute.command.response",
502                "id": "66666666-6666-6666-6666-666666666666",
503                "thid": "55555555-5555-5555-5555-555555555555",
504                "response": { "ok": true, "result": 3 }
505            }),
506        );
507    }
508
509    #[test]
510    fn execute_command_response_err_roundtrip() {
511        let msg = Message::ExecuteCommandResponse {
512            id: uid("66666666-6666-6666-6666-666666666666"),
513            meta: None,
514            thid: uid("55555555-5555-5555-5555-555555555555"),
515            response: ExecuteResult::Err {
516                ok: False,
517                error: ExecuteError {
518                    code: ExecuteErrorCode::NotFound,
519                    message: "no such command".to_string(),
520                },
521            },
522        };
523        check_roundtrip(
524            &msg,
525            json!({
526                "type": "execute.command.response",
527                "id": "66666666-6666-6666-6666-666666666666",
528                "thid": "55555555-5555-5555-5555-555555555555",
529                "response": {
530                    "ok": false,
531                    "error": { "code": "not_found", "message": "no such command" }
532                }
533            }),
534        );
535    }
536
537    #[test]
538    fn event_roundtrip() {
539        let msg = Message::Event {
540            id: uid("77777777-7777-7777-7777-777777777777"),
541            meta: None,
542            event_id: "user.created".to_string(),
543            payload: Some(json!({ "userId": "u1" })),
544        };
545        check_roundtrip(
546            &msg,
547            json!({
548                "type": "event",
549                "id": "77777777-7777-7777-7777-777777777777",
550                "eventId": "user.created",
551                "payload": { "userId": "u1" }
552            }),
553        );
554    }
555
556    #[test]
557    fn event_private_prefix_preserved() {
558        // Private events (leading underscore) are still valid wire-level;
559        // privacy is enforced by the registry, not the message schema.
560        let msg = Message::Event {
561            id: uid("77777777-7777-7777-7777-777777777777"),
562            meta: None,
563            event_id: "_internal.tick".to_string(),
564            payload: None,
565        };
566        check_roundtrip(
567            &msg,
568            json!({
569                "type": "event",
570                "id": "77777777-7777-7777-7777-777777777777",
571                "eventId": "_internal.tick"
572            }),
573        );
574    }
575
576    #[test]
577    fn unknown_type_rejected() {
578        let bad =
579            json!({ "type": "not.a.real.type", "id": "00000000-0000-0000-0000-000000000000" });
580        assert!(serde_json::from_value::<Message>(bad).is_err());
581    }
582
583    #[test]
584    fn register_result_err_requires_ok_false() {
585        // `ok: true` with an `error` field must not parse as Err.
586        let bad = json!({ "ok": true, "error": "duplicate_command" });
587        let parsed: RegisterResult = serde_json::from_value(bad).unwrap();
588        assert!(matches!(parsed, RegisterResult::Ok { .. }));
589    }
590
591    #[test]
592    fn meta_absent_serializes_clean() {
593        // Missing _meta MUST NOT appear in the serialized form.
594        let msg = Message::ListCommandsRequest {
595            id: uid("11111111-1111-1111-1111-111111111111"),
596            meta: None,
597        };
598        let json = serde_json::to_value(&msg).unwrap();
599        assert!(
600            json.get("_meta").is_none(),
601            "absent _meta must not serialize: {json:?}"
602        );
603    }
604
605    #[test]
606    fn meta_w3c_only_roundtrip() {
607        let msg = Message::ExecuteCommandRequest {
608            id: uid("55555555-5555-5555-5555-555555555555"),
609            meta: Some(MessageMeta {
610                traceparent: Some("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".into()),
611                tracestate: Some("vendor1=opaque".into()),
612                baggage: Some("userId=alice".into()),
613                extra: BTreeMap::new(),
614            }),
615            command_id: "math.add".into(),
616            request: Some(json!({ "a": 1, "b": 2 })),
617        };
618        check_roundtrip(
619            &msg,
620            json!({
621                "type": "execute.command.request",
622                "id": "55555555-5555-5555-5555-555555555555",
623                "_meta": {
624                    "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
625                    "tracestate": "vendor1=opaque",
626                    "baggage": "userId=alice"
627                },
628                "commandId": "math.add",
629                "request": { "a": 1, "b": 2 }
630            }),
631        );
632    }
633
634    #[test]
635    fn meta_extras_round_trip_via_flatten() {
636        // Free-form keys land in `extra` and re-serialize identically.
637        let incoming = json!({
638            "type": "event",
639            "id": "77777777-7777-7777-7777-777777777777",
640            "_meta": {
641                "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
642                "x-tenant-id": "acme",
643                "retry-count": 2,
644                "routing": { "region": "eu-west-1", "shard": 7 }
645            },
646            "eventId": "user.created",
647            "payload": { "userId": "u1" }
648        });
649        let parsed: Message = serde_json::from_value(incoming.clone()).unwrap();
650        let meta = parsed.meta().expect("_meta present");
651        assert_eq!(
652            meta.traceparent.as_deref(),
653            Some("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
654        );
655        assert_eq!(meta.extra.get("x-tenant-id"), Some(&json!("acme")));
656        assert_eq!(meta.extra.get("retry-count"), Some(&json!(2)));
657        assert_eq!(
658            meta.extra.get("routing"),
659            Some(&json!({ "region": "eu-west-1", "shard": 7 }))
660        );
661
662        // Re-serialize and compare semantic JSON equality with the input.
663        let reserialized = serde_json::to_value(&parsed).unwrap();
664        assert_eq!(reserialized, incoming);
665    }
666
667    #[test]
668    fn meta_absent_deserializes_to_none() {
669        let incoming = json!({
670            "type": "list.commands.request",
671            "id": "33333333-3333-3333-3333-333333333333"
672        });
673        let parsed: Message = serde_json::from_value(incoming).unwrap();
674        assert!(parsed.meta().is_none());
675    }
676}