1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use std::fmt;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::asynk::client::Client;
use crate::asynk::message::Message;
use crate::smol::channel;
use crate::smol::prelude::*;

/// A subscription to a subject.
pub struct Subscription {
    /// Subscription ID.
    pub(crate) sid: u64,

    /// Subject.
    pub(crate) subject: String,

    /// MSG operations received from the server.
    pub(crate) messages: channel::Receiver<Message>,

    /// Client associated with subscription.
    pub(crate) client: Client,
}

impl Subscription {
    /// Creates a subscription.
    pub(crate) fn new(
        sid: u64,
        subject: String,
        messages: channel::Receiver<Message>,
        client: Client,
    ) -> Subscription {
        Subscription {
            sid,
            subject,
            messages,
            client,
        }
    }

    /// Attemps to receive the next message without blocking.
    pub(crate) fn try_next(&self) -> Option<Message> {
        self.messages.try_recv().ok()
    }

    /// Stops listening for new messages, but the remaining queued messages can still be received.
    pub async fn drain(&self) -> io::Result<()> {
        self.client.flush().await?;
        self.client.unsubscribe(self.sid);
        self.messages.close();
        Ok(())
    }

    /// Stops listening for new messages and discards the remaining queued messages.
    pub async fn unsubscribe(&self) -> io::Result<()> {
        self.drain().await?;
        // Discard all queued messages.
        while self.messages.try_recv().is_ok() {}
        Ok(())
    }
}

impl Drop for Subscription {
    fn drop(&mut self) {
        self.client.unsubscribe(self.sid);
    }
}

impl fmt::Debug for Subscription {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("Subscription")
            .field("sid", &self.sid)
            .finish()
    }
}

impl Stream for Subscription {
    type Item = Message;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.messages).poll_next(cx)
    }
}