use crate::error::QueueError;
use crate::message::{
Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
};
use crate::provider::{InMemoryConfig, ProviderConfig, ProviderType, QueueConfig, SessionSupport};
use crate::providers::InMemoryProvider;
use async_trait::async_trait;
use chrono::Duration;
#[cfg(test)]
#[path = "client_tests.rs"]
mod tests;
#[async_trait]
pub trait QueueClient: Send + Sync {
async fn send_message(
&self,
queue: &QueueName,
message: Message,
) -> Result<MessageId, QueueError>;
async fn send_messages(
&self,
queue: &QueueName,
messages: Vec<Message>,
) -> Result<Vec<MessageId>, QueueError>;
async fn receive_message(
&self,
queue: &QueueName,
timeout: Duration,
) -> Result<Option<ReceivedMessage>, QueueError>;
async fn receive_messages(
&self,
queue: &QueueName,
max_messages: u32,
timeout: Duration,
) -> Result<Vec<ReceivedMessage>, QueueError>;
async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
async fn dead_letter_message(
&self,
receipt: ReceiptHandle,
reason: String,
) -> Result<(), QueueError>;
async fn accept_session(
&self,
queue: &QueueName,
session_id: Option<SessionId>,
) -> Result<Box<dyn SessionClient>, QueueError>;
fn provider_type(&self) -> ProviderType;
fn supports_sessions(&self) -> bool;
fn supports_batching(&self) -> bool;
}
#[async_trait]
pub trait SessionClient: Send + Sync {
async fn receive_message(
&self,
timeout: Duration,
) -> Result<Option<ReceivedMessage>, QueueError>;
async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError>;
async fn dead_letter_message(
&self,
receipt: ReceiptHandle,
reason: String,
) -> Result<(), QueueError>;
async fn renew_session_lock(&self) -> Result<(), QueueError>;
async fn close_session(&self) -> Result<(), QueueError>;
fn session_id(&self) -> &SessionId;
fn session_expires_at(&self) -> Timestamp;
}
#[async_trait]
pub trait QueueProvider: Send + Sync {
async fn send_message(
&self,
queue: &QueueName,
message: &Message,
) -> Result<MessageId, QueueError>;
async fn send_messages(
&self,
queue: &QueueName,
messages: &[Message],
) -> Result<Vec<MessageId>, QueueError>;
async fn receive_message(
&self,
queue: &QueueName,
timeout: Duration,
) -> Result<Option<ReceivedMessage>, QueueError>;
async fn receive_messages(
&self,
queue: &QueueName,
max_messages: u32,
timeout: Duration,
) -> Result<Vec<ReceivedMessage>, QueueError>;
async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
async fn dead_letter_message(
&self,
receipt: &ReceiptHandle,
reason: &str,
) -> Result<(), QueueError>;
async fn create_session_client(
&self,
queue: &QueueName,
session_id: Option<SessionId>,
) -> Result<Box<dyn SessionProvider>, QueueError>;
fn provider_type(&self) -> ProviderType;
fn supports_sessions(&self) -> SessionSupport;
fn supports_batching(&self) -> bool;
fn max_batch_size(&self) -> u32;
}
#[async_trait]
pub trait SessionProvider: Send + Sync {
async fn receive_message(
&self,
timeout: Duration,
) -> Result<Option<ReceivedMessage>, QueueError>;
async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError>;
async fn dead_letter_message(
&self,
receipt: &ReceiptHandle,
reason: &str,
) -> Result<(), QueueError>;
async fn renew_session_lock(&self) -> Result<(), QueueError>;
async fn close_session(&self) -> Result<(), QueueError>;
fn session_id(&self) -> &SessionId;
fn session_expires_at(&self) -> Timestamp;
}
pub struct QueueClientFactory;
impl QueueClientFactory {
pub async fn create_client(config: QueueConfig) -> Result<Box<dyn QueueClient>, QueueError> {
let client_config = config.clone();
let provider: Box<dyn QueueProvider> = match config.provider {
ProviderConfig::InMemory(in_memory_config) => {
Box::new(InMemoryProvider::new(in_memory_config))
}
ProviderConfig::AzureServiceBus(azure_config) => {
let azure_provider = crate::providers::AzureServiceBusProvider::new(azure_config)
.await
.map_err(|e| e.to_queue_error())?;
Box::new(azure_provider)
}
ProviderConfig::AwsSqs(aws_config) => {
let aws_provider = crate::providers::AwsSqsProvider::new(aws_config)
.await
.map_err(|e| e.to_queue_error())?;
Box::new(aws_provider)
}
ProviderConfig::RabbitMq(rmq_config) => {
let rmq_provider = crate::providers::RabbitMqProvider::new(rmq_config)
.await
.map_err(|e| e.to_queue_error())?;
Box::new(rmq_provider)
}
ProviderConfig::Nats(nats_config) => {
let nats_provider = crate::providers::NatsProvider::new(nats_config)
.await
.map_err(|e| e.to_queue_error())?;
Box::new(nats_provider)
}
};
Ok(Box::new(StandardQueueClient::new(provider, client_config)))
}
pub fn create_test_client() -> Box<dyn QueueClient> {
let provider = InMemoryProvider::new(InMemoryConfig::default());
let config = QueueConfig::default();
Box::new(StandardQueueClient::new(Box::new(provider), config))
}
}
pub struct StandardQueueClient {
provider: Box<dyn QueueProvider>,
#[allow(dead_code)] config: QueueConfig,
}
impl StandardQueueClient {
pub fn new(provider: Box<dyn QueueProvider>, config: QueueConfig) -> Self {
Self { provider, config }
}
}
#[async_trait]
impl QueueClient for StandardQueueClient {
async fn send_message(
&self,
queue: &QueueName,
message: Message,
) -> Result<MessageId, QueueError> {
self.provider.send_message(queue, &message).await
}
async fn send_messages(
&self,
queue: &QueueName,
messages: Vec<Message>,
) -> Result<Vec<MessageId>, QueueError> {
self.provider.send_messages(queue, &messages).await
}
async fn receive_message(
&self,
queue: &QueueName,
timeout: Duration,
) -> Result<Option<ReceivedMessage>, QueueError> {
self.provider.receive_message(queue, timeout).await
}
async fn receive_messages(
&self,
queue: &QueueName,
max_messages: u32,
timeout: Duration,
) -> Result<Vec<ReceivedMessage>, QueueError> {
self.provider
.receive_messages(queue, max_messages, timeout)
.await
}
async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
self.provider.complete_message(&receipt).await
}
async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
self.provider.abandon_message(&receipt).await
}
async fn dead_letter_message(
&self,
receipt: ReceiptHandle,
reason: String,
) -> Result<(), QueueError> {
self.provider.dead_letter_message(&receipt, &reason).await
}
async fn accept_session(
&self,
queue: &QueueName,
session_id: Option<SessionId>,
) -> Result<Box<dyn SessionClient>, QueueError> {
let session_provider = self
.provider
.create_session_client(queue, session_id)
.await?;
Ok(Box::new(StandardSessionClient::new(session_provider)))
}
fn provider_type(&self) -> ProviderType {
self.provider.provider_type()
}
fn supports_sessions(&self) -> bool {
matches!(
self.provider.supports_sessions(),
SessionSupport::Native | SessionSupport::Emulated
)
}
fn supports_batching(&self) -> bool {
self.provider.supports_batching()
}
}
struct StandardSessionClient {
provider: Box<dyn SessionProvider>,
}
impl StandardSessionClient {
fn new(provider: Box<dyn SessionProvider>) -> Self {
Self { provider }
}
}
#[async_trait]
impl SessionClient for StandardSessionClient {
async fn receive_message(
&self,
timeout: Duration,
) -> Result<Option<ReceivedMessage>, QueueError> {
self.provider.receive_message(timeout).await
}
async fn complete_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
self.provider.complete_message(&receipt).await
}
async fn abandon_message(&self, receipt: ReceiptHandle) -> Result<(), QueueError> {
self.provider.abandon_message(&receipt).await
}
async fn dead_letter_message(
&self,
receipt: ReceiptHandle,
reason: String,
) -> Result<(), QueueError> {
self.provider.dead_letter_message(&receipt, &reason).await
}
async fn renew_session_lock(&self) -> Result<(), QueueError> {
self.provider.renew_session_lock().await
}
async fn close_session(&self) -> Result<(), QueueError> {
self.provider.close_session().await
}
fn session_id(&self) -> &SessionId {
self.provider.session_id()
}
fn session_expires_at(&self) -> Timestamp {
self.provider.session_expires_at()
}
}