use std::{error::Error as StdError, fmt, sync::Arc};
use async_trait::async_trait;
use rand::{RngExt, distr::Alphanumeric};
pub mod connection;
pub mod queue;
mod amqp;
mod mqtt;
pub use amqp::{AmqpConnection, AmqpConnectionOptions, AmqpQueue, AmqpQueueOptions};
pub use mqtt::{MqttConnection, MqttConnectionOptions, MqttQueue, MqttQueueOptions};
use queue::{EventHandler, GmqQueue, MessageHandler, Status};
/// Errors defined by this crate's queue abstraction.
///
/// Each variant carries no payload; the `Display` implementation renders it as a
/// short fixed message.
#[derive(Clone, Debug)]
pub enum Error {
/// Displayed as "no message handler".
/// NOTE(review): presumably returned when a queue that needs a `MessageHandler`
/// has none set — confirm in the backend modules.
NoMsgHandler,
/// Displayed as "not connected".
/// NOTE(review): presumably returned when the connection or queue is not yet
/// connected — confirm in the backend modules.
NotConnected,
/// Displayed as "this queue is a receiver".
/// NOTE(review): presumably returned when sending through a receiver queue —
/// confirm in the backend modules.
QueueIsReceiver,
}
/// A queue backed by one of the supported protocol implementations.
///
/// Implements `GmqQueue` by forwarding every call to the wrapped backend queue.
#[derive(Clone)]
pub enum Queue {
/// Queue backed by `AmqpQueue`.
Amqp(AmqpQueue),
/// Queue backed by `MqttQueue`.
Mqtt(MqttQueue),
}
/// Construction arguments for `Queue::new`: backend-specific queue options paired
/// with a borrowed connection of the same backend.
#[derive(Clone)]
pub enum QueueOptions<'a> {
/// AMQP queue options and the `AmqpConnection` handed to `AmqpQueue::new`.
Amqp(AmqpQueueOptions, &'a AmqpConnection),
/// MQTT queue options and the `MqttConnection` handed to `MqttQueue::new`.
Mqtt(MqttQueueOptions, &'a MqttConnection),
}
/// Crate-internal identifier size.
/// NOTE(review): presumably the length of generated identifiers (e.g. produced with
/// `randomstring`) — it is not used in this file; confirm at the use sites.
pub(crate) const ID_SIZE: usize = 24;
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::NoMsgHandler => write!(f, "no message handler"),
Error::NotConnected => write!(f, "not connected"),
Error::QueueIsReceiver => write!(f, "this queue is a receiver"),
}
}
}
// Marks `Error` as a standard error type; only the trait's default methods are used.
impl StdError for Error {}
impl Queue {
    /// Builds a queue for the backend selected by `opts`.
    ///
    /// The options and the borrowed connection carried by the variant are passed
    /// to the matching backend constructor (`AmqpQueue::new` or `MqttQueue::new`).
    ///
    /// # Errors
    ///
    /// Propagates (via `?`) whatever error the backend constructor returns.
    pub fn new(opts: QueueOptions) -> Result<Self, String> {
        let queue = match opts {
            QueueOptions::Amqp(amqp_opts, amqp_conn) => {
                Self::Amqp(AmqpQueue::new(amqp_opts, amqp_conn)?)
            }
            QueueOptions::Mqtt(mqtt_opts, mqtt_conn) => {
                Self::Mqtt(MqttQueue::new(mqtt_opts, mqtt_conn)?)
            }
        };
        Ok(queue)
    }
}
/// `GmqQueue` for the backend-agnostic [`Queue`]: every method forwards to the
/// same-named method of the wrapped AMQP or MQTT queue.
#[async_trait]
impl GmqQueue for Queue {
    /// Forwards to the wrapped queue's `name()`.
    fn name(&self) -> &str {
        match self {
            Self::Amqp(amqp) => amqp.name(),
            Self::Mqtt(mqtt) => mqtt.name(),
        }
    }

    /// Forwards to the wrapped queue's `is_recv()`.
    fn is_recv(&self) -> bool {
        match self {
            Self::Amqp(amqp) => amqp.is_recv(),
            Self::Mqtt(mqtt) => mqtt.is_recv(),
        }
    }

    /// Forwards to the wrapped queue's `status()`.
    fn status(&self) -> Status {
        match self {
            Self::Amqp(amqp) => amqp.status(),
            Self::Mqtt(mqtt) => mqtt.status(),
        }
    }

    /// Hands the event `handler` to the wrapped queue's `set_handler()`.
    fn set_handler(&mut self, handler: Arc<dyn EventHandler>) {
        match self {
            Self::Amqp(amqp) => amqp.set_handler(handler),
            Self::Mqtt(mqtt) => mqtt.set_handler(handler),
        }
    }

    /// Forwards to the wrapped queue's `clear_handler()`.
    fn clear_handler(&mut self) {
        match self {
            Self::Amqp(amqp) => amqp.clear_handler(),
            Self::Mqtt(mqtt) => mqtt.clear_handler(),
        }
    }

    /// Hands the message `handler` to the wrapped queue's `set_msg_handler()`.
    fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>) {
        match self {
            Self::Amqp(amqp) => amqp.set_msg_handler(handler),
            Self::Mqtt(mqtt) => mqtt.set_msg_handler(handler),
        }
    }

    /// Forwards to the wrapped queue's `connect()` and returns its result unchanged.
    fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
        match self {
            Self::Amqp(amqp) => amqp.connect(),
            Self::Mqtt(mqtt) => mqtt.connect(),
        }
    }

    /// Awaits the wrapped queue's `close()` and returns its result unchanged.
    async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
        match self {
            Self::Amqp(amqp) => amqp.close().await,
            Self::Mqtt(mqtt) => mqtt.close().await,
        }
    }

    /// Passes `payload` to the wrapped queue's `send_msg()` and awaits the result.
    async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
        match self {
            Self::Amqp(amqp) => amqp.send_msg(payload).await,
            Self::Mqtt(mqtt) => mqtt.send_msg(payload).await,
        }
    }
}
/// Generates a random string of `len` characters.
///
/// Each character is drawn independently from the `Alphanumeric` distribution
/// using the generator returned by `rand::rng()`. A `len` of zero yields an
/// empty string.
pub fn randomstring(len: usize) -> String {
    let mut rng = rand::rng();
    // One sample per output position, converted to `char` as it is produced.
    (0..len)
        .map(|_| char::from(rng.sample(Alphanumeric)))
        .collect()
}