1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use crate::{resp::BulkString, ConnectionMultiplexer, PubSubReceiver, Result};
use futures::{Stream, StreamExt};
use std::{
pin::Pin,
task::{Context, Poll},
};
/// A stream of messages for a single pub/sub channel.
///
/// Polling the stream forwards to the wrapped receiver; dropping the stream
/// asks the connection to unsubscribe from the channel.
pub struct PubSubStream {
/// Name of the channel; handed to `unsubscribe` when the stream is dropped.
channel: String,
/// Source of the items yielded by this stream.
receiver: PubSubReceiver,
/// Connection used to issue the unsubscribe request on drop.
connection: ConnectionMultiplexer,
}
impl PubSubStream {
pub(crate) fn new(
channel: String,
receiver: PubSubReceiver,
connection: ConnectionMultiplexer,
) -> Self {
Self {
channel,
receiver,
connection,
}
}
}
impl Stream for PubSubStream {
type Item = Result<BulkString>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.get_mut().receiver.poll_next_unpin(cx)
}
}
impl Drop for PubSubStream {
    /// Requests an unsubscribe from the channel when the stream goes away.
    ///
    /// The outcome of `unsubscribe` is deliberately ignored: `drop` has no way
    /// to report a failure, so this is a best-effort cleanup.
    fn drop(&mut self) {
        // Move the channel name out of `self` without cloning; the field is
        // left holding an empty `String`, which is fine since `self` is dying.
        let channel = std::mem::take(&mut self.channel);

        // NOTE(review): if `unsubscribe` returns a lazy future rather than
        // performing the request eagerly, discarding it here means the
        // request is never sent -- confirm against `ConnectionMultiplexer`.
        let _ = self.connection.unsubscribe(channel.into());
    }
}