redis_driver/clients/
pub_sub_stream.rs1use crate::{
2 resp::Value, ClientPreparedCommand, InternalPubSubCommands, PubSubReceiver, Result, InnerClient,
3};
4use futures::{Stream, StreamExt};
5use std::{
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10pub struct PubSubStream {
46 closed: bool,
47 channels: Vec<String>,
48 patterns: Vec<String>,
49 shardchannels: Vec<String>,
50 receiver: PubSubReceiver,
51 client: InnerClient,
52}
53
54impl PubSubStream {
55 pub(crate) fn from_channels(
56 channels: Vec<String>,
57 receiver: PubSubReceiver,
58 client: InnerClient,
59 ) -> Self {
60 Self {
61 closed: false,
62 channels,
63 patterns: Vec::new(),
64 shardchannels: Vec::new(),
65 receiver,
66 client,
67 }
68 }
69
70 pub(crate) fn from_patterns(
71 patterns: Vec<String>,
72 receiver: PubSubReceiver,
73 client: InnerClient,
74 ) -> Self {
75 Self {
76 closed: false,
77 channels: Vec::new(),
78 patterns,
79 shardchannels: Vec::new(),
80 receiver,
81 client,
82 }
83 }
84
85 pub(crate) fn from_shardchannels(
86 shardchannels: Vec<String>,
87 receiver: PubSubReceiver,
88 client: InnerClient,
89 ) -> Self {
90 Self {
91 closed: false,
92 channels: Vec::new(),
93 patterns: Vec::new(),
94 shardchannels,
95 receiver,
96 client,
97 }
98 }
99
100 pub async fn close(&mut self) -> Result<()> {
101 let mut channels = Vec::<String>::new();
102 std::mem::swap(&mut channels, &mut self.channels);
103 if !channels.is_empty() {
104 self.client.unsubscribe(channels).await?;
105 }
106
107 let mut patterns = Vec::<String>::new();
108 std::mem::swap(&mut patterns, &mut self.patterns);
109 if !patterns.is_empty() {
110 self.client.punsubscribe(patterns).await?;
111 }
112
113 let mut shardchannels = Vec::<String>::new();
114 std::mem::swap(&mut shardchannels, &mut self.shardchannels);
115 if !shardchannels.is_empty() {
116 self.client.sunsubscribe(shardchannels).await?;
117 }
118
119 self.closed = true;
120
121 Ok(())
122 }
123}
124
125impl Stream for PubSubStream {
126 type Item = Result<Value>;
127
128 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
129 if self.closed {
130 Poll::Ready(None)
131 } else {
132 self.get_mut().receiver.poll_next_unpin(cx)
133 }
134 }
135}
136
137impl Drop for PubSubStream {
138 fn drop(&mut self) {
139 if self.closed {
140 return;
141 }
142
143 let mut channels = Vec::<String>::new();
144 std::mem::swap(&mut channels, &mut self.channels);
145 if !channels.is_empty() {
146 let _result = self.client.unsubscribe(channels).forget();
147 }
148
149 let mut patterns = Vec::<String>::new();
150 std::mem::swap(&mut patterns, &mut self.patterns);
151 if !patterns.is_empty() {
152 let _result = self.client.punsubscribe(patterns).forget();
153 }
154
155 let mut shardchannels = Vec::<String>::new();
156 std::mem::swap(&mut shardchannels, &mut self.shardchannels);
157 if !shardchannels.is_empty() {
158 let _result = self.client.sunsubscribe(shardchannels).forget();
159 }
160 }
161}