srt_protocol/listener/
mod.rs

1mod input;
2mod session;
3mod statistics;
4
5use std::{collections::HashMap, fmt::Debug, net::SocketAddr, time::Duration, time::Instant};
6
7use crate::{packet::*, protocol::time::Timer, settings::ConnInitSettings};
8
9use session::*;
10
11pub use input::*;
12pub use statistics::*;
13
14#[derive(Clone, Debug)]
15pub struct ListenerSettings {}
16
17#[derive(Debug)]
18pub struct MultiplexListener {
19    start_time: Instant,
20    local_address: SocketAddr,
21    settings: ConnInitSettings,
22    sessions: HashMap<SessionId, SessionState>,
23    stats: ListenerStatistics,
24    stats_timer: Timer,
25}
26
27impl MultiplexListener {
28    pub fn new(now: Instant, local_address: SocketAddr, settings: ConnInitSettings) -> Self {
29        Self {
30            start_time: now,
31            local_address,
32            settings,
33            sessions: Default::default(),
34            stats: Default::default(),
35            stats_timer: Timer::new(now, Duration::from_secs(1)),
36        }
37    }
38
39    pub fn handle_input(&mut self, now: Instant, input: Input) -> Action {
40        match input {
41            Input::Packet(packet) => self.handle_input_packet(now, packet),
42            Input::AccessResponse(response) => self.handle_input_access_response(now, response),
43            Input::Timer => self.handle_timer(now),
44            Input::Success(result_of) => self.handle_success(now, result_of),
45            Input::Failure(result_of) => self.handle_failure(now, result_of),
46        }
47    }
48
49    fn handle_input_packet(&mut self, now: Instant, packet: ReceivePacketResult) -> Action {
50        match packet {
51            Ok(packet) => self.handle_packet(now, packet),
52            Err(error) => self.handle_packet_receive_error(now, error),
53        }
54    }
55
56    fn handle_input_access_response(
57        &mut self,
58        now: Instant,
59        response: Option<(SessionId, AccessControlResponse)>,
60    ) -> Action {
61        match response {
62            Some((session_id, response)) => {
63                self.handle_access_control_response(now, session_id, response)
64            }
65            None => self.handle_close(),
66        }
67    }
68
69    fn handle_packet(&mut self, now: Instant, packet: (Packet, SocketAddr)) -> Action {
70        self.stats.rx_packets += 1;
71        //self.stats.rx_bytes += packet
72        let session_id = SessionId(packet.1);
73        let settings = &self.settings;
74        self.sessions
75            .entry(session_id)
76            .or_insert_with(|| SessionState::new_pending(settings.clone()))
77            .handle_packet(now, session_id, packet)
78    }
79
80    fn handle_packet_receive_error(&mut self, now: Instant, error: ReceivePacketError) -> Action {
81        self.warn(now, "packet", &error);
82
83        use ReceivePacketError::*;
84        match error {
85            Parse(_) => self.stats.rx_parse_errors += 1,
86            Io(_) => self.stats.rx_io_errors += 1,
87        }
88
89        Action::WaitForInput
90    }
91
92    fn handle_access_control_response(
93        &mut self,
94        now: Instant,
95        session_id: SessionId,
96        response: AccessControlResponse,
97    ) -> Action {
98        match self.sessions.get_mut(&session_id) {
99            Some(session) => session.handle_access_control_response(now, session_id, response),
100            None => Action::DropConnection(session_id),
101        }
102    }
103
104    fn handle_timer(&mut self, now: Instant) -> Action {
105        if self.stats_timer.check_expired(now).is_some() {
106            Action::UpdateStatistics(&self.stats)
107        } else {
108            // TODO: create an action that returns an action with an Iterator that ticks time forward
109            //  for all the sessions, yielding the results to the I/O loop
110            Action::WaitForInput
111        }
112    }
113
114    fn handle_success(&mut self, _now: Instant, result_of: ResultOf) -> Action {
115        use ResultOf::*;
116        match result_of {
117            SendPacket(_) => {
118                self.stats.tx_packets += 1;
119            }
120            RequestAccess(_) => {
121                self.stats.cx_inbound += 1;
122            }
123            RejectConnection(session_id) => {
124                self.stats.cx_rejected += 1;
125                self.sessions.remove(&session_id);
126            }
127            OpenConnection(_) => {
128                self.stats.cx_opened += 1;
129            }
130            DelegatePacket(_) => {
131                self.stats.delegated_packets += 1;
132            }
133            DropConnection(session_id) => {
134                self.stats.cx_dropped += 1;
135                self.sessions.remove(&session_id);
136            }
137            UpdateStatistics => {}
138        }
139        Action::WaitForInput
140    }
141
142    fn handle_failure(&mut self, now: Instant, result_of: ResultOf) -> Action {
143        self.warn(now, "failure", &result_of);
144
145        use ResultOf::*;
146        match result_of {
147            DelegatePacket(session_id) => Action::DropConnection(session_id),
148            // TODO: stats? anything else?
149            _ => Action::WaitForInput,
150        }
151    }
152
153    fn handle_close(&mut self) -> Action {
154        Action::Close
155    }
156
157    fn warn(&self, now: Instant, tag: &str, debug: &impl Debug) {
158        log::warn!(
159            "{:?}|listen:{}|{} - {:?}",
160            TimeSpan::from_interval(self.start_time, now),
161            self.local_address.port(),
162            tag,
163            debug
164        );
165    }
166}
167
168#[cfg(test)]
169mod test {
170    use assert_matches::assert_matches;
171    use std::{
172        net::{IpAddr, Ipv4Addr},
173        time::Duration,
174    };
175
176    use rand::random;
177
178    use crate::options::{KeySize, PacketCount, PacketSize, SrtVersion};
179
180    use super::*;
181
182    fn conn_addr() -> SocketAddr {
183        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8765)
184    }
185
186    fn test_induction() -> HandshakeControlInfo {
187        HandshakeControlInfo {
188            init_seq_num: random(),
189            max_packet_size: PacketSize(1316),
190            max_flow_size: PacketCount(256_000),
191            shake_type: ShakeType::Induction,
192            socket_id: SocketId(15),
193            syn_cookie: 0,
194            peer_addr: IpAddr::from([127, 0, 0, 1]),
195            info: HandshakeVsInfo::V5(HsV5Info::default()),
196        }
197    }
198
199    fn test_conclusion() -> HandshakeControlInfo {
200        HandshakeControlInfo {
201            init_seq_num: random(),
202            max_packet_size: PacketSize(1316),
203            max_flow_size: PacketCount(256_000),
204            shake_type: ShakeType::Conclusion,
205            socket_id: SocketId(15),
206            syn_cookie: crate::protocol::pending_connection::cookie::gen_cookie(&conn_addr()),
207            peer_addr: IpAddr::from([127, 0, 0, 1]),
208            info: HandshakeVsInfo::V5(HsV5Info {
209                key_size: KeySize::Unspecified,
210                ext_hs: Some(SrtControlPacket::HandshakeRequest(SrtHandshake {
211                    version: SrtVersion::CURRENT,
212                    flags: SrtShakeFlags::SUPPORTED,
213                    send_latency: Duration::from_secs(1),
214                    recv_latency: Duration::from_secs(2),
215                })),
216                ext_km: None,
217                ext_group: None,
218                sid: None,
219            }),
220        }
221    }
222
223    fn dest_sock_id() -> SocketId {
224        SocketId(6)
225    }
226
227    fn session_id() -> SessionId {
228        SessionId(conn_addr())
229    }
230
231    fn build_hs_pack(i: HandshakeControlInfo) -> Packet {
232        Packet::Control(ControlPacket {
233            timestamp: TimeStamp::from_micros(0),
234            dest_sockid: dest_sock_id(),
235            control_type: ControlTypes::Handshake(i),
236        })
237    }
238
239    #[test]
240    fn connect() {
241        let settings = ConnInitSettings::default();
242        let local = "0.0.0.0:2000".parse().unwrap();
243        let mut listener = MultiplexListener::new(Instant::now(), local, settings);
244
245        let packet = build_hs_pack(test_induction());
246        let action =
247            listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
248        assert_matches!(action, Action::SendPacket(_));
249
250        let packet = build_hs_pack(test_conclusion());
251        let action =
252            listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
253        assert_matches!(action, Action::RequestAccess(_, _));
254
255        let action = listener.handle_input(
256            Instant::now(),
257            Input::AccessResponse(Some((session_id(), AccessControlResponse::Accepted(None)))),
258        );
259        assert_matches!(action, Action::OpenConnection(_, _));
260
261        use crate::listener::ResultOf::*;
262
263        let action =
264            listener.handle_input(Instant::now(), Input::Success(OpenConnection(session_id())));
265        assert_matches!(action, Action::WaitForInput);
266
267        let packet = build_hs_pack(test_conclusion());
268        let action =
269            listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
270        assert_matches!(action, Action::DelegatePacket(_, _));
271    }
272
273    #[test]
274    fn reject() {
275        let settings = ConnInitSettings::default();
276        let local = "127.0.0.1:2000".parse().unwrap();
277        let mut listener = MultiplexListener::new(Instant::now(), local, settings);
278
279        let packet = build_hs_pack(test_induction());
280        let action =
281            listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
282        assert_matches!(action, Action::SendPacket(_));
283
284        let packet = build_hs_pack(test_conclusion());
285        let action =
286            listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
287        assert_matches!(action, Action::RequestAccess(_, _));
288
289        let action = listener.handle_input(
290            Instant::now(),
291            Input::AccessResponse(Some((
292                session_id(),
293                AccessControlResponse::Rejected(RejectReason::User(100)),
294            ))),
295        );
296        assert_matches!(
297            action,
298            Action::RejectConnection(_, Some((Packet::Control(_), _)))
299        );
300
301        let packet = build_hs_pack(test_conclusion());
302        let action =
303            listener.handle_input(Instant::now(), Input::Packet(Ok((packet, conn_addr()))));
304        assert_matches!(
305            action,
306            Action::RejectConnection(_, Some((Packet::Control(_), _)))
307        );
308
309        let action = listener.handle_input(
310            Instant::now(),
311            Input::Success(ResultOf::RejectConnection(session_id())),
312        );
313        assert_eq!(action, Action::WaitForInput);
314    }
315}