use std::collections::BTreeMap;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value;
use uuid::Uuid;
use crate::error::{ExecuteErrorCode, RegisterErrorCode};
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MessageMeta {
#[serde(skip_serializing_if = "Option::is_none")]
pub traceparent: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tracestate: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub baggage: Option<String>,
#[serde(flatten)]
pub extra: BTreeMap<String, Value>,
}
pub type MessageId = Uuid;
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct CommandDef {
pub id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub schema: Option<CommandSchema>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct CommandSchema {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub request: Option<Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub response: Option<Value>,
}
impl CommandSchema {
pub fn empty() -> Self {
Self {
request: None,
response: None,
}
}
pub fn permissive() -> Self {
Self {
request: Some(serde_json::json!({
"type": "object",
"additionalProperties": true,
})),
response: Some(serde_json::json!({
"type": "object",
"additionalProperties": true,
})),
}
}
pub fn with_request(mut self, schema: Value) -> Self {
self.request = Some(schema);
self
}
pub fn with_response(mut self, schema: Value) -> Self {
self.response = Some(schema);
self
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct ExecuteError {
pub code: ExecuteErrorCode,
pub message: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct True;
impl Serialize for True {
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
s.serialize_bool(true)
}
}
impl<'de> Deserialize<'de> for True {
fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
match bool::deserialize(d)? {
true => Ok(True),
false => Err(serde::de::Error::custom("expected literal `true`")),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct False;
impl Serialize for False {
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
s.serialize_bool(false)
}
}
impl<'de> Deserialize<'de> for False {
fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
match bool::deserialize(d)? {
false => Ok(False),
true => Err(serde::de::Error::custom("expected literal `false`")),
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(untagged)]
pub enum RegisterResult {
Ok { ok: True },
Err { ok: False, error: RegisterErrorCode },
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(untagged)]
pub enum ExecuteResult {
Ok {
ok: True,
#[serde(default, skip_serializing_if = "Option::is_none")]
result: Option<Value>,
},
Err {
ok: False,
error: ExecuteError,
},
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(tag = "type")]
pub enum Message {
#[serde(rename = "register.command.request")]
RegisterCommandRequest {
id: MessageId,
#[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
meta: Option<MessageMeta>,
command: CommandDef,
},
#[serde(rename = "register.command.response")]
RegisterCommandResponse {
id: MessageId,
#[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
meta: Option<MessageMeta>,
thid: MessageId,
response: RegisterResult,
},
#[serde(rename = "list.commands.request")]
ListCommandsRequest {
id: MessageId,
#[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
meta: Option<MessageMeta>,
},
#[serde(rename = "list.commands.response")]
ListCommandsResponse {
id: MessageId,
#[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
meta: Option<MessageMeta>,
thid: MessageId,
commands: Vec<CommandDef>,
},
#[serde(rename = "execute.command.request")]
ExecuteCommandRequest {
id: MessageId,
#[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
meta: Option<MessageMeta>,
#[serde(rename = "commandId")]
command_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
request: Option<Value>,
},
#[serde(rename = "execute.command.response")]
ExecuteCommandResponse {
id: MessageId,
#[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
meta: Option<MessageMeta>,
thid: MessageId,
response: ExecuteResult,
},
#[serde(rename = "event")]
Event {
id: MessageId,
#[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
meta: Option<MessageMeta>,
#[serde(rename = "eventId")]
event_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
payload: Option<Value>,
},
}
impl Message {
pub fn id(&self) -> MessageId {
match self {
Self::RegisterCommandRequest { id, .. }
| Self::RegisterCommandResponse { id, .. }
| Self::ListCommandsRequest { id, .. }
| Self::ListCommandsResponse { id, .. }
| Self::ExecuteCommandRequest { id, .. }
| Self::ExecuteCommandResponse { id, .. }
| Self::Event { id, .. } => *id,
}
}
pub fn thid(&self) -> Option<MessageId> {
match self {
Self::RegisterCommandResponse { thid, .. }
| Self::ListCommandsResponse { thid, .. }
| Self::ExecuteCommandResponse { thid, .. } => Some(*thid),
_ => None,
}
}
pub fn meta(&self) -> Option<&MessageMeta> {
match self {
Self::RegisterCommandRequest { meta, .. }
| Self::RegisterCommandResponse { meta, .. }
| Self::ListCommandsRequest { meta, .. }
| Self::ListCommandsResponse { meta, .. }
| Self::ExecuteCommandRequest { meta, .. }
| Self::ExecuteCommandResponse { meta, .. }
| Self::Event { meta, .. } => meta.as_ref(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn uid(s: &str) -> Uuid {
Uuid::parse_str(s).unwrap()
}
fn check_roundtrip(msg: &Message, expected: Value) {
let serialized = serde_json::to_value(msg).unwrap();
assert_eq!(
serialized, expected,
"serialized form does not match fixture"
);
let parsed: Message = serde_json::from_value(expected.clone()).unwrap();
assert_eq!(&parsed, msg, "fixture did not deserialize back to input");
}
#[test]
fn register_command_request_roundtrip() {
let msg = Message::RegisterCommandRequest {
id: uid("11111111-1111-1111-1111-111111111111"),
meta: None,
command: CommandDef {
id: "math.add".to_string(),
description: Some("Adds two numbers".to_string()),
schema: Some(CommandSchema {
request: Some(json!({
"type": "object",
"properties": { "a": { "type": "number" }, "b": { "type": "number" } },
"required": ["a", "b"]
})),
response: Some(json!({ "type": "number" })),
}),
},
};
check_roundtrip(
&msg,
json!({
"type": "register.command.request",
"id": "11111111-1111-1111-1111-111111111111",
"command": {
"id": "math.add",
"description": "Adds two numbers",
"schema": {
"request": {
"type": "object",
"properties": { "a": { "type": "number" }, "b": { "type": "number" } },
"required": ["a", "b"]
},
"response": { "type": "number" }
}
}
}),
);
}
#[test]
fn register_command_response_ok_roundtrip() {
let msg = Message::RegisterCommandResponse {
id: uid("22222222-2222-2222-2222-222222222222"),
meta: None,
thid: uid("11111111-1111-1111-1111-111111111111"),
response: RegisterResult::Ok { ok: True },
};
check_roundtrip(
&msg,
json!({
"type": "register.command.response",
"id": "22222222-2222-2222-2222-222222222222",
"thid": "11111111-1111-1111-1111-111111111111",
"response": { "ok": true }
}),
);
}
#[test]
fn register_command_response_err_roundtrip() {
let msg = Message::RegisterCommandResponse {
id: uid("22222222-2222-2222-2222-222222222222"),
meta: None,
thid: uid("11111111-1111-1111-1111-111111111111"),
response: RegisterResult::Err {
ok: False,
error: RegisterErrorCode::DuplicateCommand,
},
};
check_roundtrip(
&msg,
json!({
"type": "register.command.response",
"id": "22222222-2222-2222-2222-222222222222",
"thid": "11111111-1111-1111-1111-111111111111",
"response": { "ok": false, "error": "duplicate_command" }
}),
);
}
#[test]
fn list_commands_request_roundtrip() {
let msg = Message::ListCommandsRequest {
id: uid("33333333-3333-3333-3333-333333333333"),
meta: None,
};
check_roundtrip(
&msg,
json!({
"type": "list.commands.request",
"id": "33333333-3333-3333-3333-333333333333"
}),
);
}
#[test]
fn list_commands_response_roundtrip() {
let msg = Message::ListCommandsResponse {
id: uid("44444444-4444-4444-4444-444444444444"),
meta: None,
thid: uid("33333333-3333-3333-3333-333333333333"),
commands: vec![CommandDef {
id: "user.create".to_string(),
description: None,
schema: None,
}],
};
check_roundtrip(
&msg,
json!({
"type": "list.commands.response",
"id": "44444444-4444-4444-4444-444444444444",
"thid": "33333333-3333-3333-3333-333333333333",
"commands": [{ "id": "user.create" }]
}),
);
}
#[test]
fn execute_command_request_roundtrip() {
let msg = Message::ExecuteCommandRequest {
id: uid("55555555-5555-5555-5555-555555555555"),
meta: None,
command_id: "math.add".to_string(),
request: Some(json!({ "a": 1, "b": 2 })),
};
check_roundtrip(
&msg,
json!({
"type": "execute.command.request",
"id": "55555555-5555-5555-5555-555555555555",
"commandId": "math.add",
"request": { "a": 1, "b": 2 }
}),
);
}
#[test]
fn execute_command_request_no_payload() {
let msg = Message::ExecuteCommandRequest {
id: uid("55555555-5555-5555-5555-555555555555"),
meta: None,
command_id: "system.ping".to_string(),
request: None,
};
check_roundtrip(
&msg,
json!({
"type": "execute.command.request",
"id": "55555555-5555-5555-5555-555555555555",
"commandId": "system.ping"
}),
);
}
#[test]
fn execute_command_response_ok_roundtrip() {
let msg = Message::ExecuteCommandResponse {
id: uid("66666666-6666-6666-6666-666666666666"),
meta: None,
thid: uid("55555555-5555-5555-5555-555555555555"),
response: ExecuteResult::Ok {
ok: True,
result: Some(json!(3)),
},
};
check_roundtrip(
&msg,
json!({
"type": "execute.command.response",
"id": "66666666-6666-6666-6666-666666666666",
"thid": "55555555-5555-5555-5555-555555555555",
"response": { "ok": true, "result": 3 }
}),
);
}
#[test]
fn execute_command_response_err_roundtrip() {
let msg = Message::ExecuteCommandResponse {
id: uid("66666666-6666-6666-6666-666666666666"),
meta: None,
thid: uid("55555555-5555-5555-5555-555555555555"),
response: ExecuteResult::Err {
ok: False,
error: ExecuteError {
code: ExecuteErrorCode::NotFound,
message: "no such command".to_string(),
},
},
};
check_roundtrip(
&msg,
json!({
"type": "execute.command.response",
"id": "66666666-6666-6666-6666-666666666666",
"thid": "55555555-5555-5555-5555-555555555555",
"response": {
"ok": false,
"error": { "code": "not_found", "message": "no such command" }
}
}),
);
}
#[test]
fn event_roundtrip() {
let msg = Message::Event {
id: uid("77777777-7777-7777-7777-777777777777"),
meta: None,
event_id: "user.created".to_string(),
payload: Some(json!({ "userId": "u1" })),
};
check_roundtrip(
&msg,
json!({
"type": "event",
"id": "77777777-7777-7777-7777-777777777777",
"eventId": "user.created",
"payload": { "userId": "u1" }
}),
);
}
#[test]
fn event_private_prefix_preserved() {
let msg = Message::Event {
id: uid("77777777-7777-7777-7777-777777777777"),
meta: None,
event_id: "_internal.tick".to_string(),
payload: None,
};
check_roundtrip(
&msg,
json!({
"type": "event",
"id": "77777777-7777-7777-7777-777777777777",
"eventId": "_internal.tick"
}),
);
}
#[test]
fn unknown_type_rejected() {
let bad =
json!({ "type": "not.a.real.type", "id": "00000000-0000-0000-0000-000000000000" });
assert!(serde_json::from_value::<Message>(bad).is_err());
}
#[test]
fn register_result_err_requires_ok_false() {
let bad = json!({ "ok": true, "error": "duplicate_command" });
let parsed: RegisterResult = serde_json::from_value(bad).unwrap();
assert!(matches!(parsed, RegisterResult::Ok { .. }));
}
#[test]
fn meta_absent_serializes_clean() {
let msg = Message::ListCommandsRequest {
id: uid("11111111-1111-1111-1111-111111111111"),
meta: None,
};
let json = serde_json::to_value(&msg).unwrap();
assert!(
json.get("_meta").is_none(),
"absent _meta must not serialize: {json:?}"
);
}
#[test]
fn meta_w3c_only_roundtrip() {
let msg = Message::ExecuteCommandRequest {
id: uid("55555555-5555-5555-5555-555555555555"),
meta: Some(MessageMeta {
traceparent: Some("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".into()),
tracestate: Some("vendor1=opaque".into()),
baggage: Some("userId=alice".into()),
extra: BTreeMap::new(),
}),
command_id: "math.add".into(),
request: Some(json!({ "a": 1, "b": 2 })),
};
check_roundtrip(
&msg,
json!({
"type": "execute.command.request",
"id": "55555555-5555-5555-5555-555555555555",
"_meta": {
"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
"tracestate": "vendor1=opaque",
"baggage": "userId=alice"
},
"commandId": "math.add",
"request": { "a": 1, "b": 2 }
}),
);
}
#[test]
fn meta_extras_round_trip_via_flatten() {
let incoming = json!({
"type": "event",
"id": "77777777-7777-7777-7777-777777777777",
"_meta": {
"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
"x-tenant-id": "acme",
"retry-count": 2,
"routing": { "region": "eu-west-1", "shard": 7 }
},
"eventId": "user.created",
"payload": { "userId": "u1" }
});
let parsed: Message = serde_json::from_value(incoming.clone()).unwrap();
let meta = parsed.meta().expect("_meta present");
assert_eq!(
meta.traceparent.as_deref(),
Some("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
);
assert_eq!(meta.extra.get("x-tenant-id"), Some(&json!("acme")));
assert_eq!(meta.extra.get("retry-count"), Some(&json!(2)));
assert_eq!(
meta.extra.get("routing"),
Some(&json!({ "region": "eu-west-1", "shard": 7 }))
);
let reserialized = serde_json::to_value(&parsed).unwrap();
assert_eq!(reserialized, incoming);
}
#[test]
fn meta_absent_deserializes_to_none() {
let incoming = json!({
"type": "list.commands.request",
"id": "33333333-3333-3333-3333-333333333333"
});
let parsed: Message = serde_json::from_value(incoming).unwrap();
assert!(parsed.meta().is_none());
}
}