use anyhow::Result;
use async_trait::async_trait;
use crate::{
channel::{Channel, ChannelFactory, ChannelType},
common::get_random_routing_key,
queue::{
Connection, DeliveryMode, QueueDurability, QueueHandle, QueueOptions, SyndicationMode,
},
serializer::Serializable,
};
/// Derives the [`QueueOptions`] used when declaring the queue behind a channel
/// of the given [`ChannelType`].
impl From<ChannelType> for QueueOptions {
    /// Only the syndication mode depends on the channel type; every channel
    /// uses persistent delivery on a non-durable queue.
    fn from(channel_type: ChannelType) -> Self {
        let syndication_mode = match channel_type {
            ChannelType::ExactlyOnce => SyndicationMode::ExactlyOnce,
            ChannelType::Broadcast => SyndicationMode::Broadcast,
        };

        Self {
            syndication_mode,
            delivery_mode: DeliveryMode::Persistent,
            durability: QueueDurability::NonDurable,
        }
    }
}
/// Factory that builds [`QueueChannel`]s on top of a single connection value.
#[derive(Clone)]
pub struct QueueChannelFactory<Conn> {
    /// Connection handle; a clone of it is placed in every channel created
    /// through the factory's `ChannelFactory` implementation.
    connection: Conn,
}

impl<Conn> QueueChannelFactory<Conn> {
    /// Creates a factory that takes ownership of `connection`.
    pub fn new(connection: Conn) -> Self {
        QueueChannelFactory { connection }
    }
}
/// A [`Channel`] implementation whose messages travel through a queue declared
/// on a connection of type `Conn`.
///
/// Values are constructed by the `ChannelFactory` implementation of
/// [`QueueChannelFactory`].
#[derive(Clone)]
pub struct QueueChannel<Conn> {
/// Connection used to declare the queue, delete it, and close the channel.
connection: Conn,
/// Identifier handed to `declare_queue` / `delete_queue` for this channel.
identifier: String,
/// Channel flavour; converted with `.into()` into the options argument of
/// `declare_queue` each time the queue is declared.
channel_type: ChannelType,
}
/// [`Channel`] implementation whose queue is declared on demand by
/// [`sender`](Channel::sender) and [`receiver`](Channel::receiver).
#[async_trait]
impl<QHandle, Conn> Channel for QueueChannel<Conn>
where
    QHandle: QueueHandle + Send + Sync + 'static,
    Conn: Connection<QueueHandle = QHandle> + Send + Sync + 'static,
{
    type Acker = <QHandle as QueueHandle>::Acker;
    type Sender<'a, T: Serializable + 'a> = <QHandle as QueueHandle>::Publisher<T>;
    type Receiver<'a, T: Serializable + 'a> = <QHandle as QueueHandle>::Consumer<T>;

    /// Closes the connection held by this channel.
    ///
    /// NOTE(review): the connection is a clone of the factory's connection, so
    /// depending on how `Conn` implements `Clone` this may also affect sibling
    /// channels — confirm.
    ///
    /// # Errors
    /// Propagates any error reported by `Connection::close`.
    async fn close(&self) -> Result<()> {
        self.connection.close().await?;
        Ok(())
    }

    /// Declares this channel's queue and returns a publisher for it.
    ///
    /// # Errors
    /// Fails if the queue cannot be declared.
    async fn sender<'a, T: Serializable + 'a>(&self) -> Result<Self::Sender<'a, T>> {
        let options = self.channel_type.into();
        let handle = self
            .connection
            .declare_queue(&self.identifier, options)
            .await?;
        Ok(handle.publisher())
    }

    /// Declares this channel's queue and attaches a new consumer to it.
    ///
    /// The consumer is registered under a freshly generated random key, which
    /// is distinct from the channel's own identifier.
    ///
    /// # Errors
    /// Fails if the queue cannot be declared or the consumer cannot be created.
    async fn receiver<'a, T: Serializable + 'a>(&self) -> Result<Self::Receiver<'a, T>> {
        let consumer_key: String = get_random_routing_key();
        let options = self.channel_type.into();
        let handle = self
            .connection
            .declare_queue(&self.identifier, options)
            .await?;
        let consumer = handle.declare_consumer(&consumer_key).await?;
        Ok(consumer)
    }

    /// Schedules deletion of this channel's queue on a detached Tokio task.
    ///
    /// The call returns immediately; the result of the deletion is discarded,
    /// so cleanup is best-effort.
    ///
    /// # Panics
    /// `tokio::spawn` panics when called outside a Tokio runtime.
    fn release(&self) {
        let connection = self.connection.clone();
        let queue_id = self.identifier.clone();
        tokio::spawn(async move {
            // Best-effort cleanup: there is nobody to report a failure to.
            let _ = connection.delete_queue(&queue_id).await;
        });
    }
}
/// [`ChannelFactory`] implementation producing [`QueueChannel`]s that each hold
/// a clone of the factory's connection.
#[async_trait]
impl<Conn> ChannelFactory for QueueChannelFactory<Conn>
where
    Conn: Connection + Send + Sync + 'static,
    <Conn as Connection>::QueueHandle: Send + Sync + 'static,
{
    type Channel = QueueChannel<Conn>;

    /// Builds a channel bound to the caller-supplied `identifier`.
    ///
    /// No queue is declared here; that happens when a sender or receiver is
    /// requested from the returned channel.
    async fn get(&self, identifier: String, channel_type: ChannelType) -> Result<Self::Channel> {
        let connection = self.connection.clone();
        Ok(QueueChannel {
            connection,
            identifier,
            channel_type,
        })
    }

    /// Builds a channel under a freshly generated random identifier and returns
    /// that identifier together with the channel.
    async fn issue(&self, channel_type: ChannelType) -> Result<(String, Self::Channel)> {
        let routing_key: String = get_random_routing_key();
        let channel = QueueChannel {
            identifier: routing_key.clone(),
            connection: self.connection.clone(),
            channel_type,
        };
        Ok((routing_key, channel))
    }
}