use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
pub mod config;
pub mod error;
#[cfg(feature = "rabbitmq")]
pub mod rabbitmq;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "nats")]
pub mod nats;
#[cfg(feature = "aws")]
pub mod aws;
pub use config::*;
pub use error::*;
/// A message envelope: raw payload bytes plus the metadata carried alongside them.
///
/// Instances are normally created with `Message::new` or `Message::json` and then
/// refined with the `with_*` builder methods. The serde derives allow the whole
/// envelope to be serialized and deserialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
/// Unique identifier; `Message::new` assigns a freshly generated v4 UUID string.
pub id: String,
/// Raw message body.
pub payload: Vec<u8>,
/// String key/value headers; empty on a newly constructed message.
pub headers: HashMap<String, String>,
/// Topic the message is addressed to.
pub topic: String,
/// UTC timestamp; `Message::new` sets it to the moment of construction.
pub timestamp: DateTime<Utc>,
/// Optional correlation identifier.
/// NOTE(review): presumably used to pair requests with replies — confirm against the backends.
pub correlation_id: Option<String>,
/// Optional reply destination.
/// NOTE(review): presumably the topic/queue a response should be sent to — confirm.
pub reply_to: Option<String>,
/// Optional content type of `payload`; `Message::json` sets it to `application/json`.
pub content_type: Option<String>,
/// Optional priority; `Message::with_priority` clamps it to at most 9.
pub priority: Option<u8>,
/// Optional time-to-live in milliseconds.
pub ttl: Option<u64>,
}
impl Message {
    /// Creates a message for `topic` carrying `payload`.
    ///
    /// The message receives a fresh v4 UUID string as its `id` and the current UTC
    /// time as its `timestamp`. Headers start empty and every optional field is `None`.
    pub fn new<T: Into<Vec<u8>>>(topic: impl Into<String>, payload: T) -> Self {
        Self {
            id: Uuid::new_v4().to_string(),
            payload: payload.into(),
            headers: HashMap::new(),
            topic: topic.into(),
            timestamp: Utc::now(),
            correlation_id: None,
            reply_to: None,
            content_type: None,
            priority: None,
            ttl: None,
        }
    }

    /// Creates a message whose payload is `value` encoded as JSON and whose
    /// `content_type` is `application/json`.
    ///
    /// # Errors
    ///
    /// Returns `MessagingError::Serialization` (carrying the encoder's error text)
    /// when `value` cannot be serialized.
    pub fn json<T: Serialize>(topic: impl Into<String>, value: &T) -> Result<Self, MessagingError> {
        let payload =
            serde_json::to_vec(value).map_err(|e| MessagingError::Serialization(e.to_string()))?;
        let mut msg = Self::new(topic, payload);
        msg.content_type = Some("application/json".to_string());
        Ok(msg)
    }

    /// Decodes the payload as JSON into `T`.
    ///
    /// The `content_type` field is not consulted; the payload bytes are parsed as-is.
    ///
    /// # Errors
    ///
    /// Returns `MessagingError::Deserialization` when the payload is not valid JSON
    /// for `T`.
    pub fn parse_json<T: for<'de> Deserialize<'de>>(&self) -> Result<T, MessagingError> {
        serde_json::from_slice(&self.payload)
            .map_err(|e| MessagingError::Deserialization(e.to_string()))
    }

    /// Borrows the payload as a UTF-8 string slice without copying.
    ///
    /// # Errors
    ///
    /// Returns `MessagingError::Deserialization` when the payload is not valid UTF-8.
    pub fn payload_str(&self) -> Result<&str, MessagingError> {
        std::str::from_utf8(&self.payload)
            .map_err(|e| MessagingError::Deserialization(e.to_string()))
    }

    /// Adds a header, replacing any existing value stored under the same key.
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key.into(), value.into());
        self
    }

    /// Sets the correlation identifier.
    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
        self.correlation_id = Some(id.into());
        self
    }

    /// Sets the reply destination.
    pub fn with_reply_to(mut self, reply_to: impl Into<String>) -> Self {
        self.reply_to = Some(reply_to.into());
        self
    }

    /// Sets the content type describing the payload.
    pub fn with_content_type(mut self, content_type: impl Into<String>) -> Self {
        self.content_type = Some(content_type.into());
        self
    }

    /// Sets the priority. Values above 9 are silently clamped to 9.
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = Some(priority.min(9));
        self
    }

    /// Sets the time-to-live, expressed in milliseconds.
    pub fn with_ttl(mut self, ttl_ms: u64) -> Self {
        self.ttl = Some(ttl_ms);
        self
    }

    /// Sets the time-to-live from a `Duration`, stored as whole milliseconds.
    ///
    /// Durations whose millisecond count does not fit in a `u64` saturate to
    /// `u64::MAX` rather than wrapping.
    pub fn with_ttl_duration(mut self, ttl: Duration) -> Self {
        // `as_millis` yields a `u128`. Clamp before narrowing: a bare `as u64` would
        // wrap, turning a very long TTL into a short (possibly zero) one.
        let millis = ttl.as_millis().min(u128::from(u64::MAX));
        self.ttl = Some(millis as u64);
        self
    }
}
/// One-line, human-readable summary of a message: its id, topic and payload size.
impl fmt::Display for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the payload length is reported; the payload bytes are never printed.
        let size = self.payload.len();
        f.write_fmt(format_args!(
            "Message {{ id: {}, topic: {}, size: {} bytes }}",
            self.id, self.topic, size
        ))
    }
}
/// Acknowledgement strategy selected for a subscription via `SubscribeOptions::ack_mode`.
///
/// NOTE(review): the broker-side behaviour of each variant lives in the backend modules,
/// which are not visible here; the per-variant notes below are assumptions to confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AckMode {
/// The default mode.
/// NOTE(review): presumably messages are acknowledged automatically — confirm.
#[default]
Auto,
/// NOTE(review): presumably acknowledgement is left to the caller/handler — confirm.
Manual,
/// NOTE(review): presumably no acknowledgements are sent at all — confirm.
/// Beware that a glob import of this enum's variants shadows `Option::None`.
None,
}
/// Outcome reported by a `MessageHandler` after it has looked at a message.
///
/// The enum is a plain, field-less value, so it is `Copy` and comparable with `==`,
/// which lets callers and tests assert on the outcome directly.
///
/// NOTE(review): how a backend reacts to each outcome is implemented outside this
/// file; the per-variant notes below are assumptions to confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessingResult {
    /// The message was handled successfully.
    Success,
    /// NOTE(review): presumably the message should be redelivered — confirm.
    Retry,
    /// NOTE(review): presumably the message should be routed to a dead-letter
    /// destination — confirm. This is the default for deserialize errors.
    DeadLetter,
    /// NOTE(review): presumably the message should be rejected without retry — confirm.
    Reject,
}
/// Asynchronous callback that processes messages; brokers accept handlers as
/// `Arc<dyn MessageHandler>` when subscribing.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait]
pub trait MessageHandler: Send + Sync + 'static {
/// Processes one message, taking ownership of it.
///
/// Returns the `ProcessingResult` describing the outcome, or a `MessagingError`
/// when the handler itself failed.
async fn handle(&self, message: Message) -> Result<ProcessingResult, MessagingError>;
/// Chooses an outcome for a message that could not be deserialized.
///
/// The default implementation ignores the error and returns
/// `ProcessingResult::DeadLetter`.
/// NOTE(review): when (and whether) backends invoke this hook is not visible here.
async fn on_deserialize_error(&self, _error: &MessagingError) -> ProcessingResult {
ProcessingResult::DeadLetter
}
}
/// Adapter that lets a plain async function or closure act as a `MessageHandler`.
///
/// The wrapped callable receives the `Message` by value and must return a `Send`
/// future resolving to `Result<ProcessingResult, MessagingError>`.
pub struct FnHandler<F>(pub F);

#[async_trait]
impl<F, Fut> MessageHandler for FnHandler<F>
where
    F: Fn(Message) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<ProcessingResult, MessagingError>> + Send,
{
    /// Forwards `message` to the wrapped callable and awaits its result unchanged.
    async fn handle(&self, message: Message) -> Result<ProcessingResult, MessagingError> {
        let FnHandler(callback) = self;
        let pending = callback(message);
        pending.await
    }
}
/// Per-publish settings accepted by `MessageBroker::publish_with_options`.
///
/// `Default` yields both flags `false` and every optional field `None`.
/// NOTE(review): how each field is interpreted is up to the broker backends, which
/// are not visible here.
#[derive(Debug, Clone, Default)]
pub struct PublishOptions {
    /// Whether a publish confirmation is requested.
    pub confirm: bool,
    /// Optional timeout; set together with `confirm` by `with_confirm`.
    pub timeout: Option<Duration>,
    /// Whether the message is flagged as persistent.
    pub persistent: bool,
    /// Optional routing key.
    pub routing_key: Option<String>,
    /// Optional exchange name.
    pub exchange: Option<String>,
    /// Optional partition key.
    pub partition_key: Option<String>,
}

impl PublishOptions {
    /// Preset with both `persistent` and `confirm` enabled; all other fields keep
    /// their defaults.
    pub fn persistent() -> Self {
        Self {
            confirm: true,
            persistent: true,
            ..Self::default()
        }
    }

    /// Sets the routing key.
    pub fn with_routing_key(self, key: impl Into<String>) -> Self {
        Self {
            routing_key: Some(key.into()),
            ..self
        }
    }

    /// Sets the exchange.
    pub fn with_exchange(self, exchange: impl Into<String>) -> Self {
        Self {
            exchange: Some(exchange.into()),
            ..self
        }
    }

    /// Sets the partition key.
    pub fn with_partition_key(self, key: impl Into<String>) -> Self {
        Self {
            partition_key: Some(key.into()),
            ..self
        }
    }

    /// Enables `confirm` and records `timeout` alongside it.
    pub fn with_confirm(self, timeout: Duration) -> Self {
        Self {
            confirm: true,
            timeout: Some(timeout),
            ..self
        }
    }
}
/// Per-subscription settings accepted by `MessageBroker::subscribe_with_options`.
///
/// `Default` yields `AckMode::Auto`, `from_beginning == false` and every optional
/// field `None`.
/// NOTE(review): how each field is interpreted is up to the broker backends, which
/// are not visible here.
#[derive(Debug, Clone, Default)]
pub struct SubscribeOptions {
    /// Optional consumer group name.
    pub consumer_group: Option<String>,
    /// Optional prefetch count.
    pub prefetch_count: Option<u16>,
    /// Acknowledgement mode; defaults to `AckMode::Auto`.
    pub ack_mode: AckMode,
    /// Whether consumption should start from the beginning.
    pub from_beginning: bool,
    /// Optional filter expression.
    /// NOTE(review): syntax is presumably backend-specific — confirm.
    pub filter: Option<String>,
    /// Optional concurrency level.
    pub concurrency: Option<usize>,
}

impl SubscribeOptions {
    /// Sets the consumer group.
    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
        self.consumer_group = Some(group.into());
        self
    }

    /// Sets the prefetch count.
    pub fn with_prefetch(mut self, count: u16) -> Self {
        self.prefetch_count = Some(count);
        self
    }

    /// Sets the acknowledgement mode.
    pub fn with_ack_mode(mut self, mode: AckMode) -> Self {
        self.ack_mode = mode;
        self
    }

    /// Turns the `from_beginning` flag on.
    pub fn from_beginning(mut self) -> Self {
        self.from_beginning = true;
        self
    }

    /// Sets the filter expression.
    ///
    /// Builder counterpart of the `filter` field, matching the builders available
    /// for the other optional fields.
    pub fn with_filter(mut self, filter: impl Into<String>) -> Self {
        self.filter = Some(filter.into());
        self
    }

    /// Sets the concurrency level.
    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
        self.concurrency = Some(concurrency);
        self
    }
}
/// Handle to a subscription; it is the associated `Subscription` type returned by
/// `MessageBroker::subscribe` and `MessageBroker::subscribe_with_options`.
#[async_trait]
pub trait Subscription: Send + Sync {
/// Cancels the subscription.
///
/// Returns a `MessagingError` when cancelling fails.
async fn unsubscribe(&self) -> Result<(), MessagingError>;
/// Reports whether the subscription is currently active.
fn is_active(&self) -> bool;
/// Returns the topic this subscription is attached to.
fn topic(&self) -> &str;
}
/// Common interface implemented by every broker backend: publishing messages,
/// subscribing handlers to topics, and managing the connection.
#[async_trait]
pub trait MessageBroker: Send + Sync {
/// Backend-specific subscription handle returned by the `subscribe*` methods.
type Subscription: Subscription;
/// Publishes `message`.
/// NOTE(review): presumably equivalent to `publish_with_options` with default
/// options — confirm per backend.
async fn publish(&self, message: Message) -> Result<(), MessagingError>;
/// Publishes `message` using the supplied `options`.
///
/// Returns a `MessagingError` when publishing fails.
async fn publish_with_options(
&self,
message: Message,
options: PublishOptions,
) -> Result<(), MessagingError>;
/// Registers `handler` for `topic` and returns the subscription handle.
/// NOTE(review): presumably equivalent to `subscribe_with_options` with default
/// options — confirm per backend.
async fn subscribe(
&self,
topic: &str,
handler: Arc<dyn MessageHandler>,
) -> Result<Self::Subscription, MessagingError>;
/// Registers `handler` for `topic` using the supplied `options` and returns the
/// subscription handle.
///
/// Returns a `MessagingError` when the subscription cannot be established.
async fn subscribe_with_options(
&self,
topic: &str,
handler: Arc<dyn MessageHandler>,
options: SubscribeOptions,
) -> Result<Self::Subscription, MessagingError>;
/// Reports whether the broker connection is currently up.
fn is_connected(&self) -> bool;
/// Closes the broker connection.
///
/// Returns a `MessagingError` when closing fails.
async fn close(&self) -> Result<(), MessagingError>;
}
/// Entry point that turns a `MessagingConfig` into a connected broker.
///
/// Each `build_*` constructor is compiled only when the matching cargo feature is
/// enabled.
/// NOTE(review): an `aws` module is declared in this file but has no `build_*`
/// counterpart here — confirm whether that is intentional.
pub struct MessagingBuilder {
    /// Configuration passed by reference to the selected backend's `connect`.
    pub config: MessagingConfig,
}

impl MessagingBuilder {
    /// Wraps `config` in a builder.
    pub fn new(config: MessagingConfig) -> Self {
        MessagingBuilder { config }
    }

    /// Consumes the builder and connects a RabbitMQ broker with the stored config.
    #[cfg(feature = "rabbitmq")]
    pub async fn build_rabbitmq(self) -> Result<rabbitmq::RabbitMqBroker, MessagingError> {
        let Self { config } = self;
        rabbitmq::RabbitMqBroker::connect(&config).await
    }

    /// Consumes the builder and connects a Kafka broker with the stored config.
    #[cfg(feature = "kafka")]
    pub async fn build_kafka(self) -> Result<kafka::KafkaBroker, MessagingError> {
        let Self { config } = self;
        kafka::KafkaBroker::connect(&config).await
    }

    /// Consumes the builder and connects a NATS broker with the stored config.
    #[cfg(feature = "nats")]
    pub async fn build_nats(self) -> Result<nats::NatsBroker, MessagingError> {
        let Self { config } = self;
        nats::NatsBroker::connect(&config).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Message::new` keeps topic and payload and assigns a non-empty id.
    #[test]
    fn test_message_creation() {
        let body = b"hello world".to_vec();
        let msg = Message::new("test-topic", body.clone());

        assert_eq!(msg.topic.as_str(), "test-topic");
        assert_eq!(msg.payload, body);
        assert!(!msg.id.is_empty());
    }

    /// A value survives a `Message::json` / `parse_json` round trip and the content
    /// type is tagged as JSON.
    #[test]
    fn test_message_json() {
        #[derive(Serialize, Deserialize, Debug, PartialEq)]
        struct TestData {
            name: String,
            value: i32,
        }

        let data = TestData {
            name: String::from("test"),
            value: 42,
        };

        let msg = Message::json("test-topic", &data).unwrap();
        assert_eq!(msg.content_type.as_deref(), Some("application/json"));

        let parsed = msg.parse_json::<TestData>().unwrap();
        assert_eq!(parsed, data);
    }

    /// Every chained `with_*` call lands in its own field.
    #[test]
    fn test_message_builder() {
        let msg = Message::new("topic", b"data".to_vec())
            .with_header("key", "value")
            .with_correlation_id("corr-123")
            .with_reply_to("reply-queue")
            .with_priority(5)
            .with_ttl(60000);

        assert_eq!(msg.headers.get("key").map(String::as_str), Some("value"));
        assert_eq!(msg.correlation_id.as_deref(), Some("corr-123"));
        assert_eq!(msg.reply_to.as_deref(), Some("reply-queue"));
        assert_eq!(msg.priority, Some(5));
        assert_eq!(msg.ttl, Some(60000));
    }

    /// The persistent preset combined with routing, exchange and confirm builders.
    #[test]
    fn test_publish_options() {
        let timeout = Duration::from_secs(5);
        let opts = PublishOptions::persistent()
            .with_routing_key("my.routing.key")
            .with_exchange("my-exchange")
            .with_confirm(timeout);

        assert!(opts.persistent);
        assert!(opts.confirm);
        assert_eq!(opts.routing_key.as_deref(), Some("my.routing.key"));
        assert_eq!(opts.exchange.as_deref(), Some("my-exchange"));
        assert_eq!(opts.timeout, Some(timeout));
    }

    /// Subscribe builders store the group, prefetch, ack mode and concurrency.
    #[test]
    fn test_subscribe_options() {
        let opts = SubscribeOptions::default()
            .with_consumer_group("my-group")
            .with_prefetch(10)
            .with_ack_mode(AckMode::Manual)
            .with_concurrency(4);

        assert_eq!(opts.consumer_group.as_deref(), Some("my-group"));
        assert_eq!(opts.prefetch_count, Some(10));
        assert_eq!(opts.ack_mode, AckMode::Manual);
        assert_eq!(opts.concurrency, Some(4));
    }
}