rustis/commands/
blocking_commands.rs

1use crate::{
2    client::{prepare_command, MonitorStream, PreparedCommand},
3    commands::{LMoveWhere, ZMPopResult, ZWhere},
4    resp::{cmd, deserialize_vec_of_triplets, PrimitiveResponse, SingleArg, SingleArgCollection},
5    Result,
6};
7use serde::{
8    de::{DeserializeOwned, Visitor},
9    Deserialize, Deserializer,
10};
11use std::{fmt, marker::PhantomData};
12
13/// Result for the [`bzpopmin`](BlockingCommands::bzpopmin)
14/// and [`bzpopmax`](BlockingCommands::bzpopmax) commands
15#[derive(Deserialize)]
16pub struct BZpopMinMaxResult<K, E>(
17    #[serde(deserialize_with = "deserialize_bzop_min_max_result")] pub Option<Vec<(K, E, f64)>>,
18)
19where
20    K: DeserializeOwned,
21    E: DeserializeOwned;
22
23#[allow(clippy::complexity)]
24pub fn deserialize_bzop_min_max_result<'de, D, K, V>(
25    deserializer: D,
26) -> std::result::Result<Option<Vec<(K, V, f64)>>, D::Error>
27where
28    D: Deserializer<'de>,
29    K: DeserializeOwned,
30    V: DeserializeOwned,
31{
32    struct OptionVisitor<K, V> {
33        phantom: PhantomData<(K, V)>,
34    }
35
36    impl<'de, K, V> Visitor<'de> for OptionVisitor<K, V>
37    where
38        K: DeserializeOwned,
39        V: DeserializeOwned,
40    {
41        type Value = Option<Vec<(K, V, f64)>>;
42
43        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
44            formatter.write_str("Option<Vec<(K, V, f64)>>")
45        }
46
47        fn visit_none<E>(self) -> std::result::Result<Self::Value, E>
48        where
49            E: serde::de::Error,
50        {
51            Ok(None)
52        }
53
54        fn visit_some<D>(self, deserializer: D) -> std::result::Result<Self::Value, D::Error>
55        where
56            D: Deserializer<'de>,
57        {
58            deserialize_vec_of_triplets(deserializer).map(Some)
59        }
60    }
61
62    deserializer.deserialize_option(OptionVisitor {
63        phantom: PhantomData,
64    })
65}
66
67/// A group of blocking commands
68pub trait BlockingCommands<'a> {
69    /// This command is the blocking variant of [`lmove`](crate::commands::ListCommands::lmove).
70    ///
71    /// # Return
72    /// the element being popped from `source` and pushed to `destination`.
73    /// If timeout is reached, a None reply is returned.
74    ///
75    /// # See Also
76    /// [<https://redis.io/commands/blmove/>](https://redis.io/commands/blmove/)
77    #[must_use]
78    fn blmove<S, D, E>(
79        self,
80        source: S,
81        destination: D,
82        where_from: LMoveWhere,
83        where_to: LMoveWhere,
84        timeout: f64,
85    ) -> PreparedCommand<'a, Self, E>
86    where
87        Self: Sized,
88        S: SingleArg,
89        D: SingleArg,
90        E: PrimitiveResponse,
91    {
92        prepare_command(
93            self,
94            cmd("BLMOVE")
95                .arg(source)
96                .arg(destination)
97                .arg(where_from)
98                .arg(where_to)
99                .arg(timeout),
100        )
101    }
102
103    /// This command is the blocking variant of [`lmpop`](crate::commands::ListCommands::lmpop).
104    ///
105    /// # Return
106    /// - None when no element could be popped, and timeout is reached.
107    /// - Tuple composed by the name of the key from which elements were popped and the list of popped element
108    ///
109    /// # See Also
110    /// [<https://redis.io/commands/blmpop/>](https://redis.io/commands/blmpop/)
111    #[must_use]
112    fn blmpop<K, KK, E>(
113        self,
114        timeout: f64,
115        keys: KK,
116        where_: LMoveWhere,
117        count: usize,
118    ) -> PreparedCommand<'a, Self, Option<(String, Vec<E>)>>
119    where
120        Self: Sized,
121        K: SingleArg,
122        KK: SingleArgCollection<K>,
123        E: PrimitiveResponse + DeserializeOwned,
124    {
125        prepare_command(
126            self,
127            cmd("BLMPOP")
128                .arg(timeout)
129                .arg(keys.num_args())
130                .arg(keys)
131                .arg(where_)
132                .arg("COUNT")
133                .arg(count),
134        )
135    }
136
137    /// This command is a blocking list pop primitive.
138    ///
139    /// It is the blocking version of [`lpop`](crate::commands::ListCommands::lpop) because it
140    /// blocks the connection when there are no elements to pop from any of the given lists.
141    ///
142    /// An element is popped from the head of the first list that is non-empty,
143    /// with the given keys being checked in the order that they are given.
144    ///
145    /// # Return
146    /// - `None` when no element could be popped and the timeout expired
147    /// - a tuple with the first element being the name of the key where an element was popped
148    /// and the second element being the value of the popped element.
149    ///
150    /// # See Also
151    /// [<https://redis.io/commands/blpop/>](https://redis.io/commands/blpop/)
152    #[must_use]
153    fn blpop<K, KK, K1, V>(self, keys: KK, timeout: f64) -> PreparedCommand<'a, Self, Option<(K1, V)>>
154    where
155        Self: Sized,
156        K: SingleArg,
157        KK: SingleArgCollection<K>,
158        K1: PrimitiveResponse + DeserializeOwned,
159        V: PrimitiveResponse + DeserializeOwned,
160    {
161        prepare_command(self, cmd("BLPOP").arg(keys).arg(timeout))
162    }
163
164    /// This command is a blocking list pop primitive.
165    ///
166    /// It is the blocking version of [`rpop`](crate::commands::ListCommands::rpop) because it
167    /// blocks the connection when there are no elements to pop from any of the given lists.
168    ///
169    /// An element is popped from the tail of the first list that is non-empty,
170    /// with the given keys being checked in the order that they are given.
171    ///
172    /// # Return
173    /// - `None` when no element could be popped and the timeout expired
174    /// - a tuple with the first element being the name of the key where an element was popped
175    /// and the second element being the value of the popped element.
176    ///
177    /// # See Also
178    /// [<https://redis.io/commands/brpop/>](https://redis.io/commands/brpop/)
179    #[must_use]
180    fn brpop<K, KK, K1, V>(self, keys: KK, timeout: f64) -> PreparedCommand<'a, Self, Option<(K1, V)>>
181    where
182        Self: Sized,
183        K: SingleArg,
184        KK: SingleArgCollection<K>,
185        K1: PrimitiveResponse + DeserializeOwned,
186        V: PrimitiveResponse + DeserializeOwned,
187    {
188        prepare_command(self, cmd("BRPOP").arg(keys).arg(timeout))
189    }
190
191    /// This command is the blocking variant of [`zmpop`](crate::commands::SortedSetCommands::zmpop).
192    ///
193    /// # Return
194    /// * `None` if no element could be popped
195    /// * A tuple made up of
196    ///     * The name of the key from which elements were popped
197    ///     * An array of tuples with all the popped members and their scores
198    ///
199    /// # See Also
200    /// [<https://redis.io/commands/bzmpop/>](https://redis.io/commands/bzmpop/)
201    #[must_use]
202    fn bzmpop<K, KK, E>(
203        self,
204        timeout: f64,
205        keys: KK,
206        where_: ZWhere,
207        count: usize,
208    ) -> PreparedCommand<'a, Self, Option<ZMPopResult<E>>>
209    where
210        Self: Sized,
211        K: SingleArg,
212        KK: SingleArgCollection<K>,
213        E: PrimitiveResponse + DeserializeOwned,
214    {
215        prepare_command(
216            self,
217            cmd("BZMPOP")
218                .arg(timeout)
219                .arg(keys.num_args())
220                .arg(keys)
221                .arg(where_)
222                .arg("COUNT")
223                .arg(count),
224        )
225    }
226
227    /// This command is the blocking variant of [`zpopmax`](crate::commands::SortedSetCommands::zpopmax).
228    ///
229    /// # Return
230    /// * `None` when no element could be popped and the timeout expired.
231    /// * The list of tuple with
232    ///     * the first element being the name of the key where a member was popped,
233    ///     * the second element is the popped member itself,
234    ///     * and the third element is the score of the popped element.
235    ///
236    /// # See Also
237    /// [<https://redis.io/commands/bzpopmax/>](https://redis.io/commands/bzpopmax/)
238    #[must_use]
239    fn bzpopmax<K, KK, E, K1>(
240        self,
241        keys: KK,
242        timeout: f64,
243    ) -> PreparedCommand<'a, Self, BZpopMinMaxResult<K1, E>>
244    where
245        Self: Sized,
246        K: SingleArg,
247        KK: SingleArgCollection<K>,
248        K1: PrimitiveResponse + DeserializeOwned,
249        E: PrimitiveResponse + DeserializeOwned,
250    {
251        prepare_command(self, cmd("BZPOPMAX").arg(keys).arg(timeout))
252    }
253
254    /// This command is the blocking variant of [`zpopmin`](crate::commands::SortedSetCommands::zpopmin).
255    ///
256    /// # Return
257    /// * `None` when no element could be popped and the timeout expired.
258    /// * The list of tuple with
259    ///     * the first element being the name of the key where a member was popped,
260    ///     * the second element is the popped member itself,
261    ///     * and the third element is the score of the popped element.
262    ///
263    /// # See Also
264    /// [<https://redis.io/commands/bzpopmin/>](https://redis.io/commands/bzpopmin/)
265    #[must_use]
266    fn bzpopmin<K, KK, E, K1>(
267        self,
268        keys: KK,
269        timeout: f64,
270    ) -> PreparedCommand<'a, Self, BZpopMinMaxResult<K1, E>>
271    where
272        Self: Sized,
273        K: SingleArg,
274        KK: SingleArgCollection<K>,
275        K1: PrimitiveResponse + DeserializeOwned,
276        E: PrimitiveResponse + DeserializeOwned,
277    {
278        prepare_command(self, cmd("BZPOPMIN").arg(keys).arg(timeout))
279    }
280
281    /// Debugging command that streams back every command processed by the Redis server.
282    ///
283    /// # See Also
284    /// [<https://redis.io/commands/monitor/>](https://redis.io/commands/monitor/)
285    #[must_use]
286    #[allow(async_fn_in_trait)]
287    async fn monitor(self) -> Result<MonitorStream>;
288}