1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
use std::sync::{Arc, Mutex};
use crate::{ConnectionId, connection::ConnectionHandle};
use samod_core::ListenerId;
use crate::{ConnFinishedReason, PeerInfo, Repo, Stopped, unbounded};
/// Lifecycle events for an acceptor.
#[derive(Debug, Clone)]
pub enum AcceptorEvent {
/// A new inbound connection completed its handshake.
ClientConnected {
/// Information about the connected peer.
peer_info: PeerInfo,
/// The connection ID for correlation.
connection_id: ConnectionId,
},
/// An inbound connection was lost.
ClientDisconnected {
/// The connection ID that was lost.
connection_id: ConnectionId,
/// The reason the connection was lost.
reason: ConnFinishedReason,
},
}
/// Handle to an acceptor (listener) endpoint that accepts inbound connections.
///
/// Returned by [`Repo::make_acceptor()`]. Represents a listener endpoint that
/// accepts inbound connections. The handle is used to feed individual
/// accepted connections via [`AcceptorHandle::accept()`].
///
/// The handle can be used to:
/// - Accept inbound connections with [`AcceptorHandle::accept()`]
/// - Observe lifecycle events with [`AcceptorHandle::events()`]
/// - Check connection count with [`AcceptorHandle::connection_count()`]
/// - Shut down the acceptor with [`AcceptorHandle::close()`]
#[derive(Clone)]
pub struct AcceptorHandle {
inner: Arc<Mutex<AcceptorHandleInner>>,
listener_id: ListenerId,
repo: Repo,
}
struct AcceptorHandleInner {
/// Number of currently active connections.
active_connection_count: usize,
/// Senders for event subscribers.
event_senders: Vec<unbounded::UnboundedSender<AcceptorEvent>>,
}
impl AcceptorHandle {
pub(crate) fn new(listener_id: ListenerId, repo: Repo) -> Self {
Self {
inner: Arc::new(Mutex::new(AcceptorHandleInner {
active_connection_count: 0,
event_senders: Vec::new(),
})),
listener_id,
repo,
}
}
/// The listener ID for this acceptor.
pub fn id(&self) -> ListenerId {
self.listener_id
}
/// Accept an inbound connection.
///
/// Wires up the transport to the hub and starts driving the connection.
/// This is typically called from a server framework's connection handler
/// (e.g. an axum WebSocket upgrade handler).
pub fn accept(&self, transport: crate::Transport) -> Result<ConnectionHandle, Stopped> {
self.repo.accept_on_listener(self.listener_id, transport)
}
/// Accept the length delimited framed connection created by
/// [`TcpDialer`](crate::tokio_io::TcpDialer). See the documentation for
/// that type for examples.
#[cfg(feature = "tokio")]
pub fn accept_tokio_io<S>(&self, io: S) -> Result<ConnectionHandle, Stopped>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
use crate::Transport;
self.repo
.accept_on_listener(self.listener_id, Transport::from_tokio_io(io))
}
/// Returns a stream of lifecycle events for this acceptor.
///
/// The stream yields events for client connections and disconnections.
/// Useful for metrics (e.g. Prometheus gauges for active connection count).
pub fn events(&self) -> impl futures::Stream<Item = AcceptorEvent> + Unpin {
let (tx, rx) = unbounded::channel();
let mut inner = self.inner.lock().unwrap();
inner.event_senders.push(tx);
rx
}
/// Returns the number of currently active connections on this endpoint.
pub fn connection_count(&self) -> usize {
self.inner.lock().unwrap().active_connection_count
}
/// Shut down this acceptor. Closes all active connections and stops
/// accepting new ones.
pub fn close(&self) {
let _ = self.repo.remove_listener_by_id(self.listener_id);
}
// -- Internal methods called from Inner::handle_event --
/// Notify the handle that a client connected.
pub(crate) fn notify_client_connected(&self, peer_info: PeerInfo, connection_id: ConnectionId) {
let mut inner = self.inner.lock().unwrap();
inner.active_connection_count += 1;
let event = AcceptorEvent::ClientConnected {
peer_info,
connection_id,
};
inner
.event_senders
.retain(|tx| tx.unbounded_send(event.clone()).is_ok());
}
/// Notify the handle that a client disconnected.
pub(crate) fn notify_client_disconnected(
&self,
connection_id: ConnectionId,
reason: ConnFinishedReason,
) {
let mut inner = self.inner.lock().unwrap();
inner.active_connection_count = inner.active_connection_count.saturating_sub(1);
let event = AcceptorEvent::ClientDisconnected {
connection_id,
reason,
};
inner
.event_senders
.retain(|tx| tx.unbounded_send(event.clone()).is_ok());
}
}
impl std::fmt::Debug for AcceptorHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AcceptorHandle")
.field("listener_id", &self.listener_id)
.finish()
}
}