redis_driver/commands/pub_sub_commands.rs
1use crate::{
2 prepare_command,
3 resp::{
4 cmd, CommandArg, CommandArgs, FromKeyValueValueArray, FromSingleValueArray, FromValue,
5 IntoArgs, SingleArgOrCollection,
6 },
7 PreparedCommand, Future, PubSubStream,
8};
9
10/// A group of Redis commands related to [`Pub/Sub`](https://redis.io/docs/manual/pubsub/)
11/// # See Also
12/// [Redis Pub/Sub Commands](https://redis.io/commands/?group=pubsub)
13pub trait PubSubCommands {
14 /// Subscribes the client to the given patterns.
15 ///
16 /// # Example
17 /// ```
18 /// use redis_driver::{
19 /// resp::cmd, Client, ClientPreparedCommand, FlushingMode,
20 /// PubSubCommands, ServerCommands, Result
21 /// };
22 /// use futures::StreamExt;
23 ///
24 /// #[tokio::main]
25 /// async fn main() -> Result<()> {
26 /// let mut pub_sub_client = Client::connect("127.0.0.1:6379").await?;
27 /// let mut regular_client = Client::connect("127.0.0.1:6379").await?;
28 ///
29 /// regular_client.flushdb(FlushingMode::Sync).await?;
30 ///
31 /// let mut pub_sub_stream = pub_sub_client.psubscribe("mychannel*").await?;
32 ///
33 /// regular_client.publish("mychannel1", "mymessage").await?;
34 ///
35 /// let (pattern, channel, message): (String, String, String) = pub_sub_stream
36 /// .next()
37 /// .await
38 /// .unwrap()?
39 /// .into()?;
40 ///
41 /// assert_eq!("mychannel*", pattern);
42 /// assert_eq!("mychannel1", channel);
43 /// assert_eq!("mymessage", message);
44 ///
45 /// pub_sub_stream.close().await?;
46 ///
47 /// Ok(())
48 /// }
49 /// ```
50 ///
51 /// # See Also
52 /// [<https://redis.io/commands/psubscribe/>](https://redis.io/commands/psubscribe/)
53 fn psubscribe<'a, P, PP>(&'a mut self, patterns: PP) -> Future<'a, PubSubStream>
54 where
55 P: Into<CommandArg> + Send + 'a,
56 PP: SingleArgOrCollection<P>;
57
58 /// Posts a message to the given channel.
59 ///
60 /// # Return
61 /// The number of clients that received the message.
62 ///
63 /// Note that in a Redis Cluster, only clients that are connected
64 /// to the same node as the publishing client are included in the count.
65 ///
66 /// # See Also
67 /// [<https://redis.io/commands/publish/>](https://redis.io/commands/publish/)
68 fn publish<C, M>(&mut self, channel: C, message: M) -> PreparedCommand<Self, usize>
69 where
70 Self: Sized,
71 C: Into<CommandArg>,
72 M: Into<CommandArg>,
73 {
74 prepare_command(self, cmd("PUBLISH").arg(channel).arg(message))
75 }
76
77 /// Lists the currently active channels.
78 ///
79 /// # Return
80 /// A collection of active channels, optionally matching the specified pattern.
81 ///
82 /// # See Also
83 /// [<https://redis.io/commands/pubsub-channels/>](https://redis.io/commands/pubsub-channels/)
84 fn pub_sub_channels<C, CC>(
85 &mut self,
86 options: PubSubChannelsOptions,
87 ) -> PreparedCommand<Self, CC>
88 where
89 Self: Sized,
90 C: FromValue,
91 CC: FromSingleValueArray<C>,
92 {
93 prepare_command(self, cmd("PUBSUB").arg("CHANNELS").arg(options))
94 }
95
96 /// Returns the number of unique patterns that are subscribed to by clients
97 /// (that are performed using the PSUBSCRIBE command).
98 ///
99 /// # Return
100 /// The number of patterns all the clients are subscribed to.
101 ///
102 /// # See Also
103 /// [<https://redis.io/commands/pubsub-numpat/>](https://redis.io/commands/pubsub-numpat/)
104 fn pub_sub_numpat(&mut self) -> PreparedCommand<Self, usize>
105 where
106 Self: Sized,
107 {
108 prepare_command(self, cmd("PUBSUB").arg("NUMPAT"))
109 }
110
111 /// Returns the number of subscribers (exclusive of clients subscribed to patterns)
112 /// for the specified channels.
113 ///
114 /// # Return
115 /// A collection of channels and number of subscribers for every channel.
116 ///
117 /// # See Also
118 /// [<https://redis.io/commands/pubsub-numsub/>](https://redis.io/commands/pubsub-numsub/)
119 fn pub_sub_numsub<C, CC, R, RR>(&mut self, channels: CC) -> PreparedCommand<Self, RR>
120 where
121 Self: Sized,
122 C: Into<CommandArg>,
123 CC: SingleArgOrCollection<C>,
124 R: FromValue,
125 RR: FromKeyValueValueArray<R, usize>,
126 {
127 prepare_command(self, cmd("PUBSUB").arg("NUMSUB").arg(channels))
128 }
129
130 /// Lists the currently active shard channels.
131 ///
132 /// # Return
133 /// A collection of active channels, optionally matching the specified pattern.
134 ///
135 /// # See Also
136 /// [<https://redis.io/commands/pubsub-shardchannels/>](https://redis.io/commands/pubsub-shardchannels/)
137 fn pub_sub_shardchannels<C, CC>(
138 &mut self,
139 options: PubSubChannelsOptions,
140 ) -> PreparedCommand<Self, CC>
141 where
142 Self: Sized,
143 C: FromValue,
144 CC: FromSingleValueArray<C>,
145 {
146 prepare_command(self, cmd("PUBSUB").arg("SHARDCHANNELS").arg(options))
147 }
148
149 /// Returns the number of subscribers for the specified shard channels.
150 ///
151 /// # Return
152 /// A collection of channels and number of subscribers for every channel.
153 ///
154 /// # See Also
155 /// [<https://redis.io/commands/pubsub-shardnumsub/>](https://redis.io/commands/pubsub-shardnumsub/)
156 fn pub_sub_shardnumsub<C, CC, R, RR>(&mut self, channels: CC) -> PreparedCommand<Self, RR>
157 where
158 Self: Sized,
159 C: Into<CommandArg>,
160 CC: SingleArgOrCollection<C>,
161 R: FromValue,
162 RR: FromKeyValueValueArray<R, usize>,
163 {
164 prepare_command(self, cmd("PUBSUB").arg("SHARDNUMSUB").arg(channels))
165 }
166
167 /// Posts a message to the given shard channel.
168 ///
169 /// # Return
170 /// The number of clients that received the message.
171 ///
172 /// # See Also
173 /// [<https://redis.io/commands/spublish/>](https://redis.io/commands/spublish/)
174 fn spublish<C, M>(&mut self, shardchannel: C, message: M) -> PreparedCommand<Self, usize>
175 where
176 Self: Sized,
177 C: Into<CommandArg>,
178 M: Into<CommandArg>,
179 {
180 prepare_command(self, cmd("SPUBLISH").arg(shardchannel).arg(message))
181 }
182
183 /// Subscribes the client to the specified channels.
184 ///
185 /// # See Also
186 /// [<https://redis.io/commands/subscribe/>](https://redis.io/commands/subscribe/)
187 fn ssubscribe<'a, C, CC>(&'a mut self, shardchannels: CC) -> Future<'a, PubSubStream>
188 where
189 C: Into<CommandArg> + Send + 'a,
190 CC: SingleArgOrCollection<C>;
191
192 /// Subscribes the client to the specified channels.
193 ///
194 /// # Example
195 /// ```
196 /// use redis_driver::{
197 /// resp::cmd, Client, ClientPreparedCommand, FlushingMode,
198 /// PubSubCommands, ServerCommands, Result
199 /// };
200 /// use futures::StreamExt;
201 ///
202 /// #[tokio::main]
203 /// async fn main() -> Result<()> {
204 /// let mut pub_sub_client = Client::connect("127.0.0.1:6379").await?;
205 /// let mut regular_client = Client::connect("127.0.0.1:6379").await?;
206 ///
207 /// regular_client.flushdb(FlushingMode::Sync).await?;
208 ///
209 /// let mut pub_sub_stream = pub_sub_client.subscribe("mychannel").await?;
210 ///
211 /// regular_client.publish("mychannel", "mymessage").await?;
212 ///
213 /// let (channel, message): (String, String) = pub_sub_stream
214 /// .next()
215 /// .await
216 /// .unwrap()?
217 /// .into()?;
218 ///
219 /// assert_eq!("mychannel", channel);
220 /// assert_eq!("mymessage", message);
221 ///
222 /// pub_sub_stream.close().await?;
223 ///
224 /// Ok(())
225 /// }
226 /// ```
227 ///
228 /// # See Also
229 /// [<https://redis.io/commands/subscribe/>](https://redis.io/commands/subscribe/)
230 fn subscribe<'a, C, CC>(&'a mut self, channels: CC) -> Future<'a, PubSubStream>
231 where
232 C: Into<CommandArg> + Send + 'a,
233 CC: SingleArgOrCollection<C>;
234}
235
236/// Options for the [`pub_sub_channels`](crate::PubSubCommands::pub_sub_channels) command
237#[derive(Default)]
238pub struct PubSubChannelsOptions {
239 command_args: CommandArgs,
240}
241
242impl PubSubChannelsOptions {
243 pub fn pattern<P: Into<CommandArg>>(self, pattern: P) -> Self {
244 Self {
245 command_args: self.command_args.arg(pattern),
246 }
247 }
248}
249
250impl IntoArgs for PubSubChannelsOptions {
251 fn into_args(self, args: CommandArgs) -> CommandArgs {
252 args.arg(self.command_args)
253 }
254}