use serde::{Serialize, de::DeserializeOwned};
use std::time::Duration;
use wae_types::{WaeError, WaeResult};
/// Identifier of a message; an alias for `String`.
pub type MessageId = String;
/// Name of a queue; an alias for `String`.
pub type QueueName = String;
/// Optional descriptive attributes that travel alongside a message body.
///
/// The derived `Default` leaves every optional field as `None` and the header map empty.
#[derive(Debug, Clone, Default)]
pub struct MessageMetadata {
/// Identifier of the message, if any.
pub id: Option<MessageId>,
/// Correlation identifier, if any.
pub correlation_id: Option<String>,
/// Name of the queue replies should go to, if any.
pub reply_to: Option<QueueName>,
/// Content type of the body, if any.
/// NOTE(review): the expected format (e.g. a MIME type) is not established here — confirm.
pub content_type: Option<String>,
/// Timestamp of the message, if any.
/// NOTE(review): unit and epoch are not established here — confirm.
pub timestamp: Option<u64>,
/// Priority of the message, if any. The `with_priority` builders in this file cap it at 9.
pub priority: Option<u8>,
/// Expiration of the message, if any.
/// NOTE(review): presumably milliseconds, since the `with_expiration` builders name their parameter `ms` — confirm.
pub expiration: Option<u64>,
/// Free-form string key/value headers.
pub headers: std::collections::HashMap<String, String>,
}
/// A message whose body is an uninterpreted byte buffer.
#[derive(Debug, Clone)]
pub struct RawMessage {
    /// Body bytes.
    pub data: Vec<u8>,
    /// Metadata accompanying the body.
    pub metadata: MessageMetadata,
}

impl RawMessage {
    /// Wraps `data` in a message carrying default metadata.
    pub fn new(data: Vec<u8>) -> Self {
        let metadata = MessageMetadata::default();
        Self { data, metadata }
    }

    /// Returns the message with its correlation id set to `id`.
    pub fn with_correlation_id(self, id: impl Into<String>) -> Self {
        let Self { data, mut metadata } = self;
        metadata.correlation_id = Some(id.into());
        Self { data, metadata }
    }

    /// Returns the message with its reply-to queue set to `queue`.
    pub fn with_reply_to(self, queue: impl Into<String>) -> Self {
        let Self { data, mut metadata } = self;
        metadata.reply_to = Some(queue.into());
        Self { data, metadata }
    }

    /// Returns the message with its priority set; values above 9 are stored as 9.
    pub fn with_priority(self, priority: u8) -> Self {
        let Self { data, mut metadata } = self;
        let capped = if priority > 9 { 9 } else { priority };
        metadata.priority = Some(capped);
        Self { data, metadata }
    }

    /// Returns the message with its expiration set to `ms`.
    pub fn with_expiration(self, ms: u64) -> Self {
        let Self { data, mut metadata } = self;
        metadata.expiration = Some(ms);
        Self { data, metadata }
    }

    /// Returns the message with header `key` set to `value`, replacing any previous value for that key.
    pub fn with_header(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let Self { data, mut metadata } = self;
        metadata.headers.insert(key.into(), value.into());
        Self { data, metadata }
    }
}
#[derive(Debug, Clone)]
pub struct Message<T> {
pub payload: T,
pub metadata: MessageMetadata,
}
impl<T> Message<T> {
pub fn new(payload: T) -> Self {
Self { payload, metadata: MessageMetadata::default() }
}
pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
self.metadata.correlation_id = Some(id.into());
self
}
pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
self.metadata.reply_to = Some(queue.into());
self
}
pub fn with_priority(mut self, priority: u8) -> Self {
self.metadata.priority = Some(priority.min(9));
self
}
pub fn with_expiration(mut self, ms: u64) -> Self {
self.metadata.expiration = Some(ms);
self
}
pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.headers.insert(key.into(), value.into());
self
}
pub fn into_raw(self) -> WaeResult<RawMessage>
where
T: Serialize,
{
let data = serde_json::to_vec(&self.payload).map_err(|_e| WaeError::serialization_failed("Message"))?;
Ok(RawMessage { data, metadata: self.metadata })
}
pub fn to_raw(&self) -> WaeResult<RawMessage>
where
T: Serialize,
{
let data = serde_json::to_vec(&self.payload).map_err(|_e| WaeError::serialization_failed("Message"))?;
Ok(RawMessage { data, metadata: self.metadata.clone() })
}
}
impl RawMessage {
    /// Consumes the raw message, decoding its bytes as JSON into a `T` and carrying the
    /// metadata over unchanged.
    ///
    /// # Errors
    ///
    /// Returns `WaeError::deserialization_failed("Message")` when the bytes are not valid
    /// JSON for `T`.
    pub fn into_typed<T: DeserializeOwned>(self) -> WaeResult<Message<T>> {
        let Self { data, metadata } = self;
        // The underlying serde_json error is intentionally discarded, as before.
        let typed = serde_json::from_slice::<T>(&data)
            .map(|payload| Message { payload, metadata })
            .map_err(|_| WaeError::deserialization_failed("Message"))?;
        Ok(typed)
    }
}
/// A [`RawMessage`] paired with a delivery tag and a redelivery count.
#[derive(Debug)]
pub struct ReceivedRawMessage {
/// The received message.
pub message: RawMessage,
/// Tag identifying this delivery.
/// NOTE(review): presumably the value passed back when acknowledging — confirm against the consumer API.
pub delivery_tag: u64,
/// Redelivery counter.
/// NOTE(review): presumably 0 on first delivery — confirm.
pub redelivery_count: u32,
}
/// A typed [`Message`] paired with a delivery tag and a redelivery count.
#[derive(Debug)]
pub struct ReceivedMessage<T> {
/// The received message with its typed payload.
pub message: Message<T>,
/// Tag identifying this delivery.
/// NOTE(review): presumably the value passed back when acknowledging — confirm against the consumer API.
pub delivery_tag: u64,
/// Redelivery counter.
/// NOTE(review): presumably 0 on first delivery — confirm.
pub redelivery_count: u32,
}
/// Declarative settings for a queue, built fluently starting from [`QueueConfig::new`].
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Name of the queue.
    pub name: QueueName,
    /// Durability flag; `true` by default.
    pub durable: bool,
    /// Auto-delete flag; `false` by default.
    pub auto_delete: bool,
    /// Upper bound on the number of messages, if any.
    pub max_messages: Option<u64>,
    /// Upper bound on the size of a single message, if any.
    /// NOTE(review): unit is not established in this file (presumably bytes) — confirm.
    pub max_message_size: Option<u64>,
    /// Message time-to-live, if any (the builder names its parameter `ttl_ms`).
    pub message_ttl: Option<u64>,
    /// Name of the dead-letter queue, if any.
    pub dead_letter_queue: Option<QueueName>,
}

impl QueueConfig {
    /// Creates a configuration for the queue `name`: durable, not auto-deleted,
    /// with no limits, no TTL and no dead-letter queue.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            durable: true,
            auto_delete: false,
            max_messages: None,
            max_message_size: None,
            message_ttl: None,
            dead_letter_queue: None,
        }
    }

    /// Sets the durability flag.
    pub fn durable(mut self, durable: bool) -> Self {
        self.durable = durable;
        self
    }

    /// Sets the auto-delete flag.
    pub fn auto_delete(mut self, auto_delete: bool) -> Self {
        self.auto_delete = auto_delete;
        self
    }

    /// Limits the number of messages to `max`.
    pub fn max_messages(mut self, max: u64) -> Self {
        self.max_messages = Some(max);
        self
    }

    /// Limits the size of a single message to `max_size`.
    ///
    /// Added so that every optional field can be set through the builder chain;
    /// previously `max_message_size` could only be assigned directly on the struct.
    pub fn max_message_size(mut self, max_size: u64) -> Self {
        self.max_message_size = Some(max_size);
        self
    }

    /// Sets the message time-to-live to `ttl_ms`.
    pub fn message_ttl(mut self, ttl_ms: u64) -> Self {
        self.message_ttl = Some(ttl_ms);
        self
    }

    /// Sets the dead-letter queue to `queue`.
    pub fn dead_letter_queue(mut self, queue: impl Into<String>) -> Self {
        self.dead_letter_queue = Some(queue.into());
        self
    }
}
/// Settings for a message producer.
#[derive(Debug, Clone)]
pub struct ProducerConfig {
    /// Queue used by default, if any.
    pub default_queue: Option<QueueName>,
    /// Confirmation timeout; 5 seconds by default.
    pub confirm_timeout: Duration,
    /// Retry count; 3 by default.
    pub retry_count: u32,
    /// Interval between retries; 100 milliseconds by default.
    pub retry_interval: Duration,
}

impl Default for ProducerConfig {
    /// No default queue, a 5 s confirm timeout, and 3 retries spaced 100 ms apart.
    fn default() -> Self {
        let confirm_timeout = Duration::from_secs(5);
        let retry_interval = Duration::from_millis(100);
        Self { default_queue: None, confirm_timeout, retry_count: 3, retry_interval }
    }
}
/// Settings for a message consumer, built fluently starting from [`ConsumerConfig::new`].
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
    /// Name of the queue to consume from.
    pub queue: QueueName,
    /// Consumer tag, if any.
    pub consumer_tag: Option<String>,
    /// Auto-acknowledge flag; `false` by default.
    pub auto_ack: bool,
    /// Prefetch count; 10 by default.
    pub prefetch_count: u16,
    /// Exclusive-consumer flag; `false` by default.
    pub exclusive: bool,
}

impl ConsumerConfig {
    /// Creates a configuration for `queue`: no consumer tag, manual acknowledgement,
    /// a prefetch count of 10, and non-exclusive.
    pub fn new(queue: impl Into<String>) -> Self {
        Self { queue: queue.into(), consumer_tag: None, auto_ack: false, prefetch_count: 10, exclusive: false }
    }

    /// Sets the consumer tag.
    ///
    /// Added so the field can be set through the builder chain like the others.
    pub fn consumer_tag(mut self, tag: impl Into<String>) -> Self {
        self.consumer_tag = Some(tag.into());
        self
    }

    /// Sets the auto-acknowledge flag.
    pub fn auto_ack(mut self, auto_ack: bool) -> Self {
        self.auto_ack = auto_ack;
        self
    }

    /// Sets the prefetch count.
    pub fn prefetch(mut self, count: u16) -> Self {
        self.prefetch_count = count;
        self
    }

    /// Sets the exclusive-consumer flag.
    ///
    /// Added so the field can be set through the builder chain like the others.
    pub fn exclusive(mut self, exclusive: bool) -> Self {
        self.exclusive = exclusive;
        self
    }
}