use serde::{Deserialize, Serialize};
use uuid::Uuid;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use crate::error::{RabbitMeshError, Result};
/// Kind of a [`Message`].
///
/// The `#[serde(tag = "type")]` attribute selects serde's internally tagged
/// representation, so the variant name is carried under a `type` key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum MessageType {
/// A call addressed to a specific recipient; built by `Message::new_request`.
Request,
/// A reply to a request; built by `Message::new_response`.
Response,
/// A message with no addressee (`to` is `None`); built by `Message::new_event`.
Event,
/// A probe with a null payload; built by `Message::new_ping`.
Ping,
/// The answer to a ping; built by `Message::new_pong`.
Pong,
}
/// Envelope for everything sent between services: requests, responses,
/// events and ping/pong probes.
///
/// Serialized to and from JSON bytes by `to_bytes` / `from_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
/// Unique id of this message; every constructor generates a fresh v4 UUID.
pub id: Uuid,
/// What kind of message this is.
pub message_type: MessageType,
/// Name of the sender, as passed to the constructor.
pub from: String,
/// Name of the recipient; `None` for events, which have no single addressee.
pub to: Option<String>,
/// Method name; the fixed strings `"ping"` / `"pong"` for probes.
pub method: String,
/// JSON payload; `Null` for ping and pong messages.
pub payload: serde_json::Value,
/// Id used to pair a reply with the message that triggered it. Requests
/// created by `new_request` carry their own `id` here and responses copy it
/// from the request; events and pings carry `None`.
pub correlation_id: Option<Uuid>,
/// UTC time the message was created; refreshed by `with_retry`.
pub timestamp: DateTime<Utc>,
/// Free-form string key/value pairs; empty on construction.
pub metadata: HashMap<String, String>,
/// Number of times the message has been retried; starts at 0 and is
/// incremented by `with_retry`.
pub retry_count: u32,
}
impl Message {
pub fn new_request(
from: impl Into<String>,
to: impl Into<String>,
method: impl Into<String>,
payload: impl Serialize,
) -> Result<Self> {
let id = Uuid::new_v4();
Ok(Self {
id,
message_type: MessageType::Request,
from: from.into(),
to: Some(to.into()),
method: method.into(),
payload: serde_json::to_value(payload)?,
correlation_id: Some(id), timestamp: Utc::now(),
metadata: HashMap::new(),
retry_count: 0,
})
}
pub fn new_response(
request: &Message,
from: impl Into<String>,
payload: impl Serialize,
) -> Result<Self> {
Ok(Self {
id: Uuid::new_v4(),
message_type: MessageType::Response,
from: from.into(),
to: Some(request.from.clone()),
method: request.method.clone(),
payload: serde_json::to_value(payload)?,
correlation_id: request.correlation_id,
timestamp: Utc::now(),
metadata: HashMap::new(),
retry_count: 0,
})
}
pub fn new_event(
from: impl Into<String>,
method: impl Into<String>,
payload: impl Serialize,
) -> Result<Self> {
Ok(Self {
id: Uuid::new_v4(),
message_type: MessageType::Event,
from: from.into(),
to: None, method: method.into(),
payload: serde_json::to_value(payload)?,
correlation_id: None,
timestamp: Utc::now(),
metadata: HashMap::new(),
retry_count: 0,
})
}
pub fn new_ping(from: impl Into<String>, to: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
message_type: MessageType::Ping,
from: from.into(),
to: Some(to.into()),
method: "ping".to_string(),
payload: serde_json::Value::Null,
correlation_id: None,
timestamp: Utc::now(),
metadata: HashMap::new(),
retry_count: 0,
}
}
pub fn new_pong(ping: &Message, from: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
message_type: MessageType::Pong,
from: from.into(),
to: Some(ping.from.clone()),
method: "pong".to_string(),
payload: serde_json::Value::Null,
correlation_id: ping.correlation_id,
timestamp: Utc::now(),
metadata: HashMap::new(),
retry_count: 0,
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>> {
Ok(serde_json::to_vec(self)?)
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
Ok(serde_json::from_slice(bytes)?)
}
pub fn deserialize_payload<T: for<'de> Deserialize<'de>>(&self) -> Result<T> {
Ok(serde_json::from_value(self.payload.clone())?)
}
pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
pub fn get_metadata(&self, key: &str) -> Option<&String> {
self.metadata.get(key)
}
pub fn is_request(&self) -> bool {
matches!(self.message_type, MessageType::Request)
}
pub fn is_response(&self) -> bool {
matches!(self.message_type, MessageType::Response)
}
pub fn is_event(&self) -> bool {
matches!(self.message_type, MessageType::Event)
}
pub fn with_retry(mut self) -> Self {
self.retry_count += 1;
self.timestamp = Utc::now();
self
}
pub fn age_ms(&self) -> i64 {
(Utc::now() - self.timestamp).num_milliseconds()
}
}
/// Description of a remote procedure call before it is wrapped in a
/// [`Message`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcRequest {
/// Name of the target service; becomes the message recipient in `into_message`.
pub service: String,
/// Name of the method to invoke on the target service.
pub method: String,
/// Call parameters as a JSON value.
pub params: serde_json::Value,
/// Optional timeout in milliseconds; `None` until `with_timeout` is called.
pub timeout_ms: Option<u64>,
}
impl RpcRequest {
    /// Creates a call to `method` on `service` with no timeout set.
    ///
    /// # Errors
    ///
    /// Returns an error if `params` cannot be converted to a JSON value.
    pub fn new(
        service: impl Into<String>,
        method: impl Into<String>,
        params: impl Serialize,
    ) -> Result<Self> {
        let service = service.into();
        let method = method.into();
        let params = serde_json::to_value(params)?;
        Ok(Self {
            service,
            method,
            params,
            timeout_ms: None,
        })
    }

    /// Returns the request with its timeout set to `timeout_ms` milliseconds.
    pub fn with_timeout(self, timeout_ms: u64) -> Self {
        Self {
            timeout_ms: Some(timeout_ms),
            ..self
        }
    }

    /// Converts the call into a request [`Message`] sent by `from` to the
    /// target service.
    ///
    /// Only `service`, `method` and `params` are carried over; `timeout_ms`
    /// is not part of the resulting message.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Message::new_request`].
    pub fn into_message(self, from: impl Into<String>) -> Result<Message> {
        let Self {
            service,
            method,
            params,
            ..
        } = self;
        Message::new_request(from, service, method, params)
    }
}
/// Outcome of a remote procedure call.
///
/// The `#[serde(tag = "status")]` attribute selects serde's internally tagged
/// representation, so the variant name is carried under a `status` key next
/// to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "status")]
pub enum RpcResponse {
/// The call succeeded.
Success {
/// Result of the call as a JSON value.
data: serde_json::Value,
/// Processing time in milliseconds, as supplied to `RpcResponse::success`.
processing_time_ms: u64,
},
/// The call failed.
Error {
/// Error message.
error: String,
/// Optional error code; `None` when built with `RpcResponse::error`.
code: Option<String>,
/// Optional extra JSON details; `None` when built with `RpcResponse::error`.
details: Option<serde_json::Value>,
},
}
impl RpcResponse {
    /// Builds a [`RpcResponse::Success`] carrying `data` and the reported
    /// processing time.
    ///
    /// # Errors
    ///
    /// Returns an error if `data` cannot be converted to a JSON value.
    pub fn success(data: impl Serialize, processing_time_ms: u64) -> Result<Self> {
        Ok(Self::Success {
            data: serde_json::to_value(data)?,
            processing_time_ms,
        })
    }

    /// Builds a [`RpcResponse::Error`] with only a message (no code, no details).
    pub fn error(error: impl Into<String>) -> Self {
        Self::Error {
            error: error.into(),
            code: None,
            details: None,
        }
    }

    /// Builds a [`RpcResponse::Error`] with a message, an error code and
    /// structured details.
    ///
    /// # Errors
    ///
    /// Returns an error if `details` cannot be converted to a JSON value.
    pub fn error_detailed(
        error: impl Into<String>,
        code: impl Into<String>,
        details: impl Serialize,
    ) -> Result<Self> {
        Ok(Self::Error {
            error: error.into(),
            code: Some(code.into()),
            details: Some(serde_json::to_value(details)?),
        })
    }

    /// Returns `true` for the [`RpcResponse::Success`] variant.
    pub fn is_success(&self) -> bool {
        matches!(self, Self::Success { .. })
    }

    /// Returns `true` for the [`RpcResponse::Error`] variant.
    pub fn is_error(&self) -> bool {
        matches!(self, Self::Error { .. })
    }

    /// Extracts the success data as `T`.
    ///
    /// # Errors
    ///
    /// Returns a deserialization error if the data does not match the shape
    /// of `T`, or a handler error built from the error message if this is an
    /// [`RpcResponse::Error`].
    pub fn data<T: for<'de> Deserialize<'de>>(&self) -> Result<T> {
        match self {
            // `&serde_json::Value` is itself a `Deserializer`, so the data is
            // read in place instead of being deep-cloned first.
            Self::Success { data, .. } => Ok(T::deserialize(data)?),
            Self::Error { error, .. } => Err(RabbitMeshError::handler_error(error)),
        }
    }

    /// Wraps this response as the payload of a response [`Message`] replying
    /// to `request` on behalf of `from`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Message::new_response`].
    pub fn into_message(self, request: &Message, from: impl Into<String>) -> Result<Message> {
        Message::new_response(request, from, self)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A request keeps its identifying fields across a bytes round trip.
    #[test]
    fn test_message_serialization() {
        let original = Message::new_request("service-a", "service-b", "get_user", 123).unwrap();

        let encoded = original.to_bytes().unwrap();
        let decoded = Message::from_bytes(&encoded).unwrap();

        assert_eq!(original.id, decoded.id);
        assert_eq!(original.message_type, decoded.message_type);
        assert_eq!(original.from, decoded.from);
        assert_eq!(original.to, decoded.to);
        assert_eq!(original.method, decoded.method);
    }

    /// `is_success` and `is_error` are mutually exclusive for both variants.
    #[test]
    fn test_rpc_response() {
        let ok = RpcResponse::success("data", 42).unwrap();
        assert!(ok.is_success());
        assert!(!ok.is_error());

        let failed = RpcResponse::error("Something went wrong");
        assert!(!failed.is_success());
        assert!(failed.is_error());
    }

    /// Metadata set through the builder is readable; unknown keys yield `None`.
    #[test]
    fn test_message_metadata() {
        let base = Message::new_request("a", "b", "method", ()).unwrap();
        let message = base
            .with_metadata("trace-id", "abc-123")
            .with_metadata("user-id", "user-456");

        let lookup = |key: &str| message.get_metadata(key).map(String::as_str);
        assert_eq!(lookup("trace-id"), Some("abc-123"));
        assert_eq!(lookup("user-id"), Some("user-456"));
        assert_eq!(lookup("nonexistent"), None);
    }
}