1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use crate::Error;
use flume::{Receiver, Sender};
use futures_core::Stream;
use std::sync::Arc;
/// Crate-internal hub for connection-level [`Event`]s.
///
/// Owns both ends of the event channel (see `Inner`) behind an `Arc`, and
/// hands out `EventsSender` handles and listener streams on request.
#[derive(Clone, Debug)]
// The channel ends live behind an `Arc` so that cloning `Events` only bumps the
// reference count instead of cloning the `Receiver`; the channel's receiver
// count therefore only grows when a listener is actually created.
pub(crate) struct Events(Arc<Inner>);
/// Both halves of the event channel, shared behind the `Arc` held by `Events`.
#[derive(Debug)]
struct Inner {
/// Sending half; cloned for every `EventsSender` handed out by `Events::sender`.
sender: Sender<Event>,
/// Receiving half; cloned for every stream returned by `Events::listener`.
/// This internal copy is itself counted as one receiver of the channel.
receiver: Receiver<Event>,
}
impl Events {
    /// Builds a new event hub on top of a fresh unbounded `flume` channel.
    ///
    /// Both channel ends are stored together so they can be cloned on demand.
    pub(crate) fn new() -> Self {
        let (tx, rx) = flume::unbounded();
        let inner = Inner {
            sender: tx,
            receiver: rx,
        };
        Self(Arc::new(inner))
    }

    /// Returns a new `EventsSender` wrapping a clone of the channel's sender.
    pub(crate) fn sender(&self) -> EventsSender {
        let tx = self.0.sender.clone();
        EventsSender(tx)
    }

    /// Returns a stream of [`Event`]s backed by a clone of the channel's receiver.
    ///
    /// Every call adds one receiver to the channel for as long as the returned
    /// stream is alive.
    // NOTE(review): flume channels are multi-consumer, so with several live
    // listeners each event presumably reaches only one of them — confirm this
    // is the intended behavior.
    pub(crate) fn listener(&self) -> impl Stream<Item = Event> + Send + 'static {
        let rx = self.0.receiver.clone();
        rx.into_stream()
    }
}
/// Cloneable, crate-internal handle used to publish [`Event`]s to listeners.
///
/// Wraps a clone of the sending half of the channel owned by `Events`.
#[derive(Clone, Debug)]
pub(crate) struct EventsSender(Sender<Event>);
impl EventsSender {
fn send(&self, event: Event) {
// Do nothing if we don't have at least one external receiver
if self.0.receiver_count() > 1 {
// The only possibility of error is if we have several external receivers and the
// connection was already dropped, so we can safely ignore this.
let _ = self.0.send(event);
}
}
pub(crate) fn connected(&self) {
self.send(Event::Connected);
}
pub(crate) fn connection_blocked(&self, reason: String) {
self.send(Event::ConnectionBlocked(reason));
}
pub(crate) fn connection_unblocked(&self) {
self.send(Event::ConnectionUnblocked);
}
pub(crate) fn send_flow(&self, active: bool) {
self.send(Event::SendFlow(active));
}
pub(crate) fn error(&self, error: Error) {
self.send(Event::Error(error));
}
}
/// A connection-level event delivered via [`Connection::events_listener`].
///
/// The stream produced by [`Connection::events_listener`] emits these values
/// as the connection progresses through its lifecycle.
///
/// This enum is marked `#[non_exhaustive]`: new variants may be added later,
/// so a `match` on it from outside this crate needs a wildcard arm.
///
/// [`Connection::events_listener`]: crate::Connection::events_listener
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Event {
/// The connection has been established (or re-established after recovery).
Connected,
/// The broker has blocked the connection due to resource constraints.
/// The inner string is the human-readable reason supplied by the broker.
ConnectionBlocked(String),
/// The broker has unblocked the connection.
ConnectionUnblocked,
/// The broker has changed the allowed flow direction.
/// `true` means publishing is permitted; `false` means it is paused.
SendFlow(bool),
/// An error occurred on the connection.
/// The inner value is the crate's [`Error`] describing what went wrong.
Error(Error),
}