brainwires_network/network/
envelope.rs1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use uuid::Uuid;
4
5#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct MessageEnvelope {
11 pub id: Uuid,
13 pub sender: Uuid,
15 pub recipient: MessageTarget,
17 pub payload: Payload,
19 pub timestamp: DateTime<Utc>,
21 pub ttl: Option<u32>,
23 pub correlation_id: Option<Uuid>,
25 pub trace_id: Option<Uuid>,
30}
31
32impl MessageEnvelope {
33 pub fn direct(sender: Uuid, recipient: Uuid, payload: impl Into<Payload>) -> Self {
35 Self {
36 id: Uuid::new_v4(),
37 sender,
38 recipient: MessageTarget::Direct(recipient),
39 payload: payload.into(),
40 timestamp: Utc::now(),
41 ttl: None,
42 correlation_id: None,
43 trace_id: None,
44 }
45 }
46
47 pub fn broadcast(sender: Uuid, payload: impl Into<Payload>) -> Self {
49 Self {
50 id: Uuid::new_v4(),
51 sender,
52 recipient: MessageTarget::Broadcast,
53 payload: payload.into(),
54 timestamp: Utc::now(),
55 ttl: None,
56 correlation_id: None,
57 trace_id: None,
58 }
59 }
60
61 pub fn topic(sender: Uuid, topic: impl Into<String>, payload: impl Into<Payload>) -> Self {
63 Self {
64 id: Uuid::new_v4(),
65 sender,
66 recipient: MessageTarget::Topic(topic.into()),
67 payload: payload.into(),
68 timestamp: Utc::now(),
69 ttl: None,
70 correlation_id: None,
71 trace_id: None,
72 }
73 }
74
75 pub fn with_ttl(mut self, ttl: u32) -> Self {
77 self.ttl = Some(ttl);
78 self
79 }
80
81 pub fn with_correlation(mut self, correlation_id: Uuid) -> Self {
83 self.correlation_id = Some(correlation_id);
84 self
85 }
86
87 pub fn reply(&self, sender: Uuid, payload: impl Into<Payload>) -> Self {
92 Self {
93 id: Uuid::new_v4(),
94 sender,
95 recipient: MessageTarget::Direct(self.sender),
96 payload: payload.into(),
97 timestamp: Utc::now(),
98 ttl: None,
99 correlation_id: Some(self.id),
100 trace_id: self.trace_id,
101 }
102 }
103
104 pub fn with_trace(mut self, trace_id: Uuid) -> Self {
106 self.trace_id = Some(trace_id);
107 self
108 }
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
113pub enum MessageTarget {
114 Direct(Uuid),
116 Broadcast,
118 Topic(String),
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub enum Payload {
125 Json(serde_json::Value),
127 #[serde(with = "base64_bytes")]
129 Binary(Vec<u8>),
130 Text(String),
132}
133
134impl From<serde_json::Value> for Payload {
135 fn from(v: serde_json::Value) -> Self {
136 Payload::Json(v)
137 }
138}
139
140impl From<String> for Payload {
141 fn from(s: String) -> Self {
142 Payload::Text(s)
143 }
144}
145
146impl From<&str> for Payload {
147 fn from(s: &str) -> Self {
148 Payload::Text(s.to_string())
149 }
150}
151
152impl From<Vec<u8>> for Payload {
153 fn from(b: Vec<u8>) -> Self {
154 Payload::Binary(b)
155 }
156}
157
158mod base64_bytes {
160 use base64::Engine;
161 use base64::engine::general_purpose::STANDARD;
162 use serde::{Deserialize, Deserializer, Serialize, Serializer};
163
164 pub fn serialize<S: Serializer>(bytes: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
165 STANDARD.encode(bytes).serialize(s)
166 }
167
168 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
169 let encoded = String::deserialize(d)?;
170 STANDARD.decode(&encoded).map_err(serde::de::Error::custom)
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use super::*;
177
178 #[test]
179 fn direct_envelope_fields() {
180 let sender = Uuid::new_v4();
181 let recipient = Uuid::new_v4();
182 let env = MessageEnvelope::direct(sender, recipient, "hello");
183
184 assert_eq!(env.sender, sender);
185 assert_eq!(env.recipient, MessageTarget::Direct(recipient));
186 assert!(env.ttl.is_none());
187 assert!(env.correlation_id.is_none());
188 }
189
190 #[test]
191 fn broadcast_envelope() {
192 let sender = Uuid::new_v4();
193 let env = MessageEnvelope::broadcast(sender, "ping");
194 assert_eq!(env.recipient, MessageTarget::Broadcast);
195 }
196
197 #[test]
198 fn topic_envelope() {
199 let sender = Uuid::new_v4();
200 let env = MessageEnvelope::topic(sender, "status-updates", "agent online");
201 assert_eq!(env.recipient, MessageTarget::Topic("status-updates".into()));
202 }
203
204 #[test]
205 fn reply_sets_correlation() {
206 let sender_a = Uuid::new_v4();
207 let sender_b = Uuid::new_v4();
208 let original = MessageEnvelope::direct(sender_a, sender_b, "request");
209 let reply = original.reply(sender_b, "response");
210
211 assert_eq!(reply.sender, sender_b);
212 assert_eq!(reply.recipient, MessageTarget::Direct(sender_a));
213 assert_eq!(reply.correlation_id, Some(original.id));
214 }
215
216 #[test]
217 fn with_ttl() {
218 let env = MessageEnvelope::broadcast(Uuid::new_v4(), "test").with_ttl(5);
219 assert_eq!(env.ttl, Some(5));
220 }
221
222 #[test]
223 fn envelope_serde_roundtrip() {
224 let env = MessageEnvelope::direct(Uuid::new_v4(), Uuid::new_v4(), "hello");
225 let json = serde_json::to_string(&env).unwrap();
226 let deserialized: MessageEnvelope = serde_json::from_str(&json).unwrap();
227 assert_eq!(deserialized.id, env.id);
228 assert_eq!(deserialized.sender, env.sender);
229 }
230
231 #[test]
232 fn binary_payload_serde_roundtrip() {
233 let env =
234 MessageEnvelope::direct(Uuid::new_v4(), Uuid::new_v4(), vec![0xDE, 0xAD, 0xBE, 0xEF]);
235 let json = serde_json::to_string(&env).unwrap();
236 let deserialized: MessageEnvelope = serde_json::from_str(&json).unwrap();
237 match deserialized.payload {
238 Payload::Binary(bytes) => assert_eq!(bytes, vec![0xDE, 0xAD, 0xBE, 0xEF]),
239 _ => panic!("expected Binary payload"),
240 }
241 }
242
243 #[test]
244 fn json_payload_from_value() {
245 let payload: Payload = serde_json::json!({"key": "value"}).into();
246 match payload {
247 Payload::Json(v) => assert_eq!(v["key"], "value"),
248 _ => panic!("expected Json payload"),
249 }
250 }
251}