1pub mod status;
2pub use status::*;
3
4use std::{
5 convert::TryFrom,
6 fmt::Debug,
7 io,
8 net::SocketAddr,
9 time::{Duration, Instant},
10};
11
12use bytes::Bytes;
13
14use crate::{
15 options::*,
16 packet::*,
17 protocol::{
18 handshake::Handshake,
19 output::Output,
20 receiver::{Receiver, ReceiverContext},
21 sender::{Sender, SenderContext},
22 time::Timers,
23 },
24 settings::CipherSettings,
25 statistics::SocketStatistics,
26};
27
/// An established connection, ready to be turned into a [`DuplexConnection`].
///
/// `DuplexConnection::new` consumes this value: the settings configure every
/// component of the connection, and the handshake state is kept to answer any
/// handshake packets that arrive afterwards.
#[derive(Debug, Eq, PartialEq)]
pub struct Connection {
    /// Parameters of the connection.
    pub settings: ConnectionSettings,
    /// Handshake state carried into the running connection.
    pub handshake: Handshake,
}
33
/// Parameters of a single connection.
///
/// `DuplexConnection::new` hands these settings to the output queue, the timers,
/// the sender and the receiver.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct ConnectionSettings {
    /// Address of the remote peer. Packets received from any other address are
    /// ignored, and every outgoing packet is paired with this address.
    pub remote: SocketAddr,

    /// Socket id of the remote peer.
    pub remote_sockid: SocketId,

    /// Socket id of this end of the connection. Incoming packets whose
    /// destination socket id differs from it are ignored.
    pub local_sockid: SocketId,

    /// Instant the socket was started. Elapsed-time statistics and log
    /// timestamps are measured relative to it, and the timers are seeded with it.
    pub socket_start_time: Instant,

    /// Round-trip time for the connection.
    /// NOTE(review): presumably the initial estimate taken during the handshake — confirm.
    pub rtt: Duration,

    /// Initial sequence number.
    /// NOTE(review): presumably the sequence number of the first data packet — confirm.
    pub init_seq_num: SeqNumber,

    /// Maximum packet size.
    /// NOTE(review): units and whether headers are included are not visible here — confirm.
    pub max_packet_size: PacketSize,

    /// Maximum flow size, counted in packets.
    /// NOTE(review): presumably the flow-control window — confirm.
    pub max_flow_size: PacketCount,

    /// Latency on the sending side. Twice this value is passed to
    /// `ConnectionStatus::new` when the connection is created.
    pub send_tsbpd_latency: Duration,
    /// Latency on the receiving side.
    pub recv_tsbpd_latency: Duration,

    /// Whether too-late packet drop is enabled.
    /// NOTE(review): exact semantics live in the sender/receiver — confirm.
    pub too_late_packet_drop: bool,

    /// Peer idle timeout handed to the connection timers.
    pub peer_idle_timeout: Duration,

    /// Receive buffer size, counted in packets.
    pub recv_buffer_size: PacketCount,
    /// Send buffer size, counted in packets.
    pub send_buffer_size: PacketCount,
    /// Cipher settings, if any.
    /// NOTE(review): `None` presumably means the connection is unencrypted — confirm.
    pub cipher: Option<CipherSettings>,
    /// Optional stream id associated with the connection.
    pub stream_id: Option<String>,
    /// Bandwidth mode of the connection.
    pub bandwidth: LiveBandwidthMode,
    /// Statistics interval handed to the connection timers.
    pub statistics_interval: Duration,
}
81
/// State machine of one bidirectional connection.
///
/// It performs no I/O itself: the caller feeds it [`Input`] events through
/// `handle_input` and carries out the [`Action`] it returns.
#[derive(Debug)]
pub struct DuplexConnection {
    /// Settings the connection was created with.
    settings: ConnectionSettings,
    /// Periodic and timeout timers driving the connection.
    timers: Timers,
    /// Handshake state, used to answer handshake packets received on the connection.
    handshake: Handshake,
    /// Queue of outgoing packets.
    output: Output,
    /// Sending half of the connection.
    sender: Sender,
    /// Receiving half of the connection.
    receiver: Receiver,
    /// Statistics accumulated over the lifetime of the connection.
    stats: SocketStatistics,
    /// Open/closing/closed tracking of the connection.
    status: ConnectionStatus,
}
93
/// What the caller has to do next, as returned by `DuplexConnection::handle_input`.
// NOTE(review): the variants differ a lot in size; boxing was presumably judged
// not worthwhile for this hot-path type — confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Action<'a> {
    /// Hand a received message, paired with an instant, to the application.
    ReleaseData((Instant, Bytes)),
    /// Send the packet to the given address.
    SendPacket((Packet, SocketAddr)),
    /// Publish the freshly updated statistics, borrowed from the connection.
    UpdateStatistics(&'a SocketStatistics),
    /// Nothing to do right now; wait for new input for at most this long
    /// (the time remaining until the next timer).
    WaitForData(Duration),
    /// The connection is no longer open; stop driving it.
    Close,
}
103
/// An event fed into `DuplexConnection::handle_input`.
///
/// Only `Data` and `Packet` carry information that is processed; the remaining
/// variants merely give the connection the opportunity to produce its next
/// [`Action`].
// NOTE(review): the variants differ a lot in size; boxing was presumably judged
// not worthwhile for this hot-path type — confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Input {
    /// Data from the application to send, or `None` once the application's
    /// data stream has closed.
    Data(Option<(Instant, Bytes)>),
    /// Result of receiving a packet from the network.
    Packet(ReceivePacketResult),
    /// The previously returned `Action::ReleaseData` has been carried out.
    DataReleased,
    /// The previously returned `Action::SendPacket` has been carried out.
    PacketSent,
    /// The previously returned `Action::UpdateStatistics` has been carried out.
    StatisticsUpdated,
    /// The wait requested by `Action::WaitForData` has elapsed.
    Timer,
}
114
115impl DuplexConnection {
116 pub fn new(connection: Connection) -> DuplexConnection {
117 let settings = connection.settings;
118
119 DuplexConnection {
120 settings: settings.clone(),
121 handshake: connection.handshake,
122 output: Output::new(&settings),
123 status: ConnectionStatus::new(settings.send_tsbpd_latency * 2), timers: Timers::new(
125 settings.socket_start_time,
126 settings.statistics_interval,
127 settings.peer_idle_timeout,
128 ),
129 stats: SocketStatistics::new(),
130 receiver: Receiver::new(settings.clone()),
131 sender: Sender::new(settings),
132 }
133 }
134
135 pub fn handle_input(&mut self, now: Instant, input: Input) -> Action {
136 self.debug(now, "input", &input);
137
138 match input {
139 Input::Data(data) => self.handle_data_input(now, data),
140 Input::Packet(packet) => self.handle_packet_input(now, packet),
141 _ => {}
142 };
143
144 let action = if self.should_close(now) {
145 Action::Close
146 } else if self.should_update_statistics(now) {
147 self.update_statistics(now);
148 Action::UpdateStatistics(&self.stats)
149 } else if let Some(packet) = self.next_packet(now) {
150 Action::SendPacket(packet)
151 } else if let Some(data) = self.next_data(now) {
152 Action::ReleaseData(data)
153 } else {
154 Action::WaitForData(self.next_timer(now) - now)
155 };
156
157 self.debug(now, "action", &action);
158 action
159 }
160
161 pub fn is_open(&self) -> bool {
162 self.status.is_open()
163 }
164
165 pub fn settings(&self) -> &ConnectionSettings {
166 &self.settings
167 }
168
169 pub fn update_statistics(&mut self, now: Instant) {
170 self.stats.elapsed_time = now - self.settings.socket_start_time;
171 self.stats.tx_buffered_time = self.sender.tx_buffered_time();
172 self.stats.tx_buffered_data = self.sender.tx_buffered_packets();
173 self.stats.tx_buffered_bytes = self.sender.tx_buffered_bytes();
174
175 self.stats.rx_acknowledged_time = self.receiver.rx_acknowledged_time();
176 }
177
178 pub fn next_packet(&mut self, now: Instant) -> Option<(Packet, SocketAddr)> {
179 let p = self.output.pop_packet()?;
180 self.stats.tx_all_packets += 1;
181 self.stats.tx_all_bytes += u64::try_from(p.wire_size()).unwrap();
182
183 match &p {
185 Packet::Data(d) => {
186 self.stats.tx_data += 1;
187 self.stats.tx_bytes += u64::try_from(d.wire_size()).unwrap();
188 }
189 Packet::Control(c) => match c.control_type {
190 ControlTypes::Ack(ref a) => {
191 self.stats.tx_ack += 1;
192 if matches!(a, Acknowledgement::Lite(_)) {
193 self.stats.tx_light_ack += 1;
194 }
195 }
196 ControlTypes::Nak(_) => {
197 self.stats.tx_nak += 1;
198 }
199 ControlTypes::Ack2(_) => {
200 self.stats.tx_ack2 += 1;
201 }
202 _ => {}
203 },
204 }
205 self.debug(now, "send", &p);
206 Some((p, self.settings.remote))
207 }
208
209 pub fn next_data(&mut self, now: Instant) -> Option<(Instant, Bytes)> {
210 match self.receiver.arq.pop_next_message(now) {
211 Ok(Some(data)) => {
212 self.debug(now, "output", &data);
213 Some(data)
214 }
215 Err(error) => {
216 self.warn(now, "output", &error);
217 let dropped = error.too_late_packets.end - error.too_late_packets.start;
218 self.stats.rx_dropped_data += dropped as u64;
219 None
220 }
221 _ => None,
222 }
223 }
224
225 pub fn next_timer(&self, now: Instant) -> Instant {
226 let has_packets_to_send = self.sender.has_packets_to_send();
227 let next_message = self.receiver.arq.next_message_release_time();
228 let unacked_packets = self.receiver.arq.unacked_packet_count();
229 self.timers
230 .next_timer(now, has_packets_to_send, next_message, unacked_packets)
231 }
232
233 pub fn should_close(&mut self, now: Instant) -> bool {
234 if !self.is_open() {
235 true
236 } else {
237 self.check_timers(now);
238 false
239 }
240 }
241
242 pub fn should_update_statistics(&mut self, now: Instant) -> bool {
243 self.timers.check_statistics(now).is_some()
244 }
245
246 pub fn statistics(&self) -> &SocketStatistics {
247 &self.stats
248 }
249
250 pub fn check_timers(&mut self, now: Instant) -> Instant {
251 if self.timers.check_full_ack(now).is_some() {
252 self.receiver().on_full_ack_event(now);
253 }
254 if self.timers.check_nak(now).is_some() {
255 self.receiver().on_nak_event(now);
256 }
257 if self.timers.check_peer_idle_timeout(now).is_some() {
258 self.on_peer_idle_timeout(now);
259 }
260 if let Some(elapsed_periods) = self.timers.check_snd(now) {
261 self.sender().on_snd_event(now, elapsed_periods)
262 }
263
264 if self.status.check_receive_close_timeout(
265 now,
266 self.receiver.is_flushed(),
267 self.settings.local_sockid,
268 ) {
269 self.receiver().on_close_timeout(now);
270 }
271 if self.status.check_sender_shutdown(
272 now,
273 self.sender.is_flushed(),
274 self.receiver.is_flushed(),
275 self.output.is_empty(),
276 ) {
277 self.output.send_control(now, ControlTypes::Shutdown);
278 }
279
280 self.output.ensure_alive(now);
281
282 self.next_timer(now)
283 }
284
285 pub fn handle_data_input(&mut self, now: Instant, data: Option<(Instant, Bytes)>) {
286 self.debug(now, "input", &data);
287 match data {
288 Some(item) => {
289 self.sender().handle_data(now, item);
290 }
291 None => {
292 self.handle_data_stream_close(now);
293 }
294 }
295 }
296
297 pub fn handle_packet_input(&mut self, now: Instant, packet: ReceivePacketResult) {
298 self.debug(now, "packet", &packet);
299 use ReceivePacketError::*;
300 match packet {
301 Ok(packet) => self.handle_packet(now, packet),
302 Err(Io(error)) => self.handle_socket_close(now, error),
303 Err(Parse(e)) => self.warn(now, "packet", &e),
304 }
305 }
306
307 fn handle_data_stream_close(&mut self, now: Instant) {
308 self.info(now, "closed data", &());
309 self.status.on_data_stream_closed(now);
310 }
311
312 fn handle_socket_close(&mut self, now: Instant, error: io::Error) {
313 self.warn(now, "closed socket", &error);
314 self.status.on_socket_closed(now);
315 }
316
317 pub fn on_peer_idle_timeout(&mut self, now: Instant) {
318 self.output.send_control(now, ControlTypes::Shutdown);
319 self.status.on_peer_idle_timeout(now);
320 }
321
322 fn handle_packet(&mut self, now: Instant, (packet, from): (Packet, SocketAddr)) {
323 if from != self.settings.remote {
326 self.info(now, "invalid address", &(packet, from));
327 return;
328 }
329
330 if self.settings.local_sockid != packet.dest_sockid() {
331 self.info(now, "invalid socket id", &(packet, from));
332 return;
333 }
334
335 self.timers.reset_exp(now);
336
337 self.stats.rx_all_packets += 1;
338 self.stats.rx_all_bytes += u64::try_from(packet.wire_size()).unwrap();
339 match packet {
340 Packet::Data(data) => self.receiver().handle_data_packet(now, data),
341 Packet::Control(control) => self.handle_control_packet(now, control),
342 }
343 }
344
345 fn handle_control_packet(&mut self, now: Instant, control: ControlPacket) {
346 self.receiver().synchronize_clock(now, control.timestamp);
347
348 use ControlTypes::*;
349 match control.control_type {
350 Ack(ack) => self.sender().handle_ack_packet(now, ack),
352 DropRequest { range, .. } => self.receiver().handle_drop_request(now, range),
353 Handshake(shake) => self.handle_handshake_packet(now, shake),
354 Nak(nak) => self.sender().handle_nak_packet(now, nak),
355 Ack2(seq_num) => self.receiver().handle_ack2_packet(now, seq_num),
357 Shutdown => self
359 .status
360 .handle_shutdown_packet(now, self.settings.local_sockid),
361 KeepAlive => {}
363 CongestionWarning => todo!(),
367 PeerError(_) => todo!(),
369 Srt(s) => self.handle_srt_control_packet(now, s),
371 }
372 }
373
374 fn handle_handshake_packet(&mut self, now: Instant, handshake: HandshakeControlInfo) {
375 if let Some(control) = self.handshake.handle_handshake(handshake) {
376 self.output.send_control(now, control);
377 }
378 }
379
380 fn handle_srt_control_packet(&mut self, now: Instant, pack: SrtControlPacket) {
381 use self::SrtControlPacket::*;
382 match pack {
383 HandshakeRequest(_) | HandshakeResponse(_) => self.warn(now, "handshake", &pack),
384 KeyRefreshRequest(keying_material) => self
385 .receiver()
386 .handle_key_refresh_request(now, keying_material),
387 KeyRefreshResponse(keying_material) => {
388 self.sender().handle_key_refresh_response(keying_material)
389 }
390 _ => unimplemented!("{:?}", pack),
391 }
392 }
393
394 fn sender(&mut self) -> SenderContext {
395 SenderContext::new(
396 &mut self.status,
397 &mut self.timers,
398 &mut self.output,
399 &mut self.stats,
400 &mut self.sender,
401 )
402 }
403
404 fn receiver(&mut self) -> ReceiverContext {
405 ReceiverContext::new(
406 &mut self.timers,
407 &mut self.output,
408 &mut self.stats,
409 &mut self.receiver,
410 )
411 }
412
413 fn debug(&self, now: Instant, tag: &str, debug: &impl Debug) {
414 log::debug!(
415 "{:?}|{:?}|{} - {:?}",
416 TimeSpan::from_interval(self.settings.socket_start_time, now),
417 self.settings.local_sockid,
418 tag,
419 debug
420 );
421 }
422
423 fn info(&self, now: Instant, tag: &str, debug: &impl Debug) {
424 log::info!(
425 "{:?}|{:?}|{} - {:?}",
426 TimeSpan::from_interval(self.settings.socket_start_time, now),
427 self.settings.local_sockid,
428 tag,
429 debug
430 );
431 }
432
433 fn warn(&self, now: Instant, tag: &str, debug: &impl Debug) {
434 log::warn!(
435 "{:?}|{:?}|{} - {:?}",
436 TimeSpan::from_interval(self.settings.socket_start_time, now),
437 self.settings.local_sockid,
438 tag,
439 debug
440 );
441 }
442}
443
#[cfg(test)]
mod duplex_connection {
    use assert_matches::assert_matches;

    use Action::*;
    use ControlTypes::*;
    use Packet::*;

    use crate::protocol::time::Rtt;

    use super::*;

    const MILLIS: Duration = Duration::from_millis(1);
    /// Wait the tests expect the connection to request after data has been queued.
    const SND: Duration = MILLIS;
    /// Latency used for both directions of the test connection.
    const TSBPD: Duration = Duration::from_secs(1);

    /// Address of the simulated peer.
    fn remote_addr() -> SocketAddr {
        ([127, 0, 0, 1], 2223).into()
    }

    /// Socket id of the simulated peer; outgoing packets are addressed to it.
    fn remote_sockid() -> SocketId {
        SocketId(2)
    }

    /// Socket id of the connection under test; inbound packets must be addressed to it.
    // NOTE(review): this is the same value as `remote_sockid()`, so these tests
    // cannot detect the two ids being mixed up — consider making them distinct.
    fn local_sockid() -> SocketId {
        SocketId(2)
    }

    /// Builds an unencrypted connection started at `now`, with `TSBPD` latency in
    /// both directions.
    fn new_connection(now: Instant) -> Connection {
        Connection {
            settings: ConnectionSettings {
                remote: remote_addr(),
                remote_sockid: remote_sockid(),
                local_sockid: local_sockid(),
                socket_start_time: now,
                rtt: Duration::default(),
                init_seq_num: SeqNumber::new_truncate(0),
                max_packet_size: PacketSize(1316),
                max_flow_size: PacketCount(8192),
                send_tsbpd_latency: TSBPD,
                recv_tsbpd_latency: TSBPD,
                recv_buffer_size: PacketCount(1024),
                send_buffer_size: PacketCount(1024),
                cipher: None,
                stream_id: None,
                bandwidth: LiveBandwidthMode::Unlimited,
                statistics_interval: Duration::from_secs(10),
                peer_idle_timeout: Duration::from_secs(5),
                too_late_packet_drop: true,
            },
            handshake: crate::protocol::handshake::Handshake::Connector,
        }
    }

    /// Closing the input data stream must still send the queued data, then send
    /// `Shutdown` and finally report `Close`.
    #[test]
    fn input_data_close() {
        let start = Instant::now();
        let mut connection = DuplexConnection::new(new_connection(start));

        let mut now = start;
        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));
        assert_eq!(
            connection.handle_input(now, Input::Data(Some((start, Bytes::new())))),
            WaitForData(SND)
        );
        assert_eq!(
            connection.handle_input(now, Input::Data(Some((start, Bytes::new())))),
            WaitForData(SND)
        );

        assert_eq!(
            connection.handle_input(now, Input::Data(None)),
            WaitForData(SND),
            "input data 'close' should drain the send buffers"
        );

        // First queued packet goes out after one send period.
        now += SND;
        assert_matches!(
            connection.handle_input(now, Input::Timer),
            SendPacket((Data(_), _))
        );
        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));

        // Second queued packet goes out one period later.
        now += SND;
        assert_matches!(
            connection.handle_input(now, Input::Timer),
            SendPacket((Data(_), _))
        );

        // The peer sends a full ACK, which must be answered with an ACK2.
        let packet = Control(ControlPacket {
            timestamp: TimeStamp::MIN,
            dest_sockid: local_sockid(),
            control_type: Ack(Acknowledgement::Full(
                SeqNumber(1),
                AckStatistics {
                    rtt: Rtt::new(TimeSpan::ZERO, TimeSpan::ZERO),
                    buffer_available: 10000,
                    packet_receive_rate: None,
                    estimated_link_capacity: None,
                    data_receive_rate: None,
                },
                FullAckSeqNumber::INITIAL,
            )),
        });
        assert_eq!(
            connection.handle_input(now, Input::Packet(Ok((packet, remote_addr())))),
            SendPacket((
                Control(ControlPacket {
                    timestamp: TimeStamp::from_micros(2_000),
                    dest_sockid: remote_sockid(),
                    control_type: Ack2(FullAckSeqNumber::INITIAL),
                }),
                remote_addr()
            ))
        );

        // Four seconds later the connection announces shutdown and then closes.
        now += Duration::from_secs(4);
        assert_matches!(
            connection.handle_input(now, Input::Timer),
            SendPacket((
                Control(ControlPacket {
                    control_type: Shutdown,
                    ..
                }),
                _
            ))
        );
        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));
        assert_eq!(connection.handle_input(now, Input::Timer), Close);
    }

    /// A NAK for packets that have become too late must be answered with a
    /// `DropRequest` covering them.
    #[test]
    fn too_late_packet_drop() {
        let start = Instant::now();
        let mut connection = DuplexConnection::new(new_connection(start));

        let mut now = start;
        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));
        assert_eq!(
            connection.handle_input(now, Input::Data(Some((start, Bytes::new())))),
            WaitForData(SND)
        );
        assert_eq!(
            connection.handle_input(now, Input::Data(Some((start, Bytes::new())))),
            WaitForData(SND)
        );

        // Only the first packet is sent after one send period.
        now += SND;
        assert_matches!(
            connection.handle_input(now, Input::Timer),
            SendPacket((Data(DataPacket { seq_number, retransmitted: false, .. }), _)) if seq_number.0 == 0
        );
        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(SND));

        // A full latency later packet 0 is retransmitted and packet 1 is sent for
        // the first time.
        now += TSBPD;

        assert_matches!(
            connection.handle_input(now, Input::Timer),
            SendPacket((Data(DataPacket {seq_number, retransmitted: true, ..}), _)) if seq_number.0 == 0
        );
        assert_matches!(
            connection.handle_input(now, Input::Timer),
            SendPacket((Data(DataPacket {seq_number, retransmitted: false, ..}), _)) if seq_number.0 == 1
        );

        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));

        now += TSBPD / 4;
        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));

        // The peer NAKs both packets; the connection answers with a drop request.
        assert_eq!(
            connection.handle_input(
                now,
                Input::Packet(Ok((
                    Control(ControlPacket {
                        timestamp: TimeStamp::MIN + SND + TSBPD + TSBPD / 4,
                        dest_sockid: local_sockid(),
                        control_type: Nak((SeqNumber(0)..SeqNumber(2)).into()),
                    }),
                    remote_addr()
                )))
            ),
            SendPacket((
                Control(ControlPacket {
                    timestamp: TimeStamp::MIN + SND + TSBPD + TSBPD / 4,
                    dest_sockid: remote_sockid(),
                    control_type: DropRequest {
                        msg_to_drop: MsgNumber(0),
                        range: SeqNumber(0)..=SeqNumber(1)
                    }
                }),
                remote_addr()
            ))
        );
    }
}