1mod input;
2mod session;
3mod statistics;
4
5use std::{collections::HashMap, fmt::Debug, net::SocketAddr, time::Duration, time::Instant};
6
7use crate::{packet::*, protocol::time::Timer, settings::ConnInitSettings};
8
9use session::*;
10
11pub use input::*;
12pub use statistics::*;
13
14#[derive(Clone, Debug)]
15pub struct ListenerSettings {}
16
17#[derive(Debug)]
18pub struct MultiplexListener {
19 start_time: Instant,
20 local_address: SocketAddr,
21 settings: ConnInitSettings,
22 sessions: HashMap<SessionId, SessionState>,
23 stats: ListenerStatistics,
24 stats_timer: Timer,
25}
26
27impl MultiplexListener {
28 pub fn new(now: Instant, local_address: SocketAddr, settings: ConnInitSettings) -> Self {
29 Self {
30 start_time: now,
31 local_address,
32 settings,
33 sessions: Default::default(),
34 stats: Default::default(),
35 stats_timer: Timer::new(now, Duration::from_secs(1)),
36 }
37 }
38
39 pub fn handle_input(&mut self, now: Instant, input: Input) -> Action {
40 match input {
41 Input::Packet(packet) => self.handle_input_packet(now, packet),
42 Input::AccessResponse(response) => self.handle_input_access_response(now, response),
43 Input::Timer => self.handle_timer(now),
44 Input::Success(result_of) => self.handle_success(now, result_of),
45 Input::Failure(result_of) => self.handle_failure(now, result_of),
46 }
47 }
48
49 fn handle_input_packet(&mut self, now: Instant, packet: ReceivePacketResult) -> Action {
50 match packet {
51 Ok(packet) => self.handle_packet(now, packet),
52 Err(error) => self.handle_packet_receive_error(now, error),
53 }
54 }
55
56 fn handle_input_access_response(
57 &mut self,
58 now: Instant,
59 response: Option<(SessionId, AccessControlResponse)>,
60 ) -> Action {
61 match response {
62 Some((session_id, response)) => {
63 self.handle_access_control_response(now, session_id, response)
64 }
65 None => self.handle_close(),
66 }
67 }
68
69 fn handle_packet(&mut self, now: Instant, packet: (Packet, SocketAddr)) -> Action {
70 self.stats.rx_packets += 1;
71 let session_id = SessionId(packet.1);
73 let settings = &self.settings;
74 self.sessions
75 .entry(session_id)
76 .or_insert_with(|| SessionState::new_pending(settings.clone()))
77 .handle_packet(now, session_id, packet)
78 }
79
80 fn handle_packet_receive_error(&mut self, now: Instant, error: ReceivePacketError) -> Action {
81 self.warn(now, "packet", &error);
82
83 use ReceivePacketError::*;
84 match error {
85 Parse(_) => self.stats.rx_parse_errors += 1,
86 Io(_) => self.stats.rx_io_errors += 1,
87 }
88
89 Action::WaitForInput
90 }
91
92 fn handle_access_control_response(
93 &mut self,
94 now: Instant,
95 session_id: SessionId,
96 response: AccessControlResponse,
97 ) -> Action {
98 match self.sessions.get_mut(&session_id) {
99 Some(session) => session.handle_access_control_response(now, session_id, response),
100 None => Action::DropConnection(session_id),
101 }
102 }
103
104 fn handle_timer(&mut self, now: Instant) -> Action {
105 if self.stats_timer.check_expired(now).is_some() {
106 Action::UpdateStatistics(&self.stats)
107 } else {
108 Action::WaitForInput
111 }
112 }
113
114 fn handle_success(&mut self, _now: Instant, result_of: ResultOf) -> Action {
115 use ResultOf::*;
116 match result_of {
117 SendPacket(_) => {
118 self.stats.tx_packets += 1;
119 }
120 RequestAccess(_) => {
121 self.stats.cx_inbound += 1;
122 }
123 RejectConnection(session_id) => {
124 self.stats.cx_rejected += 1;
125 self.sessions.remove(&session_id);
126 }
127 OpenConnection(_) => {
128 self.stats.cx_opened += 1;
129 }
130 DelegatePacket(_) => {
131 self.stats.delegated_packets += 1;
132 }
133 DropConnection(session_id) => {
134 self.stats.cx_dropped += 1;
135 self.sessions.remove(&session_id);
136 }
137 UpdateStatistics => {}
138 }
139 Action::WaitForInput
140 }
141
142 fn handle_failure(&mut self, now: Instant, result_of: ResultOf) -> Action {
143 self.warn(now, "failure", &result_of);
144
145 use ResultOf::*;
146 match result_of {
147 DelegatePacket(session_id) => Action::DropConnection(session_id),
148 _ => Action::WaitForInput,
150 }
151 }
152
153 fn handle_close(&mut self) -> Action {
154 Action::Close
155 }
156
157 fn warn(&self, now: Instant, tag: &str, debug: &impl Debug) {
158 log::warn!(
159 "{:?}|listen:{}|{} - {:?}",
160 TimeSpan::from_interval(self.start_time, now),
161 self.local_address.port(),
162 tag,
163 debug
164 );
165 }
166}
167
168#[cfg(test)]
169mod test {
170 use assert_matches::assert_matches;
171 use std::{
172 net::{IpAddr, Ipv4Addr},
173 time::Duration,
174 };
175
176 use rand::random;
177
178 use crate::options::{KeySize, PacketCount, PacketSize, SrtVersion};
179
180 use super::*;
181
182 fn conn_addr() -> SocketAddr {
183 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8765)
184 }
185
186 fn test_induction() -> HandshakeControlInfo {
187 HandshakeControlInfo {
188 init_seq_num: random(),
189 max_packet_size: PacketSize(1316),
190 max_flow_size: PacketCount(256_000),
191 shake_type: ShakeType::Induction,
192 socket_id: SocketId(15),
193 syn_cookie: 0,
194 peer_addr: IpAddr::from([127, 0, 0, 1]),
195 info: HandshakeVsInfo::V5(HsV5Info::default()),
196 }
197 }
198
199 fn test_conclusion() -> HandshakeControlInfo {
200 HandshakeControlInfo {
201 init_seq_num: random(),
202 max_packet_size: PacketSize(1316),
203 max_flow_size: PacketCount(256_000),
204 shake_type: ShakeType::Conclusion,
205 socket_id: SocketId(15),
206 syn_cookie: crate::protocol::pending_connection::cookie::gen_cookie(&conn_addr()),
207 peer_addr: IpAddr::from([127, 0, 0, 1]),
208 info: HandshakeVsInfo::V5(HsV5Info {
209 key_size: KeySize::Unspecified,
210 ext_hs: Some(SrtControlPacket::HandshakeRequest(SrtHandshake {
211 version: SrtVersion::CURRENT,
212 flags: SrtShakeFlags::SUPPORTED,
213 send_latency: Duration::from_secs(1),
214 recv_latency: Duration::from_secs(2),
215 })),
216 ext_km: None,
217 ext_group: None,
218 sid: None,
219 }),
220 }
221 }
222
223 fn dest_sock_id() -> SocketId {
224 SocketId(6)
225 }
226
227 fn session_id() -> SessionId {
228 SessionId(conn_addr())
229 }
230
231 fn build_hs_pack(i: HandshakeControlInfo) -> Packet {
232 Packet::Control(ControlPacket {
233 timestamp: TimeStamp::from_micros(0),
234 dest_sockid: dest_sock_id(),
235 control_type: ControlTypes::Handshake(i),
236 })
237 }
238
239 #[test]
240 fn connect() {
241 let settings = ConnInitSettings::default();
242 let local = "0.0.0.0:2000".parse().unwrap();
243 let mut listener = MultiplexListener::new(Instant::now(), local, settings);
244
245 let packet = build_hs_pack(test_induction());
246 let action =
247 listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
248 assert_matches!(action, Action::SendPacket(_));
249
250 let packet = build_hs_pack(test_conclusion());
251 let action =
252 listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
253 assert_matches!(action, Action::RequestAccess(_, _));
254
255 let action = listener.handle_input(
256 Instant::now(),
257 Input::AccessResponse(Some((session_id(), AccessControlResponse::Accepted(None)))),
258 );
259 assert_matches!(action, Action::OpenConnection(_, _));
260
261 use crate::listener::ResultOf::*;
262
263 let action =
264 listener.handle_input(Instant::now(), Input::Success(OpenConnection(session_id())));
265 assert_matches!(action, Action::WaitForInput);
266
267 let packet = build_hs_pack(test_conclusion());
268 let action =
269 listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
270 assert_matches!(action, Action::DelegatePacket(_, _));
271 }
272
273 #[test]
274 fn reject() {
275 let settings = ConnInitSettings::default();
276 let local = "127.0.0.1:2000".parse().unwrap();
277 let mut listener = MultiplexListener::new(Instant::now(), local, settings);
278
279 let packet = build_hs_pack(test_induction());
280 let action =
281 listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
282 assert_matches!(action, Action::SendPacket(_));
283
284 let packet = build_hs_pack(test_conclusion());
285 let action =
286 listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
287 assert_matches!(action, Action::RequestAccess(_, _));
288
289 let action = listener.handle_input(
290 Instant::now(),
291 Input::AccessResponse(Some((
292 session_id(),
293 AccessControlResponse::Rejected(RejectReason::User(100)),
294 ))),
295 );
296 assert_matches!(
297 action,
298 Action::RejectConnection(_, Some((Packet::Control(_), _)))
299 );
300
301 let packet = build_hs_pack(test_conclusion());
302 let action =
303 listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
304 assert_matches!(
305 action,
306 Action::RejectConnection(_, Some((Packet::Control(_), _)))
307 );
308
309 let action = listener.handle_input(
310 Instant::now(),
311 Input::Success(ResultOf::RejectConnection(session_id())),
312 );
313 assert_eq!(action, Action::WaitForInput);
314 }
315}