1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use crate::{resp::BulkString, ConnectionMultiplexer, PubSubReceiver, Result};
use futures::{Stream, StreamExt};
use std::{
    pin::Pin,
    task::{Context, Poll},
};

pub struct PubSubStream {
    channel: String,
    receiver: PubSubReceiver,
    connection: ConnectionMultiplexer,
}

impl PubSubStream {
    pub(crate) fn new(
        channel: String,
        receiver: PubSubReceiver,
        connection: ConnectionMultiplexer,
    ) -> Self {
        Self {
            channel,
            receiver,
            connection,
        }
    }
}

impl Stream for PubSubStream {
    type Item = Result<BulkString>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        self.get_mut().receiver.poll_next_unpin(cx)
    }
}

impl Drop for PubSubStream {
    fn drop(&mut self) {
        let mut channel = String::new();
        std::mem::swap(&mut channel, &mut self.channel);
        let _ = self.connection.unsubscribe(channel.into());
    }
}