1use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::fmt;
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
16#[serde(tag = "type", content = "data")]
17pub enum MessageContent {
18 Binary(Vec<u8>),
20
21 Text(String),
23
24 Json(serde_json::Value),
26
27 AgentRequest {
29 method: String,
30 params: serde_json::Value,
31 },
32
33 AgentResponse { result: serde_json::Value },
35
36 ToolCall {
38 name: String,
39 arguments: serde_json::Value,
40 },
41
42 ToolResult {
44 output: String,
45 error: Option<String>,
46 },
47
48 Empty,
50}
51
52impl MessageContent {
53 pub fn type_name(&self) -> &'static str {
55 match self {
56 MessageContent::Binary(_) => "binary",
57 MessageContent::Text(_) => "text",
58 MessageContent::Json(_) => "json",
59 MessageContent::AgentRequest { .. } => "agent_request",
60 MessageContent::AgentResponse { .. } => "agent_response",
61 MessageContent::ToolCall { .. } => "tool_call",
62 MessageContent::ToolResult { .. } => "tool_result",
63 MessageContent::Empty => "empty",
64 }
65 }
66
67 pub fn is_empty(&self) -> bool {
69 matches!(self, MessageContent::Empty)
70 }
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
77#[serde(tag = "type", content = "target")]
78pub enum MessageTarget {
79 Agent(String),
81
82 Broadcast,
84
85 Topic(String),
87
88 Node {
90 node_id: String,
91 agent: Option<String>,
92 },
93}
94
95impl MessageTarget {
96 pub fn agent(name: impl Into<String>) -> Self {
98 MessageTarget::Agent(name.into())
99 }
100
101 pub fn broadcast() -> Self {
103 MessageTarget::Broadcast
104 }
105
106 pub fn topic(name: impl Into<String>) -> Self {
108 MessageTarget::Topic(name.into())
109 }
110}
111
112impl fmt::Display for MessageTarget {
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 match self {
115 MessageTarget::Agent(name) => write!(f, "agent:{}", name),
116 MessageTarget::Broadcast => write!(f, "broadcast"),
117 MessageTarget::Topic(topic) => write!(f, "topic:{}", topic),
118 MessageTarget::Node { node_id, agent } => {
119 if let Some(agent_name) = agent {
120 write!(f, "node:{}@{}", agent_name, node_id)
121 } else {
122 write!(f, "node:{}", node_id)
123 }
124 }
125 }
126 }
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct LegacyMessage {
135 pub id: String,
137 pub topic: String,
139 pub payload: Vec<u8>,
141 pub sender: String,
143 pub metadata: HashMap<String, String>,
145 pub created_at: i64,
147}
148
149impl LegacyMessage {
150 pub fn new(topic: impl Into<String>, payload: Vec<u8>, sender: impl Into<String>) -> Self {
152 Self {
153 id: Uuid::new_v4().to_string(),
154 topic: topic.into(),
155 payload,
156 sender: sender.into(),
157 metadata: HashMap::new(),
158 created_at: chrono::Utc::now().timestamp_micros(),
159 }
160 }
161
162 pub fn correlation_id(&self) -> Option<String> {
164 self.metadata.get("correlation_id").cloned()
165 }
166
167 pub fn set_correlation_id(&mut self, id: &str) {
169 self.metadata
170 .insert("correlation_id".to_string(), id.to_string());
171 }
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct Message {
198 pub id: String,
200
201 pub version: u8,
203
204 pub from: String,
206
207 pub to: MessageTarget,
209
210 pub content: MessageContent,
212
213 pub correlation_id: Option<String>,
215
216 pub topic: Option<String>,
218
219 pub metadata: HashMap<String, String>,
221
222 pub created_at: i64,
224}
225
226impl Message {
227 pub fn builder(from: impl Into<String>) -> MessageBuilder {
239 MessageBuilder::new(from)
240 }
241
242 pub fn text(from: impl Into<String>, content: impl Into<String>) -> MessageBuilder {
244 MessageBuilder::new(from).content(MessageContent::Text(content.into()))
245 }
246
247 pub fn json(from: impl Into<String>, data: serde_json::Value) -> MessageBuilder {
249 MessageBuilder::new(from).content(MessageContent::Json(data))
250 }
251
252 pub fn binary(from: impl Into<String>, data: Vec<u8>) -> MessageBuilder {
254 MessageBuilder::new(from).content(MessageContent::Binary(data))
255 }
256
257 pub fn tool_call(
259 from: impl Into<String>,
260 name: impl Into<String>,
261 arguments: serde_json::Value,
262 ) -> MessageBuilder {
263 MessageBuilder::new(from).content(MessageContent::ToolCall {
264 name: name.into(),
265 arguments,
266 })
267 }
268
269 pub fn tool_result(
271 from: impl Into<String>,
272 output: impl Into<String>,
273 error: Option<String>,
274 ) -> MessageBuilder {
275 MessageBuilder::new(from).content(MessageContent::ToolResult {
276 output: output.into(),
277 error,
278 })
279 }
280
281 pub fn agent_request(
283 from: impl Into<String>,
284 method: impl Into<String>,
285 params: serde_json::Value,
286 ) -> MessageBuilder {
287 MessageBuilder::new(from).content(MessageContent::AgentRequest {
288 method: method.into(),
289 params,
290 })
291 }
292
293 pub fn agent_response(from: impl Into<String>, result: serde_json::Value) -> MessageBuilder {
295 MessageBuilder::new(from).content(MessageContent::AgentResponse { result })
296 }
297
298 pub fn empty(from: impl Into<String>) -> MessageBuilder {
300 MessageBuilder::new(from).content(MessageContent::Empty)
301 }
302}
303
304pub struct MessageBuilder {
306 id: String,
307 version: u8,
308 from: String,
309 to: Option<MessageTarget>,
310 content: Option<MessageContent>,
311 correlation_id: Option<String>,
312 topic: Option<String>,
313 metadata: HashMap<String, String>,
314}
315
316impl MessageBuilder {
317 fn new(from: impl Into<String>) -> Self {
318 Self {
319 id: Uuid::new_v4().to_string(),
320 version: 1,
321 from: from.into(),
322 to: None,
323 content: None,
324 correlation_id: None,
325 topic: None,
326 metadata: HashMap::new(),
327 }
328 }
329
330 pub fn id(mut self, id: impl Into<String>) -> Self {
332 self.id = id.into();
333 self
334 }
335
336 pub fn to(mut self, target: MessageTarget) -> Self {
338 self.to = Some(target);
339 self
340 }
341
342 pub fn content(mut self, content: MessageContent) -> Self {
344 self.content = Some(content);
345 self
346 }
347
348 pub fn correlation_id(mut self, id: impl Into<String>) -> Self {
350 self.correlation_id = Some(id.into());
351 self
352 }
353
354 pub fn topic(mut self, topic: impl Into<String>) -> Self {
356 self.topic = Some(topic.into());
357 self
358 }
359
360 pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
362 self.metadata.insert(key.into(), value.into());
363 self
364 }
365
366 pub fn build(self) -> Message {
371 Message {
372 id: self.id,
373 version: self.version,
374 from: self.from,
375 to: self.to.expect("Message target must be set"),
376 content: self.content.expect("Message content must be set"),
377 correlation_id: self.correlation_id,
378 topic: self.topic,
379 metadata: self.metadata,
380 created_at: chrono::Utc::now().timestamp_micros(),
381 }
382 }
383}
384
385impl From<LegacyMessage> for Message {
388 fn from(legacy: LegacyMessage) -> Self {
390 let content = if let Some(content_type) = legacy.metadata.get("content_type") {
392 match content_type.as_str() {
393 "text" => String::from_utf8(legacy.payload.clone())
394 .map(MessageContent::Text)
395 .unwrap_or_else(|_| MessageContent::Binary(legacy.payload.clone())),
396 "json" => serde_json::from_slice(&legacy.payload)
397 .map(MessageContent::Json)
398 .unwrap_or_else(|_| MessageContent::Binary(legacy.payload.clone())),
399 _ => MessageContent::Binary(legacy.payload.clone()),
400 }
401 } else {
402 String::from_utf8(legacy.payload.clone())
404 .map(MessageContent::Text)
405 .unwrap_or_else(|_| MessageContent::Binary(legacy.payload.clone()))
406 };
407
408 let to = legacy
410 .metadata
411 .get("target")
412 .map(|t| MessageTarget::Agent(t.clone()))
413 .unwrap_or(MessageTarget::Broadcast);
414
415 Message {
416 id: legacy.id,
417 version: 1,
418 from: legacy.sender,
419 to,
420 content,
421 correlation_id: legacy.metadata.get("correlation_id").cloned(),
422 topic: Some(legacy.topic),
423 metadata: legacy.metadata,
424 created_at: legacy.created_at,
425 }
426 }
427}
428
429impl TryFrom<Message> for LegacyMessage {
430 type Error = crate::error::Error;
431
432 fn try_from(msg: Message) -> Result<Self, Self::Error> {
434 use crate::error::Error;
435
436 let (topic, payload, content_type) = match msg.content {
437 MessageContent::Text(text) => ("text".to_string(), text.into_bytes(), Some("text")),
438 MessageContent::Json(value) => (
439 "json".to_string(),
440 serde_json::to_vec(&value).map_err(|e| {
441 Error::SerializationError(format!("Failed to serialize JSON: {}", e))
442 })?,
443 Some("json"),
444 ),
445 MessageContent::Binary(data) => ("binary".to_string(), data, None),
446 _ => {
447 return Err(Error::ConversionError(
448 "Unsupported content type for legacy format".to_string(),
449 ))
450 }
451 };
452
453 let mut metadata = msg.metadata;
454 if let Some(corr_id) = msg.correlation_id {
455 metadata.insert("correlation_id".to_string(), corr_id);
456 }
457 if let MessageTarget::Agent(agent) = msg.to {
458 metadata.insert("target".to_string(), agent);
459 }
460 if let Some(ct) = content_type {
461 metadata.insert("content_type".to_string(), ct.to_string());
462 }
463
464 Ok(LegacyMessage {
465 id: msg.id,
466 topic: msg.topic.unwrap_or(topic),
467 payload,
468 sender: msg.from,
469 metadata,
470 created_at: msg.created_at,
471 })
472 }
473}
474
475#[derive(Debug, Clone, Serialize, Deserialize)]
479pub struct GenericMessage {
480 pub content: String,
482 pub metadata: std::collections::HashMap<String, String>,
484}
485
486impl GenericMessage {
487 pub fn new(content: impl Into<String>) -> Self {
489 Self {
490 content: content.into(),
491 metadata: std::collections::HashMap::new(),
492 }
493 }
494}
495
496#[derive(Debug, Clone, Serialize, Deserialize)]
498pub struct GenericResponse {
499 pub content: String,
501}
502
503impl GenericResponse {
504 pub fn new(content: impl Into<String>) -> Self {
506 Self {
507 content: content.into(),
508 }
509 }
510}
511
512#[derive(Debug, Clone, Serialize, Deserialize)]
516pub struct Envelope {
517 pub message: Message,
519 pub target_agent: Option<String>,
521 pub target_node: Option<String>,
523}