1use std::collections::BTreeMap;
11
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use serde_json::Value;
14use uuid::Uuid;
15
16use crate::error::{ExecuteErrorCode, RegisterErrorCode};
17
18#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
29#[serde(default)]
30pub struct MessageMeta {
31 #[serde(skip_serializing_if = "Option::is_none")]
32 pub traceparent: Option<String>,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub tracestate: Option<String>,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub baggage: Option<String>,
37
38 #[serde(flatten)]
42 pub extra: BTreeMap<String, Value>,
43}
44
45pub type MessageId = Uuid;
50
51#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
55pub struct CommandDef {
56 pub id: String,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
58 pub description: Option<String>,
59 #[serde(default, skip_serializing_if = "Option::is_none")]
60 pub schema: Option<CommandSchema>,
61}
62
63#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
70pub struct CommandSchema {
71 #[serde(default, skip_serializing_if = "Option::is_none")]
72 pub request: Option<Value>,
73 #[serde(default, skip_serializing_if = "Option::is_none")]
74 pub response: Option<Value>,
75}
76
77impl CommandSchema {
78 pub fn empty() -> Self {
82 Self {
83 request: None,
84 response: None,
85 }
86 }
87
88 pub fn permissive() -> Self {
97 Self {
98 request: Some(serde_json::json!({
99 "type": "object",
100 "additionalProperties": true,
101 })),
102 response: Some(serde_json::json!({
103 "type": "object",
104 "additionalProperties": true,
105 })),
106 }
107 }
108
109 pub fn with_request(mut self, schema: Value) -> Self {
111 self.request = Some(schema);
112 self
113 }
114
115 pub fn with_response(mut self, schema: Value) -> Self {
117 self.response = Some(schema);
118 self
119 }
120}
121
122#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
124pub struct ExecuteError {
125 pub code: ExecuteErrorCode,
126 pub message: String,
127}
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
135pub struct True;
136
137impl Serialize for True {
138 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
139 s.serialize_bool(true)
140 }
141}
142
143impl<'de> Deserialize<'de> for True {
144 fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
145 match bool::deserialize(d)? {
146 true => Ok(True),
147 false => Err(serde::de::Error::custom("expected literal `true`")),
148 }
149 }
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
154pub struct False;
155
156impl Serialize for False {
157 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
158 s.serialize_bool(false)
159 }
160}
161
162impl<'de> Deserialize<'de> for False {
163 fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
164 match bool::deserialize(d)? {
165 false => Ok(False),
166 true => Err(serde::de::Error::custom("expected literal `false`")),
167 }
168 }
169}
170
171#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
173#[serde(untagged)]
174pub enum RegisterResult {
175 Ok { ok: True },
176 Err { ok: False, error: RegisterErrorCode },
177}
178
179#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
181#[serde(untagged)]
182pub enum ExecuteResult {
183 Ok {
184 ok: True,
185 #[serde(default, skip_serializing_if = "Option::is_none")]
186 result: Option<Value>,
187 },
188 Err {
189 ok: False,
190 error: ExecuteError,
191 },
192}
193
194#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
205#[serde(tag = "type")]
206pub enum Message {
207 #[serde(rename = "register.command.request")]
208 RegisterCommandRequest {
209 id: MessageId,
210 #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
211 meta: Option<MessageMeta>,
212 command: CommandDef,
213 },
214
215 #[serde(rename = "register.command.response")]
216 RegisterCommandResponse {
217 id: MessageId,
218 #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
219 meta: Option<MessageMeta>,
220 thid: MessageId,
221 response: RegisterResult,
222 },
223
224 #[serde(rename = "list.commands.request")]
225 ListCommandsRequest {
226 id: MessageId,
227 #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
228 meta: Option<MessageMeta>,
229 },
230
231 #[serde(rename = "list.commands.response")]
232 ListCommandsResponse {
233 id: MessageId,
234 #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
235 meta: Option<MessageMeta>,
236 thid: MessageId,
237 commands: Vec<CommandDef>,
238 },
239
240 #[serde(rename = "execute.command.request")]
241 ExecuteCommandRequest {
242 id: MessageId,
243 #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
244 meta: Option<MessageMeta>,
245 #[serde(rename = "commandId")]
246 command_id: String,
247 #[serde(default, skip_serializing_if = "Option::is_none")]
248 request: Option<Value>,
249 },
250
251 #[serde(rename = "execute.command.response")]
252 ExecuteCommandResponse {
253 id: MessageId,
254 #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
255 meta: Option<MessageMeta>,
256 thid: MessageId,
257 response: ExecuteResult,
258 },
259
260 #[serde(rename = "event")]
261 Event {
262 id: MessageId,
263 #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
264 meta: Option<MessageMeta>,
265 #[serde(rename = "eventId")]
266 event_id: String,
267 #[serde(default, skip_serializing_if = "Option::is_none")]
268 payload: Option<Value>,
269 },
270}
271
272impl Message {
273 pub fn id(&self) -> MessageId {
275 match self {
276 Self::RegisterCommandRequest { id, .. }
277 | Self::RegisterCommandResponse { id, .. }
278 | Self::ListCommandsRequest { id, .. }
279 | Self::ListCommandsResponse { id, .. }
280 | Self::ExecuteCommandRequest { id, .. }
281 | Self::ExecuteCommandResponse { id, .. }
282 | Self::Event { id, .. } => *id,
283 }
284 }
285
286 pub fn thid(&self) -> Option<MessageId> {
288 match self {
289 Self::RegisterCommandResponse { thid, .. }
290 | Self::ListCommandsResponse { thid, .. }
291 | Self::ExecuteCommandResponse { thid, .. } => Some(*thid),
292 _ => None,
293 }
294 }
295
296 pub fn meta(&self) -> Option<&MessageMeta> {
298 match self {
299 Self::RegisterCommandRequest { meta, .. }
300 | Self::RegisterCommandResponse { meta, .. }
301 | Self::ListCommandsRequest { meta, .. }
302 | Self::ListCommandsResponse { meta, .. }
303 | Self::ExecuteCommandRequest { meta, .. }
304 | Self::ExecuteCommandResponse { meta, .. }
305 | Self::Event { meta, .. } => meta.as_ref(),
306 }
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Parses a UUID literal used by a fixture.
    fn uid(s: &str) -> Uuid {
        Uuid::parse_str(s).unwrap()
    }

    /// Asserts that `msg` serializes to exactly `expected` and that
    /// `expected` deserializes back to a value equal to `msg`.
    ///
    /// `#[track_caller]` makes a failing assertion point at the test that
    /// called the helper rather than at the helper itself.
    #[track_caller]
    fn check_roundtrip(msg: &Message, expected: Value) {
        let serialized = serde_json::to_value(msg).unwrap();
        assert_eq!(
            serialized, expected,
            "serialized form does not match fixture"
        );
        // `expected` is not needed afterwards, so it is moved, not cloned.
        let parsed: Message = serde_json::from_value(expected).unwrap();
        assert_eq!(&parsed, msg, "fixture did not deserialize back to input");
    }

    #[test]
    fn register_command_request_roundtrip() {
        let msg = Message::RegisterCommandRequest {
            id: uid("11111111-1111-1111-1111-111111111111"),
            meta: None,
            command: CommandDef {
                id: "math.add".to_string(),
                description: Some("Adds two numbers".to_string()),
                schema: Some(CommandSchema {
                    request: Some(json!({
                        "type": "object",
                        "properties": { "a": { "type": "number" }, "b": { "type": "number" } },
                        "required": ["a", "b"]
                    })),
                    response: Some(json!({ "type": "number" })),
                }),
            },
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "register.command.request",
                "id": "11111111-1111-1111-1111-111111111111",
                "command": {
                    "id": "math.add",
                    "description": "Adds two numbers",
                    "schema": {
                        "request": {
                            "type": "object",
                            "properties": { "a": { "type": "number" }, "b": { "type": "number" } },
                            "required": ["a", "b"]
                        },
                        "response": { "type": "number" }
                    }
                }
            }),
        );
    }

    #[test]
    fn register_command_response_ok_roundtrip() {
        let msg = Message::RegisterCommandResponse {
            id: uid("22222222-2222-2222-2222-222222222222"),
            meta: None,
            thid: uid("11111111-1111-1111-1111-111111111111"),
            response: RegisterResult::Ok { ok: True },
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "register.command.response",
                "id": "22222222-2222-2222-2222-222222222222",
                "thid": "11111111-1111-1111-1111-111111111111",
                "response": { "ok": true }
            }),
        );
    }

    #[test]
    fn register_command_response_err_roundtrip() {
        let msg = Message::RegisterCommandResponse {
            id: uid("22222222-2222-2222-2222-222222222222"),
            meta: None,
            thid: uid("11111111-1111-1111-1111-111111111111"),
            response: RegisterResult::Err {
                ok: False,
                error: RegisterErrorCode::DuplicateCommand,
            },
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "register.command.response",
                "id": "22222222-2222-2222-2222-222222222222",
                "thid": "11111111-1111-1111-1111-111111111111",
                "response": { "ok": false, "error": "duplicate_command" }
            }),
        );
    }

    #[test]
    fn list_commands_request_roundtrip() {
        let msg = Message::ListCommandsRequest {
            id: uid("33333333-3333-3333-3333-333333333333"),
            meta: None,
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "list.commands.request",
                "id": "33333333-3333-3333-3333-333333333333"
            }),
        );
    }

    #[test]
    fn list_commands_response_roundtrip() {
        let msg = Message::ListCommandsResponse {
            id: uid("44444444-4444-4444-4444-444444444444"),
            meta: None,
            thid: uid("33333333-3333-3333-3333-333333333333"),
            commands: vec![CommandDef {
                id: "user.create".to_string(),
                description: None,
                schema: None,
            }],
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "list.commands.response",
                "id": "44444444-4444-4444-4444-444444444444",
                "thid": "33333333-3333-3333-3333-333333333333",
                "commands": [{ "id": "user.create" }]
            }),
        );
    }

    #[test]
    fn execute_command_request_roundtrip() {
        let msg = Message::ExecuteCommandRequest {
            id: uid("55555555-5555-5555-5555-555555555555"),
            meta: None,
            command_id: "math.add".to_string(),
            request: Some(json!({ "a": 1, "b": 2 })),
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "execute.command.request",
                "id": "55555555-5555-5555-5555-555555555555",
                "commandId": "math.add",
                "request": { "a": 1, "b": 2 }
            }),
        );
    }

    #[test]
    fn execute_command_request_no_payload() {
        let msg = Message::ExecuteCommandRequest {
            id: uid("55555555-5555-5555-5555-555555555555"),
            meta: None,
            command_id: "system.ping".to_string(),
            request: None,
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "execute.command.request",
                "id": "55555555-5555-5555-5555-555555555555",
                "commandId": "system.ping"
            }),
        );
    }

    #[test]
    fn execute_command_response_ok_roundtrip() {
        let msg = Message::ExecuteCommandResponse {
            id: uid("66666666-6666-6666-6666-666666666666"),
            meta: None,
            thid: uid("55555555-5555-5555-5555-555555555555"),
            response: ExecuteResult::Ok {
                ok: True,
                result: Some(json!(3)),
            },
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "execute.command.response",
                "id": "66666666-6666-6666-6666-666666666666",
                "thid": "55555555-5555-5555-5555-555555555555",
                "response": { "ok": true, "result": 3 }
            }),
        );
    }

    #[test]
    fn execute_command_response_err_roundtrip() {
        let msg = Message::ExecuteCommandResponse {
            id: uid("66666666-6666-6666-6666-666666666666"),
            meta: None,
            thid: uid("55555555-5555-5555-5555-555555555555"),
            response: ExecuteResult::Err {
                ok: False,
                error: ExecuteError {
                    code: ExecuteErrorCode::NotFound,
                    message: "no such command".to_string(),
                },
            },
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "execute.command.response",
                "id": "66666666-6666-6666-6666-666666666666",
                "thid": "55555555-5555-5555-5555-555555555555",
                "response": {
                    "ok": false,
                    "error": { "code": "not_found", "message": "no such command" }
                }
            }),
        );
    }

    #[test]
    fn event_roundtrip() {
        let msg = Message::Event {
            id: uid("77777777-7777-7777-7777-777777777777"),
            meta: None,
            event_id: "user.created".to_string(),
            payload: Some(json!({ "userId": "u1" })),
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "event",
                "id": "77777777-7777-7777-7777-777777777777",
                "eventId": "user.created",
                "payload": { "userId": "u1" }
            }),
        );
    }

    /// An event id with a leading underscore passes through unchanged.
    #[test]
    fn event_private_prefix_preserved() {
        let msg = Message::Event {
            id: uid("77777777-7777-7777-7777-777777777777"),
            meta: None,
            event_id: "_internal.tick".to_string(),
            payload: None,
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "event",
                "id": "77777777-7777-7777-7777-777777777777",
                "eventId": "_internal.tick"
            }),
        );
    }

    #[test]
    fn unknown_type_rejected() {
        let bad =
            json!({ "type": "not.a.real.type", "id": "00000000-0000-0000-0000-000000000000" });
        assert!(serde_json::from_value::<Message>(bad).is_err());
    }

    /// `"ok": true` can only match the `Ok` variant; the stray `"error"` key
    /// is ignored, so the value is never read as `Err`.
    #[test]
    fn register_result_err_requires_ok_false() {
        let bad = json!({ "ok": true, "error": "duplicate_command" });
        let parsed: RegisterResult = serde_json::from_value(bad).unwrap();
        assert!(matches!(parsed, RegisterResult::Ok { .. }));
    }

    #[test]
    fn meta_absent_serializes_clean() {
        let msg = Message::ListCommandsRequest {
            id: uid("11111111-1111-1111-1111-111111111111"),
            meta: None,
        };
        let json = serde_json::to_value(&msg).unwrap();
        assert!(
            json.get("_meta").is_none(),
            "absent _meta must not serialize: {json:?}"
        );
    }

    #[test]
    fn meta_w3c_only_roundtrip() {
        let msg = Message::ExecuteCommandRequest {
            id: uid("55555555-5555-5555-5555-555555555555"),
            meta: Some(MessageMeta {
                traceparent: Some("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".into()),
                tracestate: Some("vendor1=opaque".into()),
                baggage: Some("userId=alice".into()),
                extra: BTreeMap::new(),
            }),
            command_id: "math.add".into(),
            request: Some(json!({ "a": 1, "b": 2 })),
        };
        check_roundtrip(
            &msg,
            json!({
                "type": "execute.command.request",
                "id": "55555555-5555-5555-5555-555555555555",
                "_meta": {
                    "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
                    "tracestate": "vendor1=opaque",
                    "baggage": "userId=alice"
                },
                "commandId": "math.add",
                "request": { "a": 1, "b": 2 }
            }),
        );
    }

    /// Keys of `_meta` without a dedicated field land in `extra` and are
    /// written back unchanged.
    #[test]
    fn meta_extras_round_trip_via_flatten() {
        let incoming = json!({
            "type": "event",
            "id": "77777777-7777-7777-7777-777777777777",
            "_meta": {
                "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
                "x-tenant-id": "acme",
                "retry-count": 2,
                "routing": { "region": "eu-west-1", "shard": 7 }
            },
            "eventId": "user.created",
            "payload": { "userId": "u1" }
        });
        // The clone is needed here: `incoming` is compared again below.
        let parsed: Message = serde_json::from_value(incoming.clone()).unwrap();
        let meta = parsed.meta().expect("_meta present");
        assert_eq!(
            meta.traceparent.as_deref(),
            Some("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
        );
        assert_eq!(meta.extra.get("x-tenant-id"), Some(&json!("acme")));
        assert_eq!(meta.extra.get("retry-count"), Some(&json!(2)));
        assert_eq!(
            meta.extra.get("routing"),
            Some(&json!({ "region": "eu-west-1", "shard": 7 }))
        );

        let reserialized = serde_json::to_value(&parsed).unwrap();
        assert_eq!(reserialized, incoming);
    }

    #[test]
    fn meta_absent_deserializes_to_none() {
        let incoming = json!({
            "type": "list.commands.request",
            "id": "33333333-3333-3333-3333-333333333333"
        });
        let parsed: Message = serde_json::from_value(incoming).unwrap();
        assert!(parsed.meta().is_none());
    }
}