rsmq_async/
multiplexed_facade.rs

1use crate::functions::{CachedScript, RsmqFunctions};
2use crate::r#trait::RsmqConnection;
3use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes};
4use crate::RsmqResult;
5use core::convert::TryFrom;
6use core::marker::PhantomData;
7use std::time::Duration;
8
9#[derive(Clone)]
10struct RedisConnection(redis::aio::MultiplexedConnection);
11
12impl std::fmt::Debug for RedisConnection {
13    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
14        write!(f, "MultiplexedRedisAsyncConnnection")
15    }
16}
17
18#[derive(Debug, Clone)]
19pub struct Rsmq {
20    connection: RedisConnection,
21    functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
22    scripts: CachedScript,
23}
24
25impl Rsmq {
26    /// Creates a new RSMQ instance, including its connection
27    pub async fn new(options: RsmqOptions) -> RsmqResult<Rsmq> {
28        let conn_info = redis::ConnectionInfo {
29            addr: redis::ConnectionAddr::Tcp(options.host, options.port),
30            redis: redis::RedisConnectionInfo {
31                db: options.db.into(),
32                username: options.username,
33                password: options.password,
34                protocol: options.protocol,
35            },
36        };
37
38        let client = redis::Client::open(conn_info)?;
39
40        let connection = client.get_multiplexed_async_connection().await?;
41
42        Rsmq::new_with_connection(connection, options.realtime, Some(&options.ns)).await
43    }
44
45    /// Special method for when you already have a redis-rs connection and you don't want redis_async to create a new one.
46    pub async fn new_with_connection(
47        mut connection: redis::aio::MultiplexedConnection,
48        realtime: bool,
49        ns: Option<&str>,
50    ) -> RsmqResult<Rsmq> {
51        let functions = RsmqFunctions {
52            ns: ns.unwrap_or("rsmq").to_string(),
53            realtime,
54            conn: PhantomData,
55        };
56
57        let scripts = functions.load_scripts(&mut connection).await?;
58
59        Ok(Rsmq {
60            connection: RedisConnection(connection),
61            functions,
62            scripts,
63        })
64    }
65}
66
67impl RsmqConnection for Rsmq {
68    async fn change_message_visibility(
69        &mut self,
70        qname: &str,
71        message_id: &str,
72        hidden: Duration,
73    ) -> RsmqResult<()> {
74        self.functions
75            .change_message_visibility(
76                &mut self.connection.0,
77                qname,
78                message_id,
79                hidden,
80                &self.scripts,
81            )
82            .await
83    }
84
85    async fn create_queue(
86        &mut self,
87        qname: &str,
88        hidden: Option<Duration>,
89        delay: Option<Duration>,
90        maxsize: Option<i32>,
91    ) -> RsmqResult<()> {
92        self.functions
93            .create_queue(&mut self.connection.0, qname, hidden, delay, maxsize)
94            .await
95    }
96
97    async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult<bool> {
98        self.functions
99            .delete_message(&mut self.connection.0, qname, id)
100            .await
101    }
102    async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> {
103        self.functions
104            .delete_queue(&mut self.connection.0, qname)
105            .await
106    }
107    async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult<RsmqQueueAttributes> {
108        self.functions
109            .get_queue_attributes(&mut self.connection.0, qname)
110            .await
111    }
112
113    async fn list_queues(&mut self) -> RsmqResult<Vec<String>> {
114        self.functions.list_queues(&mut self.connection.0).await
115    }
116
117    async fn pop_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
118        &mut self,
119        qname: &str,
120    ) -> RsmqResult<Option<RsmqMessage<E>>> {
121        self.functions
122            .pop_message::<E>(&mut self.connection.0, qname, &self.scripts)
123            .await
124    }
125
126    async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
127        &mut self,
128        qname: &str,
129        hidden: Option<Duration>,
130    ) -> RsmqResult<Option<RsmqMessage<E>>> {
131        self.functions
132            .receive_message::<E>(&mut self.connection.0, qname, hidden, &self.scripts)
133            .await
134    }
135
136    async fn send_message<E: Into<RedisBytes> + Send>(
137        &mut self,
138        qname: &str,
139        message: E,
140        delay: Option<Duration>,
141    ) -> RsmqResult<String> {
142        self.functions
143            .send_message(&mut self.connection.0, qname, message, delay)
144            .await
145    }
146
147    async fn set_queue_attributes(
148        &mut self,
149        qname: &str,
150        hidden: Option<Duration>,
151        delay: Option<Duration>,
152        maxsize: Option<i64>,
153    ) -> RsmqResult<RsmqQueueAttributes> {
154        self.functions
155            .set_queue_attributes(&mut self.connection.0, qname, hidden, delay, maxsize)
156            .await
157    }
158}