use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::sync::Arc;
use crate::MessageProperties;
use crate::error::WorkerResult;
/// Envelope that pairs a payload of type `T` with an identifier and metadata.
///
/// The type derives `Serialize`/`Deserialize`, so it can be (de)serialized whenever `T` can.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message<T> {
/// Identifier of this message.
pub id: String,
/// The message body.
pub payload: T,
/// Bookkeeping data attached to the message; see [`MessageMetadata`].
pub metadata: MessageMetadata,
}
/// Bookkeeping data carried alongside a [`Message`] payload.
///
/// Serialization note: `correlation_id` is always written (as a null value when `None`),
/// whereas `routing_key` and `properties` are left out of the output entirely when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageMetadata {
/// Timestamp of the message; `MessageMetadata::new` sets it to the current UTC time.
pub received_at: DateTime<Utc>,
/// Attempt counter; starts at 0 in `new` and is advanced by `increment_attempt`.
pub attempt: u32,
/// Name of where the message came from.
/// NOTE(review): presumably a queue or topic name (the tests use "test-queue") — confirm.
pub source: String,
/// Optional correlation identifier; `None` unless set via `with_correlation_id`.
pub correlation_id: Option<String>,
/// Optional routing key; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub routing_key: Option<String>,
/// Optional message properties; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub properties: Option<MessageProperties>,
}
impl MessageMetadata {
    /// Creates metadata for a message originating from `source`.
    ///
    /// `received_at` is set to the current UTC time, `attempt` starts at 0, and every
    /// optional field is `None`. Use the `with_*` builders to fill in the optional fields.
    #[must_use]
    pub fn new(source: impl Into<String>) -> Self {
        Self {
            received_at: Utc::now(),
            attempt: 0,
            source: source.into(),
            correlation_id: None,
            routing_key: None,
            properties: None,
        }
    }

    /// Returns the metadata with `correlation_id` set, replacing any previous value.
    #[must_use]
    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
        self.correlation_id = Some(correlation_id.into());
        self
    }

    /// Returns the metadata with `routing_key` set, replacing any previous value.
    #[must_use]
    pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
        self.routing_key = Some(routing_key.into());
        self
    }

    /// Returns the metadata with `properties` set, replacing any previous value.
    #[must_use]
    pub fn with_properties(mut self, properties: MessageProperties) -> Self {
        self.properties = Some(properties);
        self
    }

    /// Advances the attempt counter by one.
    ///
    /// The counter saturates at `u32::MAX` instead of overflowing: a plain `+= 1` would
    /// panic in debug builds and silently wrap back to 0 in release builds, which would
    /// make an exhausted message look like a fresh one to any retry-limit check.
    pub fn increment_attempt(&mut self) {
        self.attempt = self.attempt.saturating_add(1);
    }
}
/// Handle for acknowledging (`ack`) or negatively acknowledging (`nack`) a message.
///
/// Implementors must be `Send + Sync + Debug`; [`ReceivedMessage`] stores the handle as
/// an `Arc<dyn AckHandle>`.
#[async_trait]
pub trait AckHandle: Send + Sync + Debug {
/// Acknowledges the message. What this means for the underlying source is defined by
/// the implementation.
async fn ack(&self) -> WorkerResult<()>;
/// Negatively acknowledges the message.
///
/// NOTE(review): `requeue` presumably asks the source to redeliver the message when
/// `true` — confirm against the concrete implementations.
async fn nack(&self, requeue: bool) -> WorkerResult<()>;
}
/// A [`Message`] bundled with the [`AckHandle`] used to acknowledge or reject it.
pub struct ReceivedMessage<T> {
/// The message itself (id, payload and metadata).
pub message: Message<T>,
/// Shared handle that `ack`/`nack` calls are forwarded to.
pub ack_handle: Arc<dyn AckHandle>,
}
// No method here requires anything of `T`, so the impl carries no bounds. The futures
// returned by `ack`/`nack` are still `Send` whenever `T` is `Sync`, so callers that
// relied on the former `T: Send + Sync` bound are unaffected.
impl<T> ReceivedMessage<T> {
    /// Bundles `message` with the handle that will be used to acknowledge or reject it.
    #[must_use]
    pub fn new(message: Message<T>, ack_handle: Arc<dyn AckHandle>) -> Self {
        Self {
            message,
            ack_handle,
        }
    }

    /// Forwards to [`AckHandle::ack`] on the attached handle and returns its result.
    pub async fn ack(&self) -> WorkerResult<()> {
        self.ack_handle.ack().await
    }

    /// Forwards to [`AckHandle::nack`] on the attached handle, passing `requeue` through
    /// unchanged, and returns its result.
    pub async fn nack(&self, requeue: bool) -> WorkerResult<()> {
        self.ack_handle.nack(requeue).await
    }

    /// Consumes the wrapper and returns the inner message.
    ///
    /// This wrapper's reference to the ack handle is dropped without `ack` or `nack`
    /// being called by this method.
    #[must_use]
    pub fn into_message(self) -> Message<T> {
        self.message
    }
}
impl<T: Clone + Send + Sync> Clone for ReceivedMessage<T> {
fn clone(&self) -> Self {
Self {
message: self.message.clone(),
ack_handle: self.ack_handle.clone(),
}
}
}
/// Diagnostic formatting: prints the message in full and a fixed placeholder for the handle.
impl<T: Debug> Debug for ReceivedMessage<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendered = f.debug_struct("ReceivedMessage");
        rendered.field("message", &self.message);
        // The handle is shown as a constant string rather than through its own `Debug` impl.
        rendered.field("ack_handle", &"<AckHandle>");
        rendered.finish()
    }
}
/// A [`Message`] whose payload is an arbitrary JSON value.
pub type JsonMessage = Message<serde_json::Value>;
/// A [`ReceivedMessage`] whose payload is an arbitrary JSON value.
pub type ReceivedJsonMessage = ReceivedMessage<serde_json::Value>;
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Test double that records which acknowledgement method was invoked.
    #[derive(Debug)]
    struct MockAckHandle {
        acked: Arc<AtomicBool>,
        nacked: Arc<AtomicBool>,
    }

    impl MockAckHandle {
        /// Returns the mock plus shared views of its `acked` and `nacked` flags, so a test
        /// can still inspect them after the mock has been moved into an `Arc<dyn AckHandle>`.
        fn new() -> (Self, Arc<AtomicBool>, Arc<AtomicBool>) {
            let acked = Arc::new(AtomicBool::new(false));
            let nacked = Arc::new(AtomicBool::new(false));
            let handle = Self {
                acked: Arc::clone(&acked),
                nacked: Arc::clone(&nacked),
            };
            (handle, acked, nacked)
        }
    }

    #[async_trait]
    impl AckHandle for MockAckHandle {
        async fn ack(&self) -> WorkerResult<()> {
            self.acked.store(true, Ordering::SeqCst);
            Ok(())
        }

        async fn nack(&self, _requeue: bool) -> WorkerResult<()> {
            self.nacked.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Builds the message fixture shared by the tests below.
    fn sample_message() -> Message<&'static str> {
        Message {
            id: "test-1".to_string(),
            payload: "test data",
            metadata: MessageMetadata::new("test-queue"),
        }
    }

    // Tests that await nothing are plain `#[test]`s; only the ack/nack tests need a runtime.

    #[test]
    fn test_message_creation() {
        let message = sample_message();
        assert_eq!(message.id, "test-1");
        assert_eq!(message.payload, "test data");
        assert_eq!(message.metadata.attempt, 0);
        assert_eq!(message.metadata.source, "test-queue");
    }

    #[tokio::test]
    async fn test_received_message_ack() {
        let (ack_handle, acked, nacked) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(ack_handle));
        received.ack().await.unwrap();
        assert!(acked.load(Ordering::SeqCst));
        // `ack` must not be routed to `nack`.
        assert!(!nacked.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_received_message_nack() {
        let (ack_handle, acked, nacked) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(ack_handle));
        received.nack(true).await.unwrap();
        assert!(nacked.load(Ordering::SeqCst));
        // `nack` must not be routed to `ack`.
        assert!(!acked.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_received_message_clone_shares_ack_handle() {
        let (ack_handle, acked, nacked) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(ack_handle));
        let duplicate = received.clone();
        assert_eq!(duplicate.message.id, received.message.id);
        // Acknowledging through the clone is observed through the shared flags.
        duplicate.ack().await.unwrap();
        assert!(acked.load(Ordering::SeqCst));
        assert!(!nacked.load(Ordering::SeqCst));
    }

    #[test]
    fn test_received_message_into_message() {
        let (ack_handle, _, _) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(ack_handle));
        let message = received.into_message();
        assert_eq!(message.id, "test-1");
        assert_eq!(message.payload, "test data");
    }

    #[test]
    fn test_metadata_with_correlation_id() {
        let metadata = MessageMetadata::new("test-queue").with_correlation_id("corr-123");
        assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
    }

    #[test]
    fn test_metadata_with_routing_key() {
        let metadata = MessageMetadata::new("test-queue").with_routing_key("orders.created");
        assert_eq!(metadata.routing_key, Some("orders.created".to_string()));
        assert_eq!(metadata.correlation_id, None);
    }

    #[test]
    fn test_metadata_increment_attempt() {
        let mut metadata = MessageMetadata::new("test-queue");
        assert_eq!(metadata.attempt, 0);
        metadata.increment_attempt();
        assert_eq!(metadata.attempt, 1);
    }
}