rustis/commands/blocking_commands.rs
1use crate::{
2 client::{prepare_command, MonitorStream, PreparedCommand},
3 commands::{LMoveWhere, ZMPopResult, ZWhere},
4 resp::{cmd, deserialize_vec_of_triplets, PrimitiveResponse, SingleArg, SingleArgCollection},
5 Result,
6};
7use serde::{
8 de::{DeserializeOwned, Visitor},
9 Deserialize, Deserializer,
10};
11use std::{fmt, marker::PhantomData};
12
13/// Result for the [`bzpopmin`](BlockingCommands::bzpopmin)
14/// and [`bzpopmax`](BlockingCommands::bzpopmax) commands
15#[derive(Deserialize)]
16pub struct BZpopMinMaxResult<K, E>(
17 #[serde(deserialize_with = "deserialize_bzop_min_max_result")] pub Option<Vec<(K, E, f64)>>,
18)
19where
20 K: DeserializeOwned,
21 E: DeserializeOwned;
22
23#[allow(clippy::complexity)]
24pub fn deserialize_bzop_min_max_result<'de, D, K, V>(
25 deserializer: D,
26) -> std::result::Result<Option<Vec<(K, V, f64)>>, D::Error>
27where
28 D: Deserializer<'de>,
29 K: DeserializeOwned,
30 V: DeserializeOwned,
31{
32 struct OptionVisitor<K, V> {
33 phantom: PhantomData<(K, V)>,
34 }
35
36 impl<'de, K, V> Visitor<'de> for OptionVisitor<K, V>
37 where
38 K: DeserializeOwned,
39 V: DeserializeOwned,
40 {
41 type Value = Option<Vec<(K, V, f64)>>;
42
43 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
44 formatter.write_str("Option<Vec<(K, V, f64)>>")
45 }
46
47 fn visit_none<E>(self) -> std::result::Result<Self::Value, E>
48 where
49 E: serde::de::Error,
50 {
51 Ok(None)
52 }
53
54 fn visit_some<D>(self, deserializer: D) -> std::result::Result<Self::Value, D::Error>
55 where
56 D: Deserializer<'de>,
57 {
58 deserialize_vec_of_triplets(deserializer).map(Some)
59 }
60 }
61
62 deserializer.deserialize_option(OptionVisitor {
63 phantom: PhantomData,
64 })
65}
66
67/// A group of blocking commands
68pub trait BlockingCommands<'a> {
69 /// This command is the blocking variant of [`lmove`](crate::commands::ListCommands::lmove).
70 ///
71 /// # Return
72 /// the element being popped from `source` and pushed to `destination`.
73 /// If timeout is reached, a None reply is returned.
74 ///
75 /// # See Also
76 /// [<https://redis.io/commands/blmove/>](https://redis.io/commands/blmove/)
77 #[must_use]
78 fn blmove<S, D, E>(
79 self,
80 source: S,
81 destination: D,
82 where_from: LMoveWhere,
83 where_to: LMoveWhere,
84 timeout: f64,
85 ) -> PreparedCommand<'a, Self, E>
86 where
87 Self: Sized,
88 S: SingleArg,
89 D: SingleArg,
90 E: PrimitiveResponse,
91 {
92 prepare_command(
93 self,
94 cmd("BLMOVE")
95 .arg(source)
96 .arg(destination)
97 .arg(where_from)
98 .arg(where_to)
99 .arg(timeout),
100 )
101 }
102
103 /// This command is the blocking variant of [`lmpop`](crate::commands::ListCommands::lmpop).
104 ///
105 /// # Return
106 /// - None when no element could be popped, and timeout is reached.
107 /// - Tuple composed by the name of the key from which elements were popped and the list of popped element
108 ///
109 /// # See Also
110 /// [<https://redis.io/commands/blmpop/>](https://redis.io/commands/blmpop/)
111 #[must_use]
112 fn blmpop<K, KK, E>(
113 self,
114 timeout: f64,
115 keys: KK,
116 where_: LMoveWhere,
117 count: usize,
118 ) -> PreparedCommand<'a, Self, Option<(String, Vec<E>)>>
119 where
120 Self: Sized,
121 K: SingleArg,
122 KK: SingleArgCollection<K>,
123 E: PrimitiveResponse + DeserializeOwned,
124 {
125 prepare_command(
126 self,
127 cmd("BLMPOP")
128 .arg(timeout)
129 .arg(keys.num_args())
130 .arg(keys)
131 .arg(where_)
132 .arg("COUNT")
133 .arg(count),
134 )
135 }
136
137 /// This command is a blocking list pop primitive.
138 ///
139 /// It is the blocking version of [`lpop`](crate::commands::ListCommands::lpop) because it
140 /// blocks the connection when there are no elements to pop from any of the given lists.
141 ///
142 /// An element is popped from the head of the first list that is non-empty,
143 /// with the given keys being checked in the order that they are given.
144 ///
145 /// # Return
146 /// - `None` when no element could be popped and the timeout expired
147 /// - a tuple with the first element being the name of the key where an element was popped
148 /// and the second element being the value of the popped element.
149 ///
150 /// # See Also
151 /// [<https://redis.io/commands/blpop/>](https://redis.io/commands/blpop/)
152 #[must_use]
153 fn blpop<K, KK, K1, V>(self, keys: KK, timeout: f64) -> PreparedCommand<'a, Self, Option<(K1, V)>>
154 where
155 Self: Sized,
156 K: SingleArg,
157 KK: SingleArgCollection<K>,
158 K1: PrimitiveResponse + DeserializeOwned,
159 V: PrimitiveResponse + DeserializeOwned,
160 {
161 prepare_command(self, cmd("BLPOP").arg(keys).arg(timeout))
162 }
163
164 /// This command is a blocking list pop primitive.
165 ///
166 /// It is the blocking version of [`rpop`](crate::commands::ListCommands::rpop) because it
167 /// blocks the connection when there are no elements to pop from any of the given lists.
168 ///
169 /// An element is popped from the tail of the first list that is non-empty,
170 /// with the given keys being checked in the order that they are given.
171 ///
172 /// # Return
173 /// - `None` when no element could be popped and the timeout expired
174 /// - a tuple with the first element being the name of the key where an element was popped
175 /// and the second element being the value of the popped element.
176 ///
177 /// # See Also
178 /// [<https://redis.io/commands/brpop/>](https://redis.io/commands/brpop/)
179 #[must_use]
180 fn brpop<K, KK, K1, V>(self, keys: KK, timeout: f64) -> PreparedCommand<'a, Self, Option<(K1, V)>>
181 where
182 Self: Sized,
183 K: SingleArg,
184 KK: SingleArgCollection<K>,
185 K1: PrimitiveResponse + DeserializeOwned,
186 V: PrimitiveResponse + DeserializeOwned,
187 {
188 prepare_command(self, cmd("BRPOP").arg(keys).arg(timeout))
189 }
190
191 /// This command is the blocking variant of [`zmpop`](crate::commands::SortedSetCommands::zmpop).
192 ///
193 /// # Return
194 /// * `None` if no element could be popped
195 /// * A tuple made up of
196 /// * The name of the key from which elements were popped
197 /// * An array of tuples with all the popped members and their scores
198 ///
199 /// # See Also
200 /// [<https://redis.io/commands/bzmpop/>](https://redis.io/commands/bzmpop/)
201 #[must_use]
202 fn bzmpop<K, KK, E>(
203 self,
204 timeout: f64,
205 keys: KK,
206 where_: ZWhere,
207 count: usize,
208 ) -> PreparedCommand<'a, Self, Option<ZMPopResult<E>>>
209 where
210 Self: Sized,
211 K: SingleArg,
212 KK: SingleArgCollection<K>,
213 E: PrimitiveResponse + DeserializeOwned,
214 {
215 prepare_command(
216 self,
217 cmd("BZMPOP")
218 .arg(timeout)
219 .arg(keys.num_args())
220 .arg(keys)
221 .arg(where_)
222 .arg("COUNT")
223 .arg(count),
224 )
225 }
226
227 /// This command is the blocking variant of [`zpopmax`](crate::commands::SortedSetCommands::zpopmax).
228 ///
229 /// # Return
230 /// * `None` when no element could be popped and the timeout expired.
231 /// * The list of tuple with
232 /// * the first element being the name of the key where a member was popped,
233 /// * the second element is the popped member itself,
234 /// * and the third element is the score of the popped element.
235 ///
236 /// # See Also
237 /// [<https://redis.io/commands/bzpopmax/>](https://redis.io/commands/bzpopmax/)
238 #[must_use]
239 fn bzpopmax<K, KK, E, K1>(
240 self,
241 keys: KK,
242 timeout: f64,
243 ) -> PreparedCommand<'a, Self, BZpopMinMaxResult<K1, E>>
244 where
245 Self: Sized,
246 K: SingleArg,
247 KK: SingleArgCollection<K>,
248 K1: PrimitiveResponse + DeserializeOwned,
249 E: PrimitiveResponse + DeserializeOwned,
250 {
251 prepare_command(self, cmd("BZPOPMAX").arg(keys).arg(timeout))
252 }
253
254 /// This command is the blocking variant of [`zpopmin`](crate::commands::SortedSetCommands::zpopmin).
255 ///
256 /// # Return
257 /// * `None` when no element could be popped and the timeout expired.
258 /// * The list of tuple with
259 /// * the first element being the name of the key where a member was popped,
260 /// * the second element is the popped member itself,
261 /// * and the third element is the score of the popped element.
262 ///
263 /// # See Also
264 /// [<https://redis.io/commands/bzpopmin/>](https://redis.io/commands/bzpopmin/)
265 #[must_use]
266 fn bzpopmin<K, KK, E, K1>(
267 self,
268 keys: KK,
269 timeout: f64,
270 ) -> PreparedCommand<'a, Self, BZpopMinMaxResult<K1, E>>
271 where
272 Self: Sized,
273 K: SingleArg,
274 KK: SingleArgCollection<K>,
275 K1: PrimitiveResponse + DeserializeOwned,
276 E: PrimitiveResponse + DeserializeOwned,
277 {
278 prepare_command(self, cmd("BZPOPMIN").arg(keys).arg(timeout))
279 }
280
281 /// Debugging command that streams back every command processed by the Redis server.
282 ///
283 /// # See Also
284 /// [<https://redis.io/commands/monitor/>](https://redis.io/commands/monitor/)
285 #[must_use]
286 #[allow(async_fn_in_trait)]
287 async fn monitor(self) -> Result<MonitorStream>;
288}