1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use crate::{
    resp::{cmd, BulkString, SingleArgOrCollection},
    CommandResult, Future, PrepareCommand, PubSubStream,
};

/// A group of Redis commands related to [`Pub/Sub`](https://redis.io/docs/manual/pubsub/)
/// # See Also
/// [Redis Pub/Sub Commands](https://redis.io/commands/?group=pubsub)
pub trait PubSubCommands<T>: PrepareCommand<T> {
    /// Posts a message to the given channel.
    ///
    /// # Return
    /// The number of clients that received the message.
    ///
    /// Note that in a Redis Cluster, only clients that are connected
    /// to the same node as the publishing client are included in the count.
    ///
    /// # See Also
    /// [https://redis.io/commands/publish/](https://redis.io/commands/publish/)
    fn publish<C, M>(&self, channel: C, message: M) -> CommandResult<T, usize>
    where
        C: Into<BulkString>,
        M: Into<BulkString>,
    {
        self.prepare_command(cmd("PUBLISH").arg(channel).arg(message))
    }

    /// Subscribes the client to the specified channels.
    /// 
    /// # Example
    /// ```
    /// use redis_driver::{
    ///     resp::cmd, Client, ClientCommandResult, FlushingMode,
    ///     PubSubCommands, ServerCommands, Result
    /// };
    /// use futures::StreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let pub_sub_client = Client::connect("127.0.0.1:6379").await?;
    ///     let regular_client = Client::connect("127.0.0.1:6379").await?;
    ///
    ///     regular_client.flushdb(FlushingMode::Sync).send().await?;
    ///
    ///     let mut pub_sub_stream = pub_sub_client.subscribe("mychannel").await?;
    /// 
    ///     regular_client
    ///         .publish("mychannel", "mymessage")
    ///         .send()
    ///         .await?;
    ///
    ///     let (channel, message): (String, String) = pub_sub_stream
    ///         .next()
    ///         .await
    ///         .unwrap()?
    ///         .into()?;
    /// 
    ///     assert_eq!("mychannel", channel);
    ///     assert_eq!("mymessage", message);
    /// 
    ///     pub_sub_stream.close().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// # See Also
    /// [https://redis.io/commands/subscribe/](https://redis.io/commands/subscribe/)
    fn subscribe<'a, C, CC>(&'a self, channels: CC) -> Future<'a, PubSubStream>
    where
        C: Into<BulkString> + Send + 'a,
        CC: SingleArgOrCollection<C>;

    /// Unsubscribes the client from the given channels, or from all of them if none is given.
    ///
    /// # See Also
    /// [https://redis.io/commands/unsubscribe/](https://redis.io/commands/unsubscribe/)            
    fn unsubscribe<C, CC>(&self, channels: CC) -> CommandResult<T, ()>
    where
        C: Into<BulkString>,
        CC: SingleArgOrCollection<C>,
    {
        self.prepare_command(cmd("UNSUBSCRIBE").arg(channels))
    }

    /// Subscribes the client to the given patterns.
    ///
    /// # Example
    /// ```
    /// use redis_driver::{
    ///     resp::cmd, Client, ClientCommandResult, FlushingMode,
    ///     PubSubCommands, ServerCommands, Result
    /// };
    /// use futures::StreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<()> {
    ///     let pub_sub_client = Client::connect("127.0.0.1:6379").await?;
    ///     let regular_client = Client::connect("127.0.0.1:6379").await?;
    ///
    ///     regular_client.flushdb(FlushingMode::Sync).send().await?;
    ///
    ///     let mut pub_sub_stream = pub_sub_client.psubscribe("mychannel*").await?;
    /// 
    ///     regular_client
    ///         .publish("mychannel1", "mymessage")
    ///         .send()
    ///         .await?;
    ///
    ///     let (pattern, channel, message): (String, String, String) = pub_sub_stream
    ///         .next()
    ///         .await
    ///         .unwrap()?
    ///         .into()?;
    /// 
    ///     assert_eq!("mychannel*", pattern);
    ///     assert_eq!("mychannel1", channel);
    ///     assert_eq!("mymessage", message);
    /// 
    ///     pub_sub_stream.close().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    /// 
    /// # See Also
    /// [https://redis.io/commands/psubscribe/](https://redis.io/commands/psubscribe/)
    fn psubscribe<'a, P, PP>(&'a self, patterns: PP) -> Future<'a, PubSubStream>
    where
        P: Into<BulkString> + Send + 'a,
        PP: SingleArgOrCollection<P>;

    /// Unsubscribes the client from the given patterns, or from all of them if none is given.
    ///
    /// # See Also
    /// [https://redis.io/commands/punsubscribe/](https://redis.io/commands/punsubscribe/)            
    fn punsubscribe<P, PP>(&self, patterns: PP) -> CommandResult<T, ()>
    where
        P: Into<BulkString> + Send,
        PP: SingleArgOrCollection<P>
    {
        self.prepare_command(cmd("PUNSUBSCRIBE").arg(patterns))
    }
}