general_mq/
lib.rs

//! General-purpose interfaces for message queues. The following implementations are provided:
2//!
3//! - AMQP 0-9-1
4//! - MQTT
5//!
6//! By using these classes, you can configure queues with the following properties:
7//!
8//! - Unicast or broadcast.
9//! - Reliable or best-effort.
10//!
11//! **Notes**
12//!
13//! - MQTT uses **shared queues** to implement unicast.
14//! - AMQP uses **confirm channels** to implement reliable publish, and MQTT uses **QoS 1** to
15//!   implement reliable publish/subscribe.
16//!
17//! # Relationships of Connections and Queues
18//!
19//! The term **connection** describes a TCP/TLS connection to the message broker.
20//! The term **queue** describes a message queue or a topic within a connection.
21//! You can use one connection to manage multiple queues, or one connection to manage one queue.
22//!
//! A queue is either a receiver or a sender; it cannot be both at the same time.
24//!
25//! ### Connections for sender/receiver queues with the same name
26//!
//! The sender and the receiver are usually different programs, so there are two connections to
//! hold the two queues.
29//!
//! For the special case where a program acts as both sender and receiver using the same queue:
31//!
//! - The AMQP implementation uses one **Channel** for one queue, so the program can manage all
//!   queues with one connection.
//! - The MQTT implementation **MUST** use one connection for one queue, or both sender and
//!   receiver will receive packets.
36//!
37//! # Test
38//!
//! Please prepare a [RabbitMQ](https://www.rabbitmq.com/) broker and an [EMQX](https://emqx.io/)
//! broker at **localhost** for testing.
41//!
42//! - To install using Docker:
43//!
44//!       $ docker run --rm --name rabbitmq -d -p 5672:5672 rabbitmq:management-alpine
45//!       $ docker run --rm --name emqx -d -p 1883:1883 emqx/emqx
46//!
47//! Then run the test:
48//!
49//!     $ cargo test --test integration_test -- --nocapture
50//!
51//! # Example
52//!
53//! Run RabbitMQ and then run AMQP example:
54//!
55//!     $ cargo run --example simple
56//!
57//! Run EMQX and then run MQTT example:
58//!
59//!     $ RUN_MQTT= cargo run --example simple
60
61use std::{error::Error as StdError, fmt, sync::Arc};
62
63use async_trait::async_trait;
64use rand::{Rng, distr::Alphanumeric};
65
66pub mod connection;
67pub mod queue;
68
69mod amqp;
70mod mqtt;
71
72pub use amqp::{AmqpConnection, AmqpConnectionOptions, AmqpQueue, AmqpQueueOptions};
73pub use mqtt::{MqttConnection, MqttConnectionOptions, MqttQueue, MqttQueueOptions};
74use queue::{EventHandler, GmqQueue, MessageHandler, Status};
75
/// general-mq error type.
///
/// The enum derives [`PartialEq`] and [`Eq`] in addition to [`Clone`] and [`Debug`] so that
/// values can be compared directly (for example with `==` or `assert_eq!`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Error {
    /// The queue does not have [`MessageHandler`].
    NoMsgHandler,
    /// The connection is not connected or the queue (topic) is not
    /// connected (declared/subscribed).
    NotConnected,
    /// The queue is a receiver that cannot send messages.
    QueueIsReceiver,
}
87
88#[derive(Clone)]
89pub enum Queue {
90    Amqp(AmqpQueue),
91    Mqtt(MqttQueue),
92}
93
94#[derive(Clone)]
95pub enum QueueOptions<'a> {
96    Amqp(AmqpQueueOptions, &'a AmqpConnection),
97    Mqtt(MqttQueueOptions, &'a MqttConnection),
98}
99
/// Length of the identifiers used for inner handlers.
pub(crate) const ID_SIZE: usize = 24;
102
103impl fmt::Display for Error {
104    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105        match *self {
106            Error::NoMsgHandler => write!(f, "no message handler"),
107            Error::NotConnected => write!(f, "not connected"),
108            Error::QueueIsReceiver => write!(f, "this queue is a receiver"),
109        }
110    }
111}
112
113impl StdError for Error {}
114
115impl Queue {
116    pub fn new(opts: QueueOptions) -> Result<Self, String> {
117        match opts {
118            QueueOptions::Amqp(opts, conn) => Ok(Queue::Amqp(AmqpQueue::new(opts, conn)?)),
119            QueueOptions::Mqtt(opts, conn) => Ok(Queue::Mqtt(MqttQueue::new(opts, conn)?)),
120        }
121    }
122}
123
124#[async_trait]
125impl GmqQueue for Queue {
126    fn name(&self) -> &str {
127        match self {
128            Queue::Amqp(q) => q.name(),
129            Queue::Mqtt(q) => q.name(),
130        }
131    }
132
133    fn is_recv(&self) -> bool {
134        match self {
135            Queue::Amqp(q) => q.is_recv(),
136            Queue::Mqtt(q) => q.is_recv(),
137        }
138    }
139
140    fn status(&self) -> Status {
141        match self {
142            Queue::Amqp(q) => q.status(),
143            Queue::Mqtt(q) => q.status(),
144        }
145    }
146
147    fn set_handler(&mut self, handler: Arc<dyn EventHandler>) {
148        match self {
149            Queue::Amqp(q) => q.set_handler(handler),
150            Queue::Mqtt(q) => q.set_handler(handler),
151        }
152    }
153
154    fn clear_handler(&mut self) {
155        match self {
156            Queue::Amqp(q) => q.clear_handler(),
157            Queue::Mqtt(q) => q.clear_handler(),
158        }
159    }
160
161    fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>) {
162        match self {
163            Queue::Amqp(q) => q.set_msg_handler(handler),
164            Queue::Mqtt(q) => q.set_msg_handler(handler),
165        }
166    }
167
168    fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
169        match self {
170            Queue::Amqp(q) => q.connect(),
171            Queue::Mqtt(q) => q.connect(),
172        }
173    }
174
175    async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
176        match self {
177            Queue::Amqp(q) => q.close().await,
178            Queue::Mqtt(q) => q.close().await,
179        }
180    }
181
182    async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
183        match self {
184            Queue::Amqp(q) => q.send_msg(payload).await,
185            Queue::Mqtt(q) => q.send_msg(payload).await,
186        }
187    }
188}
189
190/// Generate random alphanumeric with the specified length.
191pub fn randomstring(len: usize) -> String {
192    let mut rng = rand::rng();
193    std::iter::repeat(())
194        .map(|()| rng.sample(Alphanumeric))
195        .map(char::from)
196        .take(len)
197        .collect()
198}