1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use std::fmt;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::asynk::client::Client;
use crate::asynk::message::Message;
use crate::smol::channel;
use crate::smol::prelude::*;
/// Handle to one subscription: its id, its subject, the receiving end of its
/// message channel and the client it was created from.
///
/// Dropping the handle unsubscribes `sid` from `client` (see the `Drop` impl).
pub struct Subscription {
/// Subscription id; this is the value handed to `Client::unsubscribe`.
pub(crate) sid: u64,
/// Subject stored at construction time.
/// NOTE(review): presumably the subject that was subscribed to — confirm at the call site of `new`.
pub(crate) subject: String,
/// Receiving end of the channel that messages for this subscription are read from.
pub(crate) messages: channel::Receiver<Message>,
/// Client used to flush and to unsubscribe `sid`.
pub(crate) client: Client,
}
impl Subscription {
    /// Assembles a `Subscription` from its parts.
    ///
    /// This only stores the given values; it does not call into `client`.
    pub(crate) fn new(
        sid: u64,
        subject: String,
        messages: channel::Receiver<Message>,
        client: Client,
    ) -> Subscription {
        Self {
            sid,
            subject,
            messages,
            client,
        }
    }

    /// Attempts to take one message from the channel without awaiting.
    ///
    /// Returns `Some(message)` when `try_recv` succeeds and `None` on any
    /// `try_recv` error, so the caller cannot distinguish the error causes.
    pub(crate) fn try_next(&self) -> Option<Message> {
        match self.messages.try_recv() {
            Ok(message) => Some(message),
            Err(_) => None,
        }
    }

    /// Flushes the client, unsubscribes this subscription's id and closes the
    /// message channel, in that order.
    ///
    /// Messages are not consumed here; `unsubscribe` is the variant that also
    /// discards whatever `try_recv` still yields afterwards.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `Client::flush`. In that case the
    /// function returns early and neither unsubscribes nor closes the channel.
    pub async fn drain(&self) -> io::Result<()> {
        let Self {
            sid,
            messages,
            client,
            ..
        } = self;

        client.flush().await?;
        client.unsubscribe(*sid);
        messages.close();
        Ok(())
    }

    /// Drains the subscription and then throws away every message that
    /// `try_recv` still returns.
    ///
    /// # Errors
    ///
    /// Propagates the error from `drain`; no messages are discarded in that case.
    pub async fn unsubscribe(&self) -> io::Result<()> {
        self.drain().await?;

        // Keep popping (and dropping) messages until `try_recv` reports an error.
        loop {
            if self.messages.try_recv().is_err() {
                break;
            }
        }
        Ok(())
    }
}
impl Drop for Subscription {
    /// Unsubscribes this subscription's id from the client when the handle is dropped.
    ///
    /// No flush is performed and the message channel is not explicitly closed here.
    ///
    /// NOTE(review): this also runs after an explicit `drain`/`unsubscribe`, so the
    /// same id can be passed to `Client::unsubscribe` twice — presumably that call
    /// tolerates an already-removed id; confirm against `Client`.
    fn drop(&mut self) {
        let sid = self.sid;
        self.client.unsubscribe(sid);
    }
}
impl fmt::Debug for Subscription {
    /// Writes `Subscription { sid: <id> }`; the other fields are intentionally
    /// left out of the debug output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        let mut builder = f.debug_struct("Subscription");
        builder.field("sid", &self.sid);
        builder.finish()
    }
}
impl Stream for Subscription {
    type Item = Message;

    /// Polls the inner message receiver and returns its result unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Going through `get_mut` needs `Self: Unpin`, the same requirement the
        // mutable field access through `Pin<&mut Self>` has.
        let this = self.get_mut();
        Pin::new(&mut this.messages).poll_next(cx)
    }
}