// redis_driver/commands/pub_sub_commands.rs

1use crate::{
2    prepare_command,
3    resp::{
4        cmd, CommandArg, CommandArgs, FromKeyValueValueArray, FromSingleValueArray, FromValue,
5        IntoArgs, SingleArgOrCollection,
6    },
7    PreparedCommand, Future, PubSubStream,
8};
9
10/// A group of Redis commands related to [`Pub/Sub`](https://redis.io/docs/manual/pubsub/)
11/// # See Also
12/// [Redis Pub/Sub Commands](https://redis.io/commands/?group=pubsub)
13pub trait PubSubCommands {
14    /// Subscribes the client to the given patterns.
15    ///
16    /// # Example
17    /// ```
18    /// use redis_driver::{
19    ///     resp::cmd, Client, ClientPreparedCommand, FlushingMode,
20    ///     PubSubCommands, ServerCommands, Result
21    /// };
22    /// use futures::StreamExt;
23    ///
24    /// #[tokio::main]
25    /// async fn main() -> Result<()> {
26    ///     let mut pub_sub_client = Client::connect("127.0.0.1:6379").await?;
27    ///     let mut regular_client = Client::connect("127.0.0.1:6379").await?;
28    ///
29    ///     regular_client.flushdb(FlushingMode::Sync).await?;
30    ///
31    ///     let mut pub_sub_stream = pub_sub_client.psubscribe("mychannel*").await?;
32    ///
33    ///     regular_client.publish("mychannel1", "mymessage").await?;
34    ///
35    ///     let (pattern, channel, message): (String, String, String) = pub_sub_stream
36    ///         .next()
37    ///         .await
38    ///         .unwrap()?
39    ///         .into()?;
40    ///
41    ///     assert_eq!("mychannel*", pattern);
42    ///     assert_eq!("mychannel1", channel);
43    ///     assert_eq!("mymessage", message);
44    ///
45    ///     pub_sub_stream.close().await?;
46    ///
47    ///     Ok(())
48    /// }
49    /// ```
50    ///
51    /// # See Also
52    /// [<https://redis.io/commands/psubscribe/>](https://redis.io/commands/psubscribe/)
53    fn psubscribe<'a, P, PP>(&'a mut self, patterns: PP) -> Future<'a, PubSubStream>
54    where
55        P: Into<CommandArg> + Send + 'a,
56        PP: SingleArgOrCollection<P>;
57
58    /// Posts a message to the given channel.
59    ///
60    /// # Return
61    /// The number of clients that received the message.
62    ///
63    /// Note that in a Redis Cluster, only clients that are connected
64    /// to the same node as the publishing client are included in the count.
65    ///
66    /// # See Also
67    /// [<https://redis.io/commands/publish/>](https://redis.io/commands/publish/)
68    fn publish<C, M>(&mut self, channel: C, message: M) -> PreparedCommand<Self, usize>
69    where
70        Self: Sized,
71        C: Into<CommandArg>,
72        M: Into<CommandArg>,
73    {
74        prepare_command(self, cmd("PUBLISH").arg(channel).arg(message))
75    }
76
77    /// Lists the currently active channels.
78    ///
79    /// # Return
80    /// A collection of active channels, optionally matching the specified pattern.
81    ///
82    /// # See Also
83    /// [<https://redis.io/commands/pubsub-channels/>](https://redis.io/commands/pubsub-channels/)
84    fn pub_sub_channels<C, CC>(
85        &mut self,
86        options: PubSubChannelsOptions,
87    ) -> PreparedCommand<Self, CC>
88    where
89        Self: Sized,
90        C: FromValue,
91        CC: FromSingleValueArray<C>,
92    {
93        prepare_command(self, cmd("PUBSUB").arg("CHANNELS").arg(options))
94    }
95
96    /// Returns the number of unique patterns that are subscribed to by clients
97    /// (that are performed using the PSUBSCRIBE command).
98    ///
99    /// # Return
100    /// The number of patterns all the clients are subscribed to.
101    ///
102    /// # See Also
103    /// [<https://redis.io/commands/pubsub-numpat/>](https://redis.io/commands/pubsub-numpat/)
104    fn pub_sub_numpat(&mut self) -> PreparedCommand<Self, usize>
105    where
106        Self: Sized,
107    {
108        prepare_command(self, cmd("PUBSUB").arg("NUMPAT"))
109    }
110
111    /// Returns the number of subscribers (exclusive of clients subscribed to patterns)
112    ///  for the specified channels.
113    ///
114    /// # Return
115    /// A collection of channels and number of subscribers for every channel.
116    ///
117    /// # See Also
118    /// [<https://redis.io/commands/pubsub-numsub/>](https://redis.io/commands/pubsub-numsub/)
119    fn pub_sub_numsub<C, CC, R, RR>(&mut self, channels: CC) -> PreparedCommand<Self, RR>
120    where
121        Self: Sized,
122        C: Into<CommandArg>,
123        CC: SingleArgOrCollection<C>,
124        R: FromValue,
125        RR: FromKeyValueValueArray<R, usize>,
126    {
127        prepare_command(self, cmd("PUBSUB").arg("NUMSUB").arg(channels))
128    }
129
130    /// Lists the currently active shard channels.
131    ///
132    /// # Return
133    /// A collection of active channels, optionally matching the specified pattern.
134    ///
135    /// # See Also
136    /// [<https://redis.io/commands/pubsub-shardchannels/>](https://redis.io/commands/pubsub-shardchannels/)
137    fn pub_sub_shardchannels<C, CC>(
138        &mut self,
139        options: PubSubChannelsOptions,
140    ) -> PreparedCommand<Self, CC>
141    where
142        Self: Sized,
143        C: FromValue,
144        CC: FromSingleValueArray<C>,
145    {
146        prepare_command(self, cmd("PUBSUB").arg("SHARDCHANNELS").arg(options))
147    }
148
149    /// Returns the number of subscribers for the specified shard channels.
150    ///
151    /// # Return
152    /// A collection of channels and number of subscribers for every channel.
153    ///
154    /// # See Also
155    /// [<https://redis.io/commands/pubsub-shardnumsub/>](https://redis.io/commands/pubsub-shardnumsub/)
156    fn pub_sub_shardnumsub<C, CC, R, RR>(&mut self, channels: CC) -> PreparedCommand<Self, RR>
157    where
158        Self: Sized,
159        C: Into<CommandArg>,
160        CC: SingleArgOrCollection<C>,
161        R: FromValue,
162        RR: FromKeyValueValueArray<R, usize>,
163    {
164        prepare_command(self, cmd("PUBSUB").arg("SHARDNUMSUB").arg(channels))
165    }
166
167    /// Posts a message to the given shard channel.
168    ///
169    /// # Return
170    /// The number of clients that received the message.
171    ///
172    /// # See Also
173    /// [<https://redis.io/commands/spublish/>](https://redis.io/commands/spublish/)
174    fn spublish<C, M>(&mut self, shardchannel: C, message: M) -> PreparedCommand<Self, usize>
175    where
176        Self: Sized,
177        C: Into<CommandArg>,
178        M: Into<CommandArg>,
179    {
180        prepare_command(self, cmd("SPUBLISH").arg(shardchannel).arg(message))
181    }
182
183    /// Subscribes the client to the specified channels.
184    ///
185    /// # See Also
186    /// [<https://redis.io/commands/subscribe/>](https://redis.io/commands/subscribe/)
187    fn ssubscribe<'a, C, CC>(&'a mut self, shardchannels: CC) -> Future<'a, PubSubStream>
188    where
189        C: Into<CommandArg> + Send + 'a,
190        CC: SingleArgOrCollection<C>;    
191
192    /// Subscribes the client to the specified channels.
193    ///
194    /// # Example
195    /// ```
196    /// use redis_driver::{
197    ///     resp::cmd, Client, ClientPreparedCommand, FlushingMode,
198    ///     PubSubCommands, ServerCommands, Result
199    /// };
200    /// use futures::StreamExt;
201    ///
202    /// #[tokio::main]
203    /// async fn main() -> Result<()> {
204    ///     let mut pub_sub_client = Client::connect("127.0.0.1:6379").await?;
205    ///     let mut regular_client = Client::connect("127.0.0.1:6379").await?;
206    ///
207    ///     regular_client.flushdb(FlushingMode::Sync).await?;
208    ///
209    ///     let mut pub_sub_stream = pub_sub_client.subscribe("mychannel").await?;
210    ///
211    ///     regular_client.publish("mychannel", "mymessage").await?;
212    ///
213    ///     let (channel, message): (String, String) = pub_sub_stream
214    ///         .next()
215    ///         .await
216    ///         .unwrap()?
217    ///         .into()?;
218    ///
219    ///     assert_eq!("mychannel", channel);
220    ///     assert_eq!("mymessage", message);
221    ///
222    ///     pub_sub_stream.close().await?;
223    ///
224    ///     Ok(())
225    /// }
226    /// ```
227    ///
228    /// # See Also
229    /// [<https://redis.io/commands/subscribe/>](https://redis.io/commands/subscribe/)
230    fn subscribe<'a, C, CC>(&'a mut self, channels: CC) -> Future<'a, PubSubStream>
231    where
232        C: Into<CommandArg> + Send + 'a,
233        CC: SingleArgOrCollection<C>;
234}
235
236/// Options for the [`pub_sub_channels`](crate::PubSubCommands::pub_sub_channels) command
237#[derive(Default)]
238pub struct PubSubChannelsOptions {
239    command_args: CommandArgs,
240}
241
242impl PubSubChannelsOptions {
243    pub fn pattern<P: Into<CommandArg>>(self, pattern: P) -> Self {
244        Self {
245            command_args: self.command_args.arg(pattern),
246        }
247    }
248}
249
250impl IntoArgs for PubSubChannelsOptions {
251    fn into_args(self, args: CommandArgs) -> CommandArgs {
252        args.arg(self.command_args)
253    }
254}