redis_driver/clients/
pub_sub_stream.rs

1use crate::{
2    resp::Value, ClientPreparedCommand, InternalPubSubCommands, PubSubReceiver, Result, InnerClient,
3};
4use futures::{Stream, StreamExt};
5use std::{
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10/// Stream to get messages from the channels or patterns [`subscribed`](https://redis.io/docs/manual/pubsub/) to
11///
12/// # Example
13/// ```
14/// use redis_driver::{
15///     resp::cmd, Client, ClientPreparedCommand, FlushingMode,
16///     PubSubCommands, ServerCommands, Result
17/// };
18/// use futures::StreamExt;
19///
20/// #[tokio::main]
21/// async fn main() -> Result<()> {
22///     let mut pub_sub_client = Client::connect("127.0.0.1:6379").await?;
23///     let mut regular_client = Client::connect("127.0.0.1:6379").await?;
24///
25///     regular_client.flushdb(FlushingMode::Sync).await?;
26///
27///     let mut pub_sub_stream = pub_sub_client.subscribe("mychannel").await?;
28///
29///     regular_client.publish("mychannel", "mymessage").await?;
30///
31///     let (channel, message): (String, String) = pub_sub_stream
32///         .next()
33///         .await
34///         .unwrap()?
35///         .into()?;
36///
37///     assert_eq!("mychannel", channel);
38///     assert_eq!("mymessage", message);
39///
40///     pub_sub_stream.close().await?;
41///
42///     Ok(())
43/// }
44/// ```
45pub struct PubSubStream {
46    closed: bool,
47    channels: Vec<String>,
48    patterns: Vec<String>,
49    shardchannels: Vec<String>,
50    receiver: PubSubReceiver,
51    client: InnerClient,
52}
53
54impl PubSubStream {
55    pub(crate) fn from_channels(
56        channels: Vec<String>,
57        receiver: PubSubReceiver,
58        client: InnerClient,
59    ) -> Self {
60        Self {
61            closed: false,
62            channels,
63            patterns: Vec::new(),
64            shardchannels: Vec::new(),
65            receiver,
66            client,
67        }
68    }
69
70    pub(crate) fn from_patterns(
71        patterns: Vec<String>,
72        receiver: PubSubReceiver,
73        client: InnerClient,
74    ) -> Self {
75        Self {
76            closed: false,
77            channels: Vec::new(),
78            patterns,
79            shardchannels: Vec::new(),
80            receiver,
81            client,
82        }
83    }
84
85    pub(crate) fn from_shardchannels(
86        shardchannels: Vec<String>,
87        receiver: PubSubReceiver,
88        client: InnerClient,
89    ) -> Self {
90        Self {
91            closed: false,
92            channels: Vec::new(),
93            patterns: Vec::new(),
94            shardchannels,
95            receiver,
96            client,
97        }
98    }
99
100    pub async fn close(&mut self) -> Result<()> {
101        let mut channels = Vec::<String>::new();
102        std::mem::swap(&mut channels, &mut self.channels);
103        if !channels.is_empty() {
104            self.client.unsubscribe(channels).await?;
105        }
106
107        let mut patterns = Vec::<String>::new();
108        std::mem::swap(&mut patterns, &mut self.patterns);
109        if !patterns.is_empty() {
110            self.client.punsubscribe(patterns).await?;
111        }
112
113        let mut shardchannels = Vec::<String>::new();
114        std::mem::swap(&mut shardchannels, &mut self.shardchannels);
115        if !shardchannels.is_empty() {
116            self.client.sunsubscribe(shardchannels).await?;
117        }
118
119        self.closed = true;
120
121        Ok(())
122    }
123}
124
125impl Stream for PubSubStream {
126    type Item = Result<Value>;
127
128    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
129        if self.closed {
130            Poll::Ready(None)
131        } else {
132            self.get_mut().receiver.poll_next_unpin(cx)
133        }
134    }
135}
136
137impl Drop for PubSubStream {
138    fn drop(&mut self) {
139        if self.closed {
140            return;
141        }
142
143        let mut channels = Vec::<String>::new();
144        std::mem::swap(&mut channels, &mut self.channels);
145        if !channels.is_empty() {
146            let _result = self.client.unsubscribe(channels).forget();
147        }
148
149        let mut patterns = Vec::<String>::new();
150        std::mem::swap(&mut patterns, &mut self.patterns);
151        if !patterns.is_empty() {
152            let _result = self.client.punsubscribe(patterns).forget();
153        }
154
155        let mut shardchannels = Vec::<String>::new();
156        std::mem::swap(&mut shardchannels, &mut self.shardchannels);
157        if !shardchannels.is_empty() {
158            let _result = self.client.sunsubscribe(shardchannels).forget();
159        }
160    }
161}