1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
use crate::{
resp::{cmd, BulkString, SingleArgOrCollection, CommandArgs, IntoArgs, FromValue, FromSingleValueArray, FromKeyValueValueArray},
CommandResult, Future, PrepareCommand, PubSubStream,
};
/// Redis commands belonging to the [`Pub/Sub`](https://redis.io/docs/manual/pubsub/) group.
///
/// Every method with a default body only assembles the command and hands it to
/// [`PrepareCommand::prepare_command`]; `psubscribe` and `subscribe` have no default
/// body and must be supplied by the implementor.
///
/// # See Also
/// [Redis Pub/Sub Commands](https://redis.io/commands/?group=pubsub)
pub trait PubSubCommands<T>: PrepareCommand<T> {
    /// Subscribes the client to one or more channel patterns.
    ///
    /// # Return
    /// A future resolving to a [`PubSubStream`].
    ///
    /// # Example
    /// ```
    /// use redis_driver::{
    ///     resp::cmd, Client, ClientCommandResult, FlushingMode,
    ///     PubSubCommands, ServerCommands, Result
    /// };
    /// use futures::StreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let pub_sub_client = Client::connect("127.0.0.1:6379").await?;
    ///     let regular_client = Client::connect("127.0.0.1:6379").await?;
    ///
    ///     regular_client.flushdb(FlushingMode::Sync).await?;
    ///
    ///     let mut pub_sub_stream = pub_sub_client.psubscribe("mychannel*").await?;
    ///
    ///     regular_client.publish("mychannel1", "mymessage").await?;
    ///
    ///     let (pattern, channel, message): (String, String, String) = pub_sub_stream
    ///         .next()
    ///         .await
    ///         .unwrap()?
    ///         .into()?;
    ///
    ///     assert_eq!("mychannel*", pattern);
    ///     assert_eq!("mychannel1", channel);
    ///     assert_eq!("mymessage", message);
    ///
    ///     pub_sub_stream.close().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// # See Also
    /// [<https://redis.io/commands/psubscribe/>](https://redis.io/commands/psubscribe/)
    fn psubscribe<'a, P, PP>(&'a self, patterns: PP) -> Future<'a, PubSubStream>
    where
        P: Into<BulkString> + Send + 'a,
        PP: SingleArgOrCollection<P>;

    /// Publishes `message` on `channel`.
    ///
    /// # Return
    /// How many clients received the message.
    ///
    /// In a Redis Cluster the count only covers clients connected to the same
    /// node as the publishing client.
    ///
    /// # See Also
    /// [<https://redis.io/commands/publish/>](https://redis.io/commands/publish/)
    fn publish<C, M>(&self, channel: C, message: M) -> CommandResult<T, usize>
    where
        C: Into<BulkString>,
        M: Into<BulkString>,
    {
        let command = cmd("PUBLISH").arg(channel).arg(message);
        self.prepare_command(command)
    }

    /// Lists the channels that are currently active.
    ///
    /// # Return
    /// A collection of the active channels, optionally restricted to those
    /// matching the pattern carried by `options`.
    ///
    /// # See Also
    /// [<https://redis.io/commands/pubsub-channels/>](https://redis.io/commands/pubsub-channels/)
    fn pub_sub_channels<C, CC>(&self, options: PubSubChannelsOptions) -> CommandResult<T, CC>
    where
        C: FromValue,
        CC: FromSingleValueArray<C>,
    {
        let command = cmd("PUBSUB").arg("CHANNELS").arg(options);
        self.prepare_command(command)
    }

    /// Counts the unique patterns clients are subscribed to
    /// (subscriptions made with the PSUBSCRIBE command).
    ///
    /// # Return
    /// The number of patterns all the clients are subscribed to.
    ///
    /// # See Also
    /// [<https://redis.io/commands/pubsub-numpat/>](https://redis.io/commands/pubsub-numpat/)
    fn pub_sub_numpat(&self) -> CommandResult<T, usize> {
        let command = cmd("PUBSUB").arg("NUMPAT");
        self.prepare_command(command)
    }

    /// Reports the subscriber count of each given channel; clients subscribed
    /// through patterns are not counted.
    ///
    /// # Return
    /// A collection pairing every channel with its number of subscribers.
    ///
    /// # See Also
    /// [<https://redis.io/commands/pubsub-numsub/>](https://redis.io/commands/pubsub-numsub/)
    fn pub_sub_numsub<C, CC, R, RR>(&self, channels: CC) -> CommandResult<T, RR>
    where
        C: Into<BulkString>,
        CC: SingleArgOrCollection<C>,
        R: FromValue,
        RR: FromKeyValueValueArray<R, usize>,
    {
        let command = cmd("PUBSUB").arg("NUMSUB").arg(channels);
        self.prepare_command(command)
    }

    /// Subscribes the client to one or more channels.
    ///
    /// # Return
    /// A future resolving to a [`PubSubStream`].
    ///
    /// # Example
    /// ```
    /// use redis_driver::{
    ///     resp::cmd, Client, ClientCommandResult, FlushingMode,
    ///     PubSubCommands, ServerCommands, Result
    /// };
    /// use futures::StreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let pub_sub_client = Client::connect("127.0.0.1:6379").await?;
    ///     let regular_client = Client::connect("127.0.0.1:6379").await?;
    ///
    ///     regular_client.flushdb(FlushingMode::Sync).await?;
    ///
    ///     let mut pub_sub_stream = pub_sub_client.subscribe("mychannel").await?;
    ///
    ///     regular_client.publish("mychannel", "mymessage").await?;
    ///
    ///     let (channel, message): (String, String) = pub_sub_stream
    ///         .next()
    ///         .await
    ///         .unwrap()?
    ///         .into()?;
    ///
    ///     assert_eq!("mychannel", channel);
    ///     assert_eq!("mymessage", message);
    ///
    ///     pub_sub_stream.close().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// # See Also
    /// [<https://redis.io/commands/subscribe/>](https://redis.io/commands/subscribe/)
    fn subscribe<'a, C, CC>(&'a self, channels: CC) -> Future<'a, PubSubStream>
    where
        C: Into<BulkString> + Send + 'a,
        CC: SingleArgOrCollection<C>;
}
/// Options for the [`pub_sub_channels`](crate::PubSubCommands::pub_sub_channels) command
///
/// Create a value with [`Default::default`] and refine it with the builder
/// method [`pattern`](PubSubChannelsOptions::pattern). The value is turned into
/// command arguments through its [`IntoArgs`] implementation.
#[derive(Default)]
pub struct PubSubChannelsOptions {
// Arguments gathered by the builder methods; forwarded as-is by `into_args`.
command_args: CommandArgs,
}
impl PubSubChannelsOptions {
    /// Adds `pattern` to the arguments these options contribute to the command.
    ///
    /// Consumes the options and returns the updated value so calls can be chained.
    pub fn pattern<P: Into<BulkString>>(self, pattern: P) -> Self {
        // Take the accumulated arguments out of `self` before extending them.
        let Self { command_args } = self;
        let command_args = command_args.arg(pattern);
        Self { command_args }
    }
}
impl IntoArgs for PubSubChannelsOptions {
    /// Appends the arguments collected by these options to `args` and returns
    /// the extended argument list.
    fn into_args(self, args: CommandArgs) -> CommandArgs {
        let Self { command_args } = self;
        args.arg(command_args)
    }
}