1use std::{error::Error as StdError, fmt, sync::Arc};
62
63use async_trait::async_trait;
64use rand::{Rng, distr::Alphanumeric};
65
66pub mod connection;
67pub mod queue;
68
69mod amqp;
70mod mqtt;
71
72pub use amqp::{AmqpConnection, AmqpConnectionOptions, AmqpQueue, AmqpQueueOptions};
73pub use mqtt::{MqttConnection, MqttConnectionOptions, MqttQueue, MqttQueueOptions};
74use queue::{EventHandler, GmqQueue, MessageHandler, Status};
75
/// Errors reported by this crate's queue abstraction.
#[derive(Debug, Clone)]
pub enum Error {
    /// No message handler is available (displayed as "no message handler").
    NoMsgHandler,
    /// The operation needs a connection that is not established
    /// (displayed as "not connected").
    NotConnected,
    /// The queue is a receiver (displayed as "this queue is a receiver").
    QueueIsReceiver,
}
87
88#[derive(Clone)]
89pub enum Queue {
90 Amqp(AmqpQueue),
91 Mqtt(MqttQueue),
92}
93
94#[derive(Clone)]
95pub enum QueueOptions<'a> {
96 Amqp(AmqpQueueOptions, &'a AmqpConnection),
97 Mqtt(MqttQueueOptions, &'a MqttConnection),
98}
99
/// Crate-internal size constant for identifiers.
// NOTE(review): presumably the length passed to `randomstring` when generating
// IDs; the usage sites are not visible in this file, so confirm before relying on it.
pub(crate) const ID_SIZE: usize = 24;
102
103impl fmt::Display for Error {
104 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105 match *self {
106 Error::NoMsgHandler => write!(f, "no message handler"),
107 Error::NotConnected => write!(f, "not connected"),
108 Error::QueueIsReceiver => write!(f, "this queue is a receiver"),
109 }
110 }
111}
112
113impl StdError for Error {}
114
115impl Queue {
116 pub fn new(opts: QueueOptions) -> Result<Self, String> {
117 match opts {
118 QueueOptions::Amqp(opts, conn) => Ok(Queue::Amqp(AmqpQueue::new(opts, conn)?)),
119 QueueOptions::Mqtt(opts, conn) => Ok(Queue::Mqtt(MqttQueue::new(opts, conn)?)),
120 }
121 }
122}
123
124#[async_trait]
125impl GmqQueue for Queue {
126 fn name(&self) -> &str {
127 match self {
128 Queue::Amqp(q) => q.name(),
129 Queue::Mqtt(q) => q.name(),
130 }
131 }
132
133 fn is_recv(&self) -> bool {
134 match self {
135 Queue::Amqp(q) => q.is_recv(),
136 Queue::Mqtt(q) => q.is_recv(),
137 }
138 }
139
140 fn status(&self) -> Status {
141 match self {
142 Queue::Amqp(q) => q.status(),
143 Queue::Mqtt(q) => q.status(),
144 }
145 }
146
147 fn set_handler(&mut self, handler: Arc<dyn EventHandler>) {
148 match self {
149 Queue::Amqp(q) => q.set_handler(handler),
150 Queue::Mqtt(q) => q.set_handler(handler),
151 }
152 }
153
154 fn clear_handler(&mut self) {
155 match self {
156 Queue::Amqp(q) => q.clear_handler(),
157 Queue::Mqtt(q) => q.clear_handler(),
158 }
159 }
160
161 fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>) {
162 match self {
163 Queue::Amqp(q) => q.set_msg_handler(handler),
164 Queue::Mqtt(q) => q.set_msg_handler(handler),
165 }
166 }
167
168 fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
169 match self {
170 Queue::Amqp(q) => q.connect(),
171 Queue::Mqtt(q) => q.connect(),
172 }
173 }
174
175 async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
176 match self {
177 Queue::Amqp(q) => q.close().await,
178 Queue::Mqtt(q) => q.close().await,
179 }
180 }
181
182 async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
183 match self {
184 Queue::Amqp(q) => q.send_msg(payload).await,
185 Queue::Mqtt(q) => q.send_msg(payload).await,
186 }
187 }
188}
189
190pub fn randomstring(len: usize) -> String {
192 let mut rng = rand::rng();
193 std::iter::repeat(())
194 .map(|()| rng.sample(Alphanumeric))
195 .map(char::from)
196 .take(len)
197 .collect()
198}