rsmq_async/
multiplexed_facade.rs1use crate::functions::{CachedScript, RsmqFunctions};
2use crate::r#trait::RsmqConnection;
3use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes};
4use crate::RsmqResult;
5use core::convert::TryFrom;
6use core::marker::PhantomData;
7use std::time::Duration;
8
9#[derive(Clone)]
10struct RedisConnection(redis::aio::MultiplexedConnection);
11
12impl std::fmt::Debug for RedisConnection {
13 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
14 write!(f, "MultiplexedRedisAsyncConnnection")
15 }
16}
17
18#[derive(Debug, Clone)]
19pub struct Rsmq {
20 connection: RedisConnection,
21 functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
22 scripts: CachedScript,
23}
24
25impl Rsmq {
26 pub async fn new(options: RsmqOptions) -> RsmqResult<Rsmq> {
28 let conn_info = redis::ConnectionInfo {
29 addr: redis::ConnectionAddr::Tcp(options.host, options.port),
30 redis: redis::RedisConnectionInfo {
31 db: options.db.into(),
32 username: options.username,
33 password: options.password,
34 protocol: options.protocol,
35 },
36 };
37
38 let client = redis::Client::open(conn_info)?;
39
40 let connection = client.get_multiplexed_async_connection().await?;
41
42 Rsmq::new_with_connection(connection, options.realtime, Some(&options.ns)).await
43 }
44
45 pub async fn new_with_connection(
47 mut connection: redis::aio::MultiplexedConnection,
48 realtime: bool,
49 ns: Option<&str>,
50 ) -> RsmqResult<Rsmq> {
51 let functions = RsmqFunctions {
52 ns: ns.unwrap_or("rsmq").to_string(),
53 realtime,
54 conn: PhantomData,
55 };
56
57 let scripts = functions.load_scripts(&mut connection).await?;
58
59 Ok(Rsmq {
60 connection: RedisConnection(connection),
61 functions,
62 scripts,
63 })
64 }
65}
66
67impl RsmqConnection for Rsmq {
68 async fn change_message_visibility(
69 &mut self,
70 qname: &str,
71 message_id: &str,
72 hidden: Duration,
73 ) -> RsmqResult<()> {
74 self.functions
75 .change_message_visibility(
76 &mut self.connection.0,
77 qname,
78 message_id,
79 hidden,
80 &self.scripts,
81 )
82 .await
83 }
84
85 async fn create_queue(
86 &mut self,
87 qname: &str,
88 hidden: Option<Duration>,
89 delay: Option<Duration>,
90 maxsize: Option<i32>,
91 ) -> RsmqResult<()> {
92 self.functions
93 .create_queue(&mut self.connection.0, qname, hidden, delay, maxsize)
94 .await
95 }
96
97 async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult<bool> {
98 self.functions
99 .delete_message(&mut self.connection.0, qname, id)
100 .await
101 }
102 async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> {
103 self.functions
104 .delete_queue(&mut self.connection.0, qname)
105 .await
106 }
107 async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult<RsmqQueueAttributes> {
108 self.functions
109 .get_queue_attributes(&mut self.connection.0, qname)
110 .await
111 }
112
113 async fn list_queues(&mut self) -> RsmqResult<Vec<String>> {
114 self.functions.list_queues(&mut self.connection.0).await
115 }
116
117 async fn pop_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
118 &mut self,
119 qname: &str,
120 ) -> RsmqResult<Option<RsmqMessage<E>>> {
121 self.functions
122 .pop_message::<E>(&mut self.connection.0, qname, &self.scripts)
123 .await
124 }
125
126 async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
127 &mut self,
128 qname: &str,
129 hidden: Option<Duration>,
130 ) -> RsmqResult<Option<RsmqMessage<E>>> {
131 self.functions
132 .receive_message::<E>(&mut self.connection.0, qname, hidden, &self.scripts)
133 .await
134 }
135
136 async fn send_message<E: Into<RedisBytes> + Send>(
137 &mut self,
138 qname: &str,
139 message: E,
140 delay: Option<Duration>,
141 ) -> RsmqResult<String> {
142 self.functions
143 .send_message(&mut self.connection.0, qname, message, delay)
144 .await
145 }
146
147 async fn set_queue_attributes(
148 &mut self,
149 qname: &str,
150 hidden: Option<Duration>,
151 delay: Option<Duration>,
152 maxsize: Option<i64>,
153 ) -> RsmqResult<RsmqQueueAttributes> {
154 self.functions
155 .set_queue_attributes(&mut self.connection.0, qname, hidden, delay, maxsize)
156 .await
157 }
158}