1use std::{error::Error as StdError, fmt, sync::Arc};
62
63use async_trait::async_trait;
64use rand::{Rng, distr::Alphanumeric};
65
66pub mod connection;
67pub mod queue;
68
69mod amqp;
70mod mqtt;
71
72pub use amqp::{AmqpConnection, AmqpConnectionOptions, AmqpQueue, AmqpQueueOptions};
73pub use mqtt::{MqttConnection, MqttConnectionOptions, MqttQueue, MqttQueueOptions};
74use queue::{EventHandler, GmqQueue, MessageHandler, Status};
75
/// Error conditions defined by this module.
///
/// Each variant has a fixed human-readable message provided by the
/// `Display` implementation, and the type implements `std::error::Error`.
#[derive(Clone, Debug)]
pub enum Error {
    /// Displayed as "no message handler".
    NoMsgHandler,
    /// Displayed as "not connected".
    NotConnected,
    /// Displayed as "this queue is a receiver".
    QueueIsReceiver,
}
87
/// A queue backed by one of the two supported implementations.
///
/// The `GmqQueue` implementation for this type forwards every call to the
/// wrapped queue.
#[derive(Clone)]
pub enum Queue {
    /// Wraps an [`AmqpQueue`].
    Amqp(AmqpQueue),
    /// Wraps an [`MqttQueue`].
    Mqtt(MqttQueue),
}
93
/// Arguments accepted by [`Queue::new`].
///
/// Each variant pairs the implementation-specific queue options with a
/// borrowed connection of the matching implementation.
#[derive(Clone)]
pub enum QueueOptions<'a> {
    /// Options for an AMQP queue and the [`AmqpConnection`] to create it on.
    Amqp(AmqpQueueOptions, &'a AmqpConnection),
    /// Options for an MQTT queue and the [`MqttConnection`] to create it on.
    Mqtt(MqttQueueOptions, &'a MqttConnection),
}
99
/// Crate-internal size constant.
///
/// NOTE(review): presumably the length of generated identifiers — confirm
/// against the use sites, which are not visible in this file.
pub(crate) const ID_SIZE: usize = 24;
102
103impl fmt::Display for Error {
104    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105        match *self {
106            Error::NoMsgHandler => write!(f, "no message handler"),
107            Error::NotConnected => write!(f, "not connected"),
108            Error::QueueIsReceiver => write!(f, "this queue is a receiver"),
109        }
110    }
111}
112
// The empty body relies on the default method implementations of
// `std::error::Error`; the required `Debug` and `Display` impls are provided
// by the derive on `Error` and the `fmt::Display` impl in this file.
impl StdError for Error {}
114
115impl Queue {
116    pub fn new(opts: QueueOptions) -> Result<Self, String> {
117        match opts {
118            QueueOptions::Amqp(opts, conn) => Ok(Queue::Amqp(AmqpQueue::new(opts, conn)?)),
119            QueueOptions::Mqtt(opts, conn) => Ok(Queue::Mqtt(MqttQueue::new(opts, conn)?)),
120        }
121    }
122}
123
124#[async_trait]
125impl GmqQueue for Queue {
126    fn name(&self) -> &str {
127        match self {
128            Queue::Amqp(q) => q.name(),
129            Queue::Mqtt(q) => q.name(),
130        }
131    }
132
133    fn is_recv(&self) -> bool {
134        match self {
135            Queue::Amqp(q) => q.is_recv(),
136            Queue::Mqtt(q) => q.is_recv(),
137        }
138    }
139
140    fn status(&self) -> Status {
141        match self {
142            Queue::Amqp(q) => q.status(),
143            Queue::Mqtt(q) => q.status(),
144        }
145    }
146
147    fn set_handler(&mut self, handler: Arc<dyn EventHandler>) {
148        match self {
149            Queue::Amqp(q) => q.set_handler(handler),
150            Queue::Mqtt(q) => q.set_handler(handler),
151        }
152    }
153
154    fn clear_handler(&mut self) {
155        match self {
156            Queue::Amqp(q) => q.clear_handler(),
157            Queue::Mqtt(q) => q.clear_handler(),
158        }
159    }
160
161    fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>) {
162        match self {
163            Queue::Amqp(q) => q.set_msg_handler(handler),
164            Queue::Mqtt(q) => q.set_msg_handler(handler),
165        }
166    }
167
168    fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
169        match self {
170            Queue::Amqp(q) => q.connect(),
171            Queue::Mqtt(q) => q.connect(),
172        }
173    }
174
175    async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
176        match self {
177            Queue::Amqp(q) => q.close().await,
178            Queue::Mqtt(q) => q.close().await,
179        }
180    }
181
182    async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
183        match self {
184            Queue::Amqp(q) => q.send_msg(payload).await,
185            Queue::Mqtt(q) => q.send_msg(payload).await,
186        }
187    }
188}
189
190pub fn randomstring(len: usize) -> String {
192    let mut rng = rand::rng();
193    std::iter::repeat(())
194        .map(|()| rng.sample(Alphanumeric))
195        .map(char::from)
196        .take(len)
197        .collect()
198}