1mod loss_compression;
2mod srt;
3
4pub use srt::*;
5
6use std::{
7 convert::TryFrom,
8 convert::TryInto,
9 fmt::{self, Debug, Display, Formatter},
10 iter::FromIterator,
11 mem::size_of,
12 net::{IpAddr, Ipv4Addr, Ipv6Addr},
13 ops::{Add, Range, RangeInclusive, Sub},
14};
15
16use bitflags::bitflags;
17use bytes::{Buf, BufMut};
18use log::warn;
19
20use crate::{
21 options::{KeySize, PacketCount, PacketSize},
22 protocol::time::Rtt,
23};
24
25use super::*;
26
27use loss_compression::{compress_loss_list, decompress_loss_list};
28
29#[derive(Clone, PartialEq, Eq)]
50pub struct ControlPacket {
51 pub timestamp: TimeStamp,
53
54 pub dest_sockid: SocketId,
56
57 pub control_type: ControlTypes,
59}
60
61impl ControlPacket {
62 pub const HEADER_SIZE: usize = super::Packet::HEADER_SIZE.0 as usize;
63}
64
65#[derive(Clone, PartialEq, Eq)]
67#[allow(clippy::large_enum_variant)]
68pub enum ControlTypes {
69 Handshake(HandshakeControlInfo),
72
73 KeepAlive,
76
77 Ack(Acknowledgement),
79
80 Nak(CompressedLossList),
83
84 CongestionWarning,
86
87 Shutdown,
89
90 Ack2(FullAckSeqNumber),
93
94 DropRequest {
96 msg_to_drop: MsgNumber,
99
100 range: RangeInclusive<SeqNumber>,
102 },
103
104 PeerError(u32),
106
107 Srt(SrtControlPacket),
110}
111
112bitflags! {
113 struct ExtFlags: u16 {
115 const HS = 0b1;
117 const KM = 0b10;
119 const CONFIG = 0b100;
121 }
122}
123
124#[derive(Clone, PartialEq, Eq, Debug, Default)]
125pub struct HsV5Info {
126 pub key_size: KeySize,
129
130 pub ext_hs: Option<SrtControlPacket>,
132
133 pub ext_km: Option<SrtControlPacket>,
135
136 pub ext_group: Option<SrtControlPacket>,
137
138 pub sid: Option<String>,
140}
141
142#[derive(Clone, PartialEq, Eq)]
144#[allow(clippy::large_enum_variant)]
145pub enum HandshakeVsInfo {
146 V4(SocketType),
147 V5(HsV5Info),
148}
149
150#[derive(Clone, PartialEq, Eq)]
152pub struct HandshakeControlInfo {
153 pub init_seq_num: SeqNumber,
155
156 pub max_packet_size: PacketSize,
158
159 pub max_flow_size: PacketCount,
161
162 pub shake_type: ShakeType,
164
165 pub socket_id: SocketId,
167
168 pub syn_cookie: i32,
174
175 pub peer_addr: IpAddr,
177
178 pub info: HandshakeVsInfo,
180}
181
182#[derive(Clone, PartialEq, Eq, Debug)]
183pub struct AckStatistics {
184 pub rtt: Rtt,
186 pub buffer_available: u32,
188 pub packet_receive_rate: Option<u32>,
190 pub estimated_link_capacity: Option<u32>,
192 pub data_receive_rate: Option<u32>,
194}
195
/// Sequence number identifying a full acknowledgement; `FullAckSeqNumber::new`
/// refuses to build one from zero.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct FullAckSeqNumber(u32);
198
199#[derive(Clone, PartialEq, Eq, Debug)]
213pub enum Acknowledgement {
214 Lite(SeqNumber),
215 Small(SeqNumber, AckStatistics),
216 Full(SeqNumber, AckStatistics, FullAckSeqNumber),
217}
218
/// A loss list in its compressed wire form: single sequence numbers, and
/// ranges encoded as a word with the top bit set followed by the last
/// sequence number of the range.
#[derive(Clone, PartialEq, Eq)]
pub struct CompressedLossList(Vec<u32>);
221
/// Socket type carried by version-4 handshakes; the discriminants are the
/// wire values.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SocketType {
    /// Stream socket (wire value 1).
    Stream = 1,

    /// Datagram socket (wire value 2).
    Datagram = 2,
}
231
232#[derive(Debug, Clone, Copy, PartialEq, Eq)]
254pub enum ShakeType {
255 Induction,
257
258 Waveahand,
260
261 Conclusion,
264
265 Agreement,
267
268 Rejection(RejectReason),
270}
271
/// Rejection reasons in the 1000-1999 range; the discriminants are the wire
/// values. The `Display` impl gives a description of each.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CoreRejectReason {
    System = 1001,
    Peer = 1002,
    Resource = 1003,
    Rogue = 1004,
    Backlog = 1005,
    Ipe = 1006,
    Close = 1007,
    Version = 1008,
    RdvCookie = 1009,
    BadSecret = 1010,
    Unsecure = 1011,
    MessageApi = 1012,
    Congestion = 1013,
    Filter = 1014,
    Group = 1015,
    Timeout = 1016,
}
292
/// Rejection reasons in the 2000-2999 range; the discriminants are the wire
/// values. The `Display` impl gives a description of each.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum ServerRejectReason {
    Fallback = 2000,
    KeyNotSup = 2001,
    Filepath = 2002,
    HostNotFound = 2003,
    BadRequest = 2400,
    Unauthorized = 2401,
    Overload = 2402,
    Forbidden = 2403,
    Notfound = 2404,
    BadMode = 2405,
    Unacceptable = 2406,
    Conflict = 2409,
    NotSupMedia = 2415,
    Locked = 2423,
    FailedDepend = 2424,
    InternalServerError = 2500,
    Unimplemented = 2501,
    Gateway = 2502,
    Down = 2503,
    Version = 2505,
    NoRoom = 2507,
}
318
319#[derive(Debug, Clone, Copy, PartialEq, Eq)]
322pub enum RejectReason {
323 Core(CoreRejectReason),
325 CoreUnrecognized(i32),
326
327 Server(ServerRejectReason),
329 ServerUnrecognized(i32),
330
331 User(i32),
333}
334
335impl HandshakeVsInfo {
336 fn type_flags(&self, shake_type: ShakeType) -> u32 {
339 match self {
340 HandshakeVsInfo::V4(ty) => *ty as u32,
341 HandshakeVsInfo::V5(hs) => {
342 if shake_type == ShakeType::Induction
343 && (hs.ext_hs.is_some() || hs.ext_km.is_some() || hs.sid.is_some())
344 {
345 panic!("Handshake is both induction and has SRT extensions, not valid");
348 }
349
350 let mut flags = ExtFlags::empty();
351
352 if hs.ext_hs.is_some() {
353 flags |= ExtFlags::HS;
354 }
355 if hs.ext_km.is_some() {
356 flags |= ExtFlags::KM;
357 }
358 if hs.sid.is_some() {
359 flags |= ExtFlags::CONFIG;
360 }
361 (u32::from(hs.key_size.as_raw()) >> 3 << 16)
364 | if shake_type == ShakeType::Induction {
366 u32::from(SRT_MAGIC_CODE)
367 } else {
368 u32::from(flags.bits())
369 }
370 }
371 }
372 }
373
374 pub fn version(&self) -> u32 {
376 match self {
377 HandshakeVsInfo::V4(_) => 4,
378 HandshakeVsInfo::V5 { .. } => 5,
379 }
380 }
381}
382
383impl SocketType {
384 pub fn from_u16(num: u16) -> Result<SocketType, u16> {
386 match num {
387 1 => Ok(SocketType::Stream),
388 2 => Ok(SocketType::Datagram),
389 i => Err(i),
390 }
391 }
392}
393
394impl ControlPacket {
395 pub fn parse(buf: &mut impl Buf, is_ipv6: bool) -> Result<ControlPacket, PacketParseError> {
396 let control_type = buf.get_u16() << 1 >> 1; let reserved = buf.get_u16();
400 let add_info = buf.get_u32();
401 let timestamp = TimeStamp::from_micros(buf.get_u32());
402 let dest_sockid = buf.get_u32();
403
404 Ok(ControlPacket {
405 timestamp,
406 dest_sockid: SocketId(dest_sockid),
407 control_type: ControlTypes::deserialize(
409 control_type,
410 reserved,
411 add_info,
412 buf,
413 is_ipv6,
414 )?,
415 })
416 }
417
418 pub fn serialize<T: BufMut>(&self, into: &mut T) {
419 into.put_u16(self.control_type.id_byte() | (0b1 << 15));
421
422 into.put_u16(self.control_type.reserved());
424
425 into.put_u32(self.control_type.additional_info());
427
428 into.put_u32(self.timestamp.as_micros());
430
431 into.put_u32(self.dest_sockid.0);
433
434 self.control_type.serialize(into);
436 }
437
438 pub fn handshake(&self) -> Option<&HandshakeControlInfo> {
439 if let ControlTypes::Handshake(hs) = &self.control_type {
440 Some(hs)
441 } else {
442 None
443 }
444 }
445
446 pub fn wire_size(&self) -> usize {
447 Self::HEADER_SIZE
449 + match &self.control_type {
450 ControlTypes::Handshake(hs) => hs.serialized_size(),
451 ControlTypes::Ack(ack) => ack.serialized_size(),
452 ControlTypes::Nak(nak) => nak.0.len() * size_of::<u32>(),
453 ControlTypes::DropRequest { .. } => 2 * size_of::<u32>(),
454 ControlTypes::Srt(srt) => usize::from(srt.size_words()) * size_of::<u32>(),
455 ControlTypes::CongestionWarning
456 | ControlTypes::Ack2(_)
457 | ControlTypes::Shutdown
458 | ControlTypes::KeepAlive
459 | ControlTypes::PeerError(_) => 4,
460 }
461 }
462}
463
464impl Debug for ControlPacket {
465 fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
466 write!(
467 f,
468 "{{{:?} ts={:?} dst={:?}}}",
469 self.control_type, self.timestamp, self.dest_sockid,
470 )
471 }
472}
473
/// Value placed in the low 16 bits of the handshake type field of HSv5
/// induction packets (see `HandshakeVsInfo::type_flags`), and expected in that
/// field when an HSv5 induction handshake is parsed.
const SRT_MAGIC_CODE: u16 = 0x4A17;
478
479impl ControlTypes {
480 pub fn new_drop_request(msg_to_drop: MsgNumber, drop_range: Range<SeqNumber>) -> Self {
481 assert!(!drop_range.is_empty());
482 Self::DropRequest {
483 msg_to_drop,
484 range: RangeInclusive::new(drop_range.start, drop_range.end - 1),
485 }
486 }
487
488 pub fn new_key_refresh_request(key_material: KeyingMaterialMessage) -> ControlTypes {
489 ControlTypes::Srt(SrtControlPacket::KeyRefreshRequest(key_material))
490 }
491
492 fn deserialize<T: Buf>(
496 packet_type: u16,
497 reserved: u16,
498 extra_info: u32,
499 mut buf: T,
500 is_ipv6: bool,
501 ) -> Result<ControlTypes, PacketParseError> {
502 match packet_type {
503 0x0 => {
504 if buf.remaining() < 8 * 4 + 16 {
507 return Err(PacketParseError::NotEnoughData);
508 }
509
510 let udt_version = buf.get_i32();
511 if udt_version != 4 && udt_version != 5 {
512 return Err(PacketParseError::BadUdtVersion(udt_version));
513 }
514
515 let crypto_size = buf.get_u16() << 3;
521 let type_ext_socket_type = buf.get_u16();
526
527 let init_seq_num = SeqNumber::new_truncate(buf.get_u32()); let max_packet_size = PacketSize(buf.get_u32() as u64);
529 let max_flow_size = PacketCount(buf.get_u32() as u64);
530 let shake_type = match ShakeType::try_from(buf.get_i32()) {
531 Ok(ct) => ct,
532 Err(err_ct) => return Err(PacketParseError::BadConnectionType(err_ct)),
533 };
534 let socket_id = SocketId(buf.get_u32());
535 let syn_cookie = buf.get_i32();
536
537 let peer_addr = if !is_ipv6 {
538 let ip = buf.get_u32_le();
539 buf.get_u32();
540 buf.get_u32();
541 buf.get_u32();
542 IpAddr::from(Ipv4Addr::from(ip))
543 } else {
544 let mut ip_buf = [0u8; 16];
545 buf.copy_to_slice(&mut ip_buf);
546 IpAddr::from(Ipv6Addr::from(ip_buf))
547 };
548
549 let info = match udt_version {
550 4 => HandshakeVsInfo::V4(match SocketType::from_u16(type_ext_socket_type) {
551 Ok(t) => t,
552 Err(e) => return Err(PacketParseError::BadSocketType(e)),
553 }),
554 5 => {
555 let crypto_size = match KeySize::try_from(crypto_size) {
557 Ok(size) => size,
558 Err(_) => {
559 warn!(
560 "Unrecognized crypto key length: {}, disabling encryption. Should be 0, 16, 24, or 32 bytes.",
561 crypto_size
562 );
563 KeySize::Unspecified
564 }
565 };
566
567 if shake_type == ShakeType::Induction {
568 if type_ext_socket_type != SRT_MAGIC_CODE {
569 warn!("HSv5 induction response did not have SRT_MAGIC_CODE, which is suspicious")
571 }
572
573 HandshakeVsInfo::V5(HsV5Info::default())
574 } else {
575 let extensions = match ExtFlags::from_bits(type_ext_socket_type) {
577 Some(i) => i,
578 None => {
579 warn!(
580 "Unnecessary bits in extensions flags: {:b}",
581 type_ext_socket_type
582 );
583
584 ExtFlags::from_bits_truncate(type_ext_socket_type)
585 }
586 };
587
588 let mut sid = None;
591 let mut ext_hs = None;
592 let mut ext_km = None;
593
594 while buf.remaining() > 4 {
595 let pack_type = buf.get_u16();
596
597 let pack_size_words = buf.get_u16();
598 let pack_size = usize::from(pack_size_words) * 4;
599
600 if buf.remaining() < pack_size {
601 return Err(PacketParseError::NotEnoughData);
602 }
603
604 let mut buffer = buf.take(pack_size);
605 match pack_type {
606 1 | 2 => {
607 if !extensions.contains(ExtFlags::HS) {
608 warn!("Handshake contains handshake extension type {} without HSREQ flag!", pack_type);
609 }
610 if ext_hs.is_some() {
611 warn!("Handshake contains multiple handshake extensions, only the last will be applied!");
612 }
613 ext_hs =
614 Some(SrtControlPacket::parse(pack_type, &mut buffer)?);
615 }
616 3 | 4 => {
617 if !extensions.contains(ExtFlags::KM) {
618 warn!("Handshake contains key material extension type {} without KMREQ flag!", pack_type);
619 }
620 if ext_km.is_some() {
621 warn!("Handshake contains multiple key material extensions, only the last will be applied!");
622 }
623 ext_km =
624 Some(SrtControlPacket::parse(pack_type, &mut buffer)?);
625 }
626 _ => {
627 if !extensions.contains(ExtFlags::CONFIG) {
628 warn!("Handshake contains config extension type {} without CONFIG flag!", pack_type);
629 }
630 match SrtControlPacket::parse(pack_type, &mut buffer)? {
631 SrtControlPacket::StreamId(stream_id) => {
633 sid = Some(stream_id)
634 }
635 _ => unimplemented!("Implement other kinds"),
636 }
637 }
638 }
639 buf = buffer.into_inner();
640 }
641
642 if buf.remaining() != 0 {
643 warn!("Handshake has data left, but not enough for an extension!");
644 }
645 if ext_hs.is_none() && extensions.contains(ExtFlags::HS) {
646 warn!("Handshake has HSREQ flag, but contains no handshake extensions!");
647 }
648 if ext_km.is_none() && extensions.contains(ExtFlags::KM) {
649 warn!("Handshake has KMREQ flag, but contains no key material extensions!");
650 }
651
652 HandshakeVsInfo::V5(HsV5Info {
653 key_size: crypto_size,
654 ext_hs,
655 ext_km,
656 ext_group: None,
657 sid,
658 })
659 }
660 }
661 _ => unreachable!(), };
663
664 Ok(ControlTypes::Handshake(HandshakeControlInfo {
665 init_seq_num,
666 max_packet_size,
667 max_flow_size,
668 shake_type,
669 socket_id,
670 syn_cookie,
671 peer_addr,
672 info,
673 }))
674 }
675 0x1 => {
676 if buf.remaining() >= 4 {
678 buf.get_u32();
679 }
680 Ok(ControlTypes::KeepAlive)
681 }
682 0x2 => {
683 if buf.remaining() < 4 {
687 return Err(PacketParseError::NotEnoughData);
688 }
689
690 let ack_number = SeqNumber::new_truncate(buf.get_u32());
692 let full_ack_seq_number = FullAckSeqNumber::new(extra_info);
693
694 let ack = if buf.remaining() < 3 * size_of::<u32>() {
695 Acknowledgement::Lite(ack_number)
696 } else {
697 let rtt_mean = TimeSpan::from_micros(buf.get_i32());
699 let rtt_variance = TimeSpan::from_micros(buf.get_i32());
700 let buffer_available = buf.get_u32();
701
702 let packet_receive_rate =
703 (buf.remaining() >= size_of::<u32>()).then(|| buf.get_u32());
704 let estimated_link_capacity =
705 (buf.remaining() >= size_of::<u32>()).then(|| buf.get_u32());
706 let data_receive_rate =
707 (buf.remaining() >= size_of::<u32>()).then(|| buf.get_u32());
708
709 let stats = AckStatistics {
710 rtt: Rtt::new(rtt_mean, rtt_variance),
711 buffer_available,
712 packet_receive_rate,
713 estimated_link_capacity,
714 data_receive_rate,
715 };
716
717 match full_ack_seq_number {
718 None => Acknowledgement::Small(ack_number, stats),
719 Some(full_ack) => Acknowledgement::Full(ack_number, stats, full_ack),
720 }
721 };
722 Ok(ControlTypes::Ack(ack))
723 }
724 0x3 => {
725 let mut loss_info = Vec::new();
728 while buf.remaining() >= 4 {
729 loss_info.push(buf.get_u32());
730 }
731
732 Ok(ControlTypes::Nak(CompressedLossList(loss_info)))
733 }
734 0x4 => {
735 if buf.remaining() >= 4 {
736 buf.get_u32(); }
738 Ok(ControlTypes::CongestionWarning)
739 }
740 0x5 => {
741 if buf.remaining() >= 4 {
742 buf.get_u32(); }
744 Ok(ControlTypes::Shutdown)
745 }
746 0x6 => {
747 if buf.remaining() >= 4 {
749 buf.get_u32(); }
751 if let Some(ack_seq_no) = FullAckSeqNumber::new(extra_info) {
752 Ok(ControlTypes::Ack2(ack_seq_no))
753 } else {
754 Err(PacketParseError::ZeroAckSequenceNumber)
755 }
756 }
757 0x7 => {
758 if buf.remaining() < 2 * 4 {
760 return Err(PacketParseError::NotEnoughData);
761 }
762
763 let start = SeqNumber::new_truncate(buf.get_u32());
764 let end = SeqNumber::new_truncate(buf.get_u32());
765
766 Ok(ControlTypes::DropRequest {
767 msg_to_drop: MsgNumber::new_truncate(extra_info), range: RangeInclusive::new(start, end),
769 })
770 }
771 0x8 => {
772 if buf.remaining() >= 4 {
774 buf.get_u32(); }
776 Ok(ControlTypes::PeerError(extra_info))
777 }
778 0x7FFF => {
779 Ok(ControlTypes::Srt(SrtControlPacket::parse(
781 reserved, &mut buf,
782 )?))
783 }
784 x => Err(PacketParseError::BadControlType(x)),
785 }
786 }
787
788 fn id_byte(&self) -> u16 {
789 match *self {
790 ControlTypes::Handshake(_) => 0x0,
791 ControlTypes::KeepAlive => 0x1,
792 ControlTypes::Ack { .. } => 0x2,
793 ControlTypes::Nak(_) => 0x3,
794 ControlTypes::CongestionWarning => 0x4,
795 ControlTypes::Shutdown => 0x5,
796 ControlTypes::Ack2(_) => 0x6,
797 ControlTypes::DropRequest { .. } => 0x7,
798 ControlTypes::PeerError(_) => 0x8,
799 ControlTypes::Srt(_) => 0x7FFF,
800 }
801 }
802
803 fn additional_info(&self) -> u32 {
804 match self {
805 ControlTypes::DropRequest { msg_to_drop, .. } => msg_to_drop.as_raw(),
807 ControlTypes::Ack2(a) | ControlTypes::Ack(Acknowledgement::Full(_, _, a)) => {
808 (*a).into()
809 }
810 ControlTypes::PeerError(err) => *err,
811 ControlTypes::Ack(_)
813 | ControlTypes::Handshake(_)
814 | ControlTypes::KeepAlive
815 | ControlTypes::Nak(_)
816 | ControlTypes::CongestionWarning
817 | ControlTypes::Shutdown
818 | ControlTypes::Srt(_) => 0,
819 }
820 }
821
822 fn reserved(&self) -> u16 {
823 match self {
824 ControlTypes::Srt(srt) => srt.type_id(),
825 _ => 0,
826 }
827 }
828
829 fn serialize<T: BufMut>(&self, into: &mut T) {
830 match self {
831 ControlTypes::Handshake(ref c) => c.serialize(into),
832 ControlTypes::Ack(ack) => ack.serialize(into),
833 ControlTypes::Nak(ref n) => {
834 for loss in n.iter_compressed() {
835 into.put_u32(loss);
836 }
837 }
838 ControlTypes::DropRequest {
839 msg_to_drop: _,
840 range,
841 } => {
842 into.put_u32(range.start().as_raw());
843 into.put_u32(range.end().as_raw());
844 }
845 ControlTypes::CongestionWarning
846 | ControlTypes::Ack2(_)
847 | ControlTypes::Shutdown
848 | ControlTypes::KeepAlive
849 | ControlTypes::PeerError(_) => {
850 into.put_u32(0x0);
855 }
856 ControlTypes::Srt(srt) => {
857 srt.serialize(into);
858 }
859 };
860 }
861}
862
863impl Debug for ControlTypes {
864 fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
865 match self {
866 ControlTypes::Handshake(hs) => write!(f, "{hs:?}"),
867 ControlTypes::KeepAlive => write!(f, "KeepAlive"),
868 ControlTypes::Ack(aci) => write!(f, "{aci:?}"),
869 ControlTypes::Nak(nak) => {
870 write!(f, "Nak({nak:?})") }
872 ControlTypes::CongestionWarning => write!(f, "CongestionWarning"),
873 ControlTypes::Shutdown => write!(f, "Shutdown"),
874 ControlTypes::Ack2(ackno) => write!(f, "Ack2({})", ackno.0),
875 ControlTypes::DropRequest { msg_to_drop, range } => {
876 write!(f, "DropReq(msg={msg_to_drop} {range:?})")
877 }
878 ControlTypes::PeerError(e) => write!(f, "PeerError({e})"),
879 ControlTypes::Srt(srt) => write!(f, "{srt:?}"),
880 }
881 }
882}
883
884impl CompressedLossList {
885 pub fn try_from_iter(iter: impl Iterator<Item = SeqNumber>) -> Option<CompressedLossList> {
886 let loss_list = compress_loss_list(iter).collect::<Vec<_>>();
887 if loss_list.is_empty() {
888 None
889 } else {
890 Some(CompressedLossList(loss_list))
891 }
892 }
893
894 pub fn try_from_range(r: Range<SeqNumber>) -> Option<CompressedLossList> {
895 if r.is_empty() {
896 None
897 } else if r.start + 1 == r.end {
898 Some(CompressedLossList(vec![r.start.as_raw()]))
899 } else {
900 Some(CompressedLossList(vec![
901 (1 << 31) | r.start.as_raw(),
902 (r.end - 1).as_raw(),
903 ]))
904 }
905 }
906
907 pub fn iter_compressed(&self) -> impl Iterator<Item = u32> + '_ {
908 self.0.iter().copied()
909 }
910
911 pub fn iter_decompressed(&self) -> impl Iterator<Item = SeqNumber> + '_ {
912 decompress_loss_list(self.iter_compressed())
913 }
914
915 pub fn into_iter_decompressed(self) -> impl Iterator<Item = SeqNumber> {
916 decompress_loss_list(self.0.into_iter())
917 }
918}
919
920impl FromIterator<SeqNumber> for CompressedLossList {
921 fn from_iter<T: IntoIterator<Item = SeqNumber>>(iter: T) -> Self {
922 Self::try_from_iter(iter.into_iter()).unwrap()
923 }
924}
925
926impl<'a> FromIterator<&'a SeqNumber> for CompressedLossList {
927 fn from_iter<T: IntoIterator<Item = &'a SeqNumber>>(iter: T) -> Self {
928 Self::try_from_iter(iter.into_iter().copied()).unwrap()
929 }
930}
931
932impl From<Range<SeqNumber>> for CompressedLossList {
933 fn from(range: Range<SeqNumber>) -> Self {
934 Self::try_from_range(range).unwrap()
935 }
936}
937
938impl Debug for CompressedLossList {
939 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
940 let mut iter = self.0.iter();
941 while let Some(a) = iter.next() {
942 if a & 0x80000000 != 0 {
943 let b = iter.next().expect("Unterminated list");
944 write!(f, "{}..={},", a & 0x7fffffff, b)?;
945 } else {
946 write!(f, "{a},")?;
947 }
948 }
949 Ok(())
950 }
951}
952
953impl FullAckSeqNumber {
954 pub const INITIAL: FullAckSeqNumber = FullAckSeqNumber(1);
955
956 pub fn new(raw: u32) -> Option<FullAckSeqNumber> {
957 if raw == 0 {
958 None
959 } else {
960 Some(FullAckSeqNumber(raw))
961 }
962 }
963
964 pub fn is_full(&self) -> bool {
965 self.0 != 0
966 }
967}
968
969impl From<FullAckSeqNumber> for u32 {
970 fn from(u: FullAckSeqNumber) -> Self {
971 u.0
972 }
973}
974
975impl Add<u32> for FullAckSeqNumber {
976 type Output = FullAckSeqNumber;
977
978 fn add(self, rhs: u32) -> Self::Output {
979 FullAckSeqNumber(self.0 + rhs)
980 }
981}
982
983impl Sub<FullAckSeqNumber> for FullAckSeqNumber {
984 type Output = usize;
985
986 fn sub(self, rhs: FullAckSeqNumber) -> Self::Output {
987 (self.0 - rhs.0).try_into().unwrap()
988 }
989}
990
991impl Acknowledgement {
992 pub fn ack_number(&self) -> SeqNumber {
993 match self {
994 Acknowledgement::Lite(seq_number)
995 | Acknowledgement::Small(seq_number, _)
996 | Acknowledgement::Full(seq_number, _, _) => *seq_number,
997 }
998 }
999
1000 pub fn full_ack_seq_number(&self) -> Option<FullAckSeqNumber> {
1001 match self {
1002 Acknowledgement::Full(_, _, full_ack_seq_number) => Some(*full_ack_seq_number),
1003 Acknowledgement::Lite(_) | Acknowledgement::Small(_, _) => None,
1004 }
1005 }
1006
1007 pub fn rtt(&self) -> Option<Rtt> {
1008 match self {
1009 Acknowledgement::Full(_, stats, _) | Acknowledgement::Small(_, stats) => {
1010 Some(stats.rtt)
1011 }
1012 Acknowledgement::Lite(_) => None,
1013 }
1014 }
1015
1016 pub fn statistics(&self) -> Option<&AckStatistics> {
1017 match self {
1018 Acknowledgement::Small(_, stats) | Acknowledgement::Full(_, stats, _) => Some(stats),
1019 Acknowledgement::Lite(_) => None,
1020 }
1021 }
1022
1023 pub fn serialize(&self, into: &mut impl BufMut) {
1024 into.put_u32(self.ack_number().as_raw());
1025 if let Some(stats) = self.statistics() {
1026 into.put_i32(stats.rtt.mean().as_micros());
1027 into.put_i32(stats.rtt.variance().as_micros());
1028 into.put_u32(stats.buffer_available);
1029
1030 if let Some(prr) = stats.packet_receive_rate {
1033 into.put_u32(prr);
1034 if let Some(elc) = stats.estimated_link_capacity {
1035 into.put_u32(elc);
1036 if let Some(drr) = stats.data_receive_rate {
1037 into.put_u32(drr);
1038 }
1039 }
1040 }
1041 }
1042 }
1043
1044 fn serialized_size(&self) -> usize {
1045 size_of::<u32>() + self.statistics().map(AckStatistics::serialized_size).unwrap_or(0)
1047 }
1048}
1049
1050impl Debug for HandshakeControlInfo {
1051 fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
1052 write!(
1053 f,
1054 "HS {:?} from={:?} {:?}",
1055 self.shake_type, self.socket_id, self.info
1056 )
1057 }
1058}
1059
1060impl Debug for HandshakeVsInfo {
1061 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1062 match self {
1063 HandshakeVsInfo::V4(stype) => write!(f, "UDT: {stype:?}"),
1064 HandshakeVsInfo::V5(hs) => {
1065 write!(f, "SRT: crypto={:?}", hs.key_size)?;
1066 if let Some(pack) = &hs.ext_hs {
1067 write!(f, " hs={pack:?}")?;
1068 }
1069 if let Some(pack) = &hs.ext_km {
1070 write!(f, " km={pack:?}")?;
1071 }
1072 if let Some(sid) = &hs.sid {
1073 write!(f, " sid={sid:?}")?;
1074 }
1075 Ok(())
1076 }
1077 }
1078 }
1079}
1080
1081impl TryFrom<i32> for ShakeType {
1082 type Error = i32;
1084 fn try_from(value: i32) -> Result<Self, Self::Error> {
1085 match value {
1086 1 => Ok(ShakeType::Induction),
1087 0 => Ok(ShakeType::Waveahand),
1088 -1 => Ok(ShakeType::Conclusion),
1089 -2 => Ok(ShakeType::Agreement),
1090 i if i < 1000 => Err(i), i => Ok(ShakeType::Rejection(RejectReason::try_from(i).unwrap())), }
1093 }
1094}
1095
1096impl From<ShakeType> for i32 {
1097 fn from(st: ShakeType) -> i32 {
1098 match st {
1099 ShakeType::Induction => 1,
1100 ShakeType::Waveahand => 0,
1101 ShakeType::Conclusion => -1,
1102 ShakeType::Agreement => -2,
1103 ShakeType::Rejection(rej) => rej.into(),
1104 }
1105 }
1106}
1107
1108impl TryFrom<i32> for RejectReason {
1110 type Error = i32;
1111 fn try_from(value: i32) -> Result<Self, Self::Error> {
1112 match value {
1113 v if v < 1000 => Err(v),
1114 v if v < 2000 => Ok(match CoreRejectReason::try_from(v) {
1115 Ok(rr) => RejectReason::Core(rr),
1116 Err(rr) => RejectReason::CoreUnrecognized(rr),
1117 }),
1118 v if v < 3000 => Ok(match ServerRejectReason::try_from(v) {
1119 Ok(rr) => RejectReason::Server(rr),
1120 Err(rr) => RejectReason::ServerUnrecognized(rr),
1121 }),
1122 v => Ok(RejectReason::User(v)),
1123 }
1124 }
1125}
1126
1127impl From<RejectReason> for i32 {
1128 fn from(rr: RejectReason) -> i32 {
1129 match rr {
1130 RejectReason::Core(c) => c.into(),
1131 RejectReason::CoreUnrecognized(c) => c,
1132 RejectReason::Server(s) => s.into(),
1133 RejectReason::ServerUnrecognized(s) => s,
1134 RejectReason::User(u) => u,
1135 }
1136 }
1137}
1138
1139impl Display for RejectReason {
1140 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1141 match self {
1142 RejectReason::Core(c) => write!(f, "{c}"),
1143 RejectReason::CoreUnrecognized(c) => write!(f, "Unrecognized core error: {c}"),
1144 RejectReason::Server(s) => write!(f, "{s}"),
1145 RejectReason::ServerUnrecognized(s) => write!(f, "Unrecognized server error: {s}"),
1146 RejectReason::User(u) => write!(f, "User error: {u}"),
1147 }
1148 }
1149}
1150
1151impl From<CoreRejectReason> for RejectReason {
1152 fn from(rr: CoreRejectReason) -> RejectReason {
1153 RejectReason::Core(rr)
1154 }
1155}
1156
1157impl TryFrom<i32> for CoreRejectReason {
1158 type Error = i32;
1159 fn try_from(value: i32) -> Result<Self, Self::Error> {
1160 use CoreRejectReason::*;
1161 Ok(match value {
1162 1001 => System,
1163 1002 => Peer,
1164 1003 => Resource,
1165 1004 => Rogue,
1166 1005 => Backlog,
1167 1006 => Ipe,
1168 1007 => Close,
1169 1008 => Version,
1170 1009 => RdvCookie,
1171 1010 => BadSecret,
1172 1011 => Unsecure,
1173 1012 => MessageApi,
1174 1013 => Congestion,
1175 1014 => Filter,
1176 1015 => Group,
1177 1016 => Timeout,
1178 other => return Err(other),
1179 })
1180 }
1181}
1182
1183impl From<CoreRejectReason> for i32 {
1184 fn from(rr: CoreRejectReason) -> i32 {
1185 rr as i32
1186 }
1187}
1188
1189impl Display for CoreRejectReason {
1190 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1191 match self {
1192 CoreRejectReason::System => write!(f, "broken due to system function error"),
1193 CoreRejectReason::Peer => write!(f, "connection was rejected by peer"),
1194 CoreRejectReason::Resource => write!(f, "internal problem with resource allocation"),
1195 CoreRejectReason::Rogue => write!(f, "incorrect data in handshake messages"),
1196 CoreRejectReason::Backlog => write!(f, "listener's backlog exceeded"),
1197 CoreRejectReason::Ipe => write!(f, "internal program error"),
1198 CoreRejectReason::Close => write!(f, "socket is closing"),
1199 CoreRejectReason::Version => {
1200 write!(f, "peer is older version than agent's minimum set")
1201 }
1202 CoreRejectReason::RdvCookie => write!(f, "rendezvous cookie collision"),
1203 CoreRejectReason::BadSecret => write!(f, "wrong password"),
1204 CoreRejectReason::Unsecure => write!(f, "password required or unexpected"),
1205 CoreRejectReason::MessageApi => write!(f, "streamapi/messageapi collision"),
1206 CoreRejectReason::Congestion => write!(f, "incompatible congestion-controller type"),
1207 CoreRejectReason::Filter => write!(f, "incompatible packet filter"),
1208 CoreRejectReason::Group => write!(f, "incompatible group"),
1209 CoreRejectReason::Timeout => write!(f, "connection timeout"),
1210 }
1211 }
1212}
1213
1214impl From<ServerRejectReason> for RejectReason {
1215 fn from(rr: ServerRejectReason) -> RejectReason {
1216 RejectReason::Server(rr)
1217 }
1218}
1219
1220impl TryFrom<i32> for ServerRejectReason {
1221 type Error = i32;
1222 fn try_from(value: i32) -> Result<Self, Self::Error> {
1223 Ok(match value {
1224 2000 => ServerRejectReason::Fallback,
1225 2001 => ServerRejectReason::KeyNotSup,
1226 2002 => ServerRejectReason::Filepath,
1227 2003 => ServerRejectReason::HostNotFound,
1228 2400 => ServerRejectReason::BadRequest,
1229 2401 => ServerRejectReason::Unauthorized,
1230 2402 => ServerRejectReason::Overload,
1231 2403 => ServerRejectReason::Forbidden,
1232 2404 => ServerRejectReason::Notfound,
1233 2405 => ServerRejectReason::BadMode,
1234 2406 => ServerRejectReason::Unacceptable,
1235 2409 => ServerRejectReason::Conflict,
1236 2415 => ServerRejectReason::NotSupMedia,
1237 2423 => ServerRejectReason::Locked,
1238 2424 => ServerRejectReason::FailedDepend,
1239 2500 => ServerRejectReason::InternalServerError,
1240 2501 => ServerRejectReason::Unimplemented,
1241 2502 => ServerRejectReason::Gateway,
1242 2503 => ServerRejectReason::Down,
1243 2505 => ServerRejectReason::Version,
1244 2507 => ServerRejectReason::NoRoom,
1245 unrecog => return Err(unrecog),
1246 })
1247 }
1248}
1249
1250impl From<ServerRejectReason> for i32 {
1251 fn from(rr: ServerRejectReason) -> i32 {
1252 rr as i32
1253 }
1254}
1255
1256impl Display for ServerRejectReason {
1257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1258 match self {
1259 ServerRejectReason::Fallback =>
1260 write!(f, "the application wants to report some problem, but can't precisely specify it"),
1261 ServerRejectReason::KeyNotSup =>
1262 write!(f, "The key used in the StreamID keyed string is not supported by the service"),
1263 ServerRejectReason::Filepath =>write!(f, "The resource type designates a file and the path is either wrong syntax or not found"),
1264 ServerRejectReason::HostNotFound => write!(f, "The `h` host specification was not recognized by the service"),
1265 ServerRejectReason::BadRequest => write!(f, "General syntax error in the SocketID specification (also a fallback code for undefined cases)"),
1266 ServerRejectReason::Unauthorized => write!(f, "Authentication failed, provided that the user was correctly identified and access to the required resource would be granted"),
1267 ServerRejectReason::Overload => write!(f, "The server is too heavily loaded, or you have exceeded credits for accessing the service and the resource"),
1268 ServerRejectReason::Forbidden => write!(f, "Access denied to the resource by any kind of reason"),
1269 ServerRejectReason::Notfound => write!(f, "Resource not found at this time"),
1270 ServerRejectReason::BadMode => write!(f, "The mode specified in `m` key in StreamID is not supported for this request"),
1271 ServerRejectReason::Unacceptable => write!(f, "The requested parameters specified in SocketID cannot be satisfied for the requested resource. Also when m=publish and the data format is not acceptable"),
1272 ServerRejectReason::Conflict => write!(f, "The resource being accessed is already locked for modification. This is in case of m=publish and the specified resource is currently read-only"),
1273 ServerRejectReason::NotSupMedia => write!(f, "The media type is not supported by the application. This is the `t` key that specifies the media type as stream, file and auth, possibly extended by the application"),
1274 ServerRejectReason::Locked => write!(f, "The resource being accessed is locked for any access"),
1275 ServerRejectReason::FailedDepend => write!(f, "The request failed because it specified a dependent session ID that has been disconnected"),
1276 ServerRejectReason::InternalServerError => write!(f, "Unexpected internal server error"),
1277 ServerRejectReason::Unimplemented => write!(f, "The request was recognized, but the current version doesn't support it (unimplemented)"),
1278 ServerRejectReason::Gateway => write!(f, "The server acts as a gateway and the target endpoint rejected the connection"),
1279 ServerRejectReason::Down => write!(f, "The service has been temporarily taken over by a stub reporting this error. The real service can be down for maintenance or crashed"),
1280 ServerRejectReason::Version => write!(f, "SRT version not supported. This might be either unsupported backward compatibility, or an upper value of a version"),
1281 ServerRejectReason::NoRoom => write!(f, "The data stream cannot be archived due to lacking storage space. This is in case when the request type was to send a file or the live stream to be archived"),
1282 }
1283 }
1284}
1285
1286impl HandshakeControlInfo {
1287 #[allow(clippy::manual_bits)]
1288 fn serialized_size(&self) -> usize {
1289 8 * size_of::<u32>() + size_of::<u128>() + match &self.info {
1292 HandshakeVsInfo::V4(_) => 0,
1293 HandshakeVsInfo::V5(info) => {
1294 info.ext_hs.as_ref().map(|hs| 2 * size_of::<u16>() + usize::from(hs.size_words()) * size_of::<u32>()).unwrap_or(0)
1295 +
1296 info.ext_km.as_ref().map(|hs| 2 * size_of::<u16>() + usize::from(hs.size_words()) * size_of::<u32>()).unwrap_or(0)
1297 +
1298 info.sid.as_ref().map(|sid| 2 * size_of::<u16>() + ((sid.len() + 3) / 4 * 4)).unwrap_or(0)
1299 }
1300 }
1301 }
1302
1303 fn serialize(&self, into: &mut impl BufMut) {
1304 into.put_u32(self.info.version());
1305 into.put_u32(self.info.type_flags(self.shake_type));
1306 into.put_u32(self.init_seq_num.as_raw());
1307 into.put_u32(self.max_packet_size.0 as u32);
1308 into.put_u32(self.max_flow_size.0 as u32);
1309 into.put_i32(self.shake_type.into());
1310 into.put_u32(self.socket_id.0);
1311 into.put_i32(self.syn_cookie);
1312
1313 match self.peer_addr {
1314 IpAddr::V4(four) => {
1315 let v = u32::from(four);
1316 into.put_u32_le(v);
1317
1318 into.put(&[0; 12][..]);
1320 }
1321 IpAddr::V6(six) => {
1322 let v = u128::from(six);
1323
1324 into.put_u128(v);
1325 }
1326 }
1327
1328 if let HandshakeVsInfo::V5(hs) = &self.info {
1330 for ext in [
1331 &hs.ext_hs,
1332 &hs.ext_km,
1333 &hs.sid.clone().map(SrtControlPacket::StreamId),
1334 ]
1335 .into_iter()
1336 .filter_map(|s| s.as_ref())
1337 {
1338 into.put_u16(ext.type_id());
1339 into.put_u16(ext.size_words());
1341 ext.serialize(into);
1342 }
1343 }
1344 }
1345}
1346
1347impl AckStatistics {
1348 fn serialized_size(&self) -> usize {
1349 size_of::<u32>()
1350 * (3 + match self {
1351 Self {
1352 packet_receive_rate: None,
1353 ..
1354 } => 0,
1355 Self {
1356 estimated_link_capacity: None,
1357 ..
1358 } => 1,
1359 Self {
1360 data_receive_rate: None,
1361 ..
1362 } => 2,
1363 _ => 3,
1364 })
1365 }
1366}
1367
1368#[cfg(test)]
1369mod test {
1370 use super::*;
1371
1372 use std::{convert::TryInto, io::Cursor, time::Duration};
1373
1374 use crate::options::*;
1375
1376 fn ser_des_test(pack: ControlPacket) -> Vec<u8> {
1377 let mut buf = vec![];
1378 pack.serialize(&mut buf);
1379
1380 let mut cursor = Cursor::new(&buf);
1381 let des = ControlPacket::parse(&mut cursor, false).unwrap();
1382 assert_eq!(cursor.remaining(), 0);
1383 assert_eq!(pack, des);
1384 assert_eq!(
1385 pack.wire_size(),
1386 buf.len() + 28,
1387 "Packet {pack:?} wrong wire size"
1388 ); buf
1391 }
1392
1393 #[test]
1394 fn lite_ack_ser_des_test() {
1395 ser_des_test(ControlPacket {
1396 timestamp: TimeStamp::from_micros(1234),
1397 dest_sockid: SocketId(0),
1398 control_type: ControlTypes::Ack(Acknowledgement::Lite(SeqNumber::new_truncate(1234))),
1399 });
1400 }
1401
1402 #[test]
1403 fn handshake_ser_des_test() {
1404 ser_des_test(ControlPacket {
1405 timestamp: TimeStamp::from_micros(0),
1406 dest_sockid: SocketId(0),
1407 control_type: ControlTypes::Handshake(HandshakeControlInfo {
1408 init_seq_num: SeqNumber::new_truncate(1_827_131),
1409 max_packet_size: PacketSize(1500),
1410 max_flow_size: PacketCount(25600),
1411 shake_type: ShakeType::Conclusion,
1412 socket_id: SocketId(1231),
1413 syn_cookie: 0,
1414 peer_addr: "127.0.0.1".parse().unwrap(),
1415 info: HandshakeVsInfo::V5(HsV5Info {
1416 key_size: KeySize::Unspecified, ext_hs: Some(SrtControlPacket::HandshakeResponse(SrtHandshake {
1418 version: SrtVersion::CURRENT,
1419 flags: SrtShakeFlags::NAKREPORT | SrtShakeFlags::TSBPDSND,
1420 send_latency: Duration::from_millis(3000),
1421 recv_latency: Duration::from_millis(12345),
1422 })),
1423 ext_km: None,
1424 ext_group: None,
1425 sid: None,
1426 }),
1427 }),
1428 });
1429 }
1430
1431 #[test]
1432 fn ack_ser_des_test() {
1433 ser_des_test(ControlPacket {
1434 timestamp: TimeStamp::from_micros(113_703),
1435 dest_sockid: SocketId(2_453_706_529),
1436 control_type: ControlTypes::Ack(Acknowledgement::Full(
1437 SeqNumber::new_truncate(282_049_186),
1438 AckStatistics {
1439 rtt: Rtt::new(TimeSpan::from_micros(10_002), TimeSpan::from_micros(1000)),
1440 buffer_available: 1314,
1441 packet_receive_rate: Some(0),
1442 estimated_link_capacity: Some(0),
1443 data_receive_rate: Some(0),
1444 },
1445 FullAckSeqNumber::new(1).unwrap(),
1446 )),
1447 });
1448 }
1449
1450 #[test]
1451 fn ack2_ser_des_test() {
1452 let buf = ser_des_test(ControlPacket {
1453 timestamp: TimeStamp::from_micros(125_812),
1454 dest_sockid: SocketId(8313),
1455 control_type: ControlTypes::Ack2(FullAckSeqNumber::new(831).unwrap()),
1456 });
1457
1458 assert_eq!((u32::from(buf[6]) << 8) + u32::from(buf[7]), 831);
1460 }
1461
1462 #[test]
1463 fn enc_size_ser_des_test() {
1464 ser_des_test(ControlPacket {
1465 timestamp: TimeStamp::from_micros(0),
1466 dest_sockid: SocketId(0),
1467 control_type: ControlTypes::Handshake(HandshakeControlInfo {
1468 init_seq_num: SeqNumber(0),
1469 max_packet_size: PacketSize(1816),
1470 max_flow_size: PacketCount(0),
1471 shake_type: ShakeType::Conclusion,
1472 socket_id: SocketId(0),
1473 syn_cookie: 0,
1474 peer_addr: [127, 0, 0, 1].into(),
1475 info: HandshakeVsInfo::V5(HsV5Info {
1476 key_size: KeySize::AES128,
1477 ext_km: None,
1478 ext_hs: None,
1479 ext_group: None,
1480 sid: None,
1481 }),
1482 }),
1483 });
1484 }
1485
1486 #[test]
1487 fn sid_ser_des_test() {
1488 ser_des_test(ControlPacket {
1489 timestamp: TimeStamp::from_micros(0),
1490 dest_sockid: SocketId(0),
1491 control_type: ControlTypes::Handshake(HandshakeControlInfo {
1492 init_seq_num: SeqNumber(0),
1493 max_packet_size: PacketSize(1816),
1494 max_flow_size: PacketCount(0),
1495 shake_type: ShakeType::Conclusion,
1496 socket_id: SocketId(0),
1497 syn_cookie: 0,
1498 peer_addr: [127, 0, 0, 1].into(),
1499 info: HandshakeVsInfo::V5(HsV5Info {
1500 key_size: KeySize::Unspecified,
1501 ext_km: None,
1502 ext_hs: None,
1503 ext_group: None,
1504 sid: Some("Hello hello".into()),
1505 }),
1506 }),
1507 });
1508 }
1509
1510 #[test]
1511 fn keepalive_ser_des_test() {
1512 ser_des_test(ControlPacket {
1513 timestamp: TimeStamp::from_micros(0),
1514 dest_sockid: SocketId(0),
1515 control_type: ControlTypes::KeepAlive,
1516 });
1517 }
1518
1519 #[test]
1520 fn congestion_warning_ser_des_test() {
1521 ser_des_test(ControlPacket {
1522 timestamp: TimeStamp::from_micros(100),
1523 dest_sockid: rand::random(),
1524 control_type: ControlTypes::CongestionWarning,
1525 });
1526 }
1527
1528 #[test]
1529 fn peer_error_ser_des_test() {
1530 ser_des_test(ControlPacket {
1531 timestamp: TimeStamp::from_micros(100),
1532 dest_sockid: rand::random(),
1533 control_type: ControlTypes::PeerError(1234),
1534 });
1535 }
1536
1537 #[test]
1538 fn congestion_ser_des_test() {
1539 ser_des_test(ControlPacket {
1540 timestamp: TimeStamp::from_micros(100),
1541 dest_sockid: rand::random(),
1542 control_type: ControlTypes::Srt(SrtControlPacket::Congestion("live".to_string())),
1543 });
1544 }
1545
1546 #[test]
1547 fn group_ser_des_test() {
1548 ser_des_test(ControlPacket {
1549 timestamp: TimeStamp::from_micros(100),
1550 dest_sockid: rand::random(),
1551 control_type: ControlTypes::Srt(SrtControlPacket::Group {
1552 ty: GroupType::MainBackup,
1553 flags: GroupFlags::MSG_SYNC,
1554 weight: 123,
1555 }),
1556 });
1557 }
1558
1559 #[test]
1560 fn drop_request_ser_des_test() {
1561 ser_des_test(ControlPacket {
1562 timestamp: TimeStamp::from_micros(100),
1563 dest_sockid: rand::random(),
1564 control_type: ControlTypes::DropRequest {
1565 msg_to_drop: MsgNumber(123),
1566 range: SeqNumber(13)..=SeqNumber(100),
1567 },
1568 });
1569 }
1570
1571 #[test]
1572 fn filter_ser_des_test() {
1573 ser_des_test(ControlPacket {
1574 timestamp: TimeStamp::from_micros(100),
1575 dest_sockid: rand::random(),
1576 control_type: ControlTypes::Srt(SrtControlPacket::Filter(FilterSpec(
1577 [("hi".to_string(), "bye".to_string())]
1578 .into_iter()
1579 .collect(),
1580 ))),
1581 });
1582 }
1583
1584 #[test]
1585 fn raw_srt_packet_test() {
1586 let packet_data =
1589 hex::decode("FFFF000000000000000189702BFFEFF2000103010000001E00000078").unwrap();
1590
1591 let packet = ControlPacket::parse(&mut Cursor::new(packet_data), false).unwrap();
1592
1593 assert_eq!(
1594 packet,
1595 ControlPacket {
1596 timestamp: TimeStamp::from_micros(100_720),
1597 dest_sockid: SocketId(738_193_394),
1598 control_type: ControlTypes::Srt(SrtControlPacket::Reject)
1599 }
1600 )
1601 }
1602
1603 #[test]
1604 fn raw_handshake_ipv6() {
1605 let packet_data = hex::decode("8000000000000000000002b00000000000000004000000023c3b0296000005dc00002000000000010669ead20000000000000000000000000000000001000000").unwrap();
1606 let packet = ControlPacket::parse(&mut Cursor::new(&packet_data[..]), true).unwrap();
1607
1608 let r = ControlPacket {
1609 timestamp: TimeStamp::from_micros(688),
1610 dest_sockid: SocketId(0),
1611 control_type: ControlTypes::Handshake(HandshakeControlInfo {
1612 init_seq_num: SeqNumber(1010500246),
1613 max_packet_size: PacketSize(1500),
1614 max_flow_size: PacketCount(8192),
1615 shake_type: ShakeType::Induction,
1616 socket_id: SocketId(0x0669EAD2),
1617 syn_cookie: 0,
1618 peer_addr: "::1.0.0.0".parse().unwrap(),
1619 info: HandshakeVsInfo::V4(SocketType::Datagram),
1620 }),
1621 };
1622
1623 assert_eq!(packet, r);
1624
1625 let mut buf = vec![];
1627 packet.serialize(&mut buf);
1628
1629 assert_eq!(&buf[..], &packet_data[..]);
1630 }
1631
1632 #[test]
1633 fn raw_handshake_srt() {
1634 let packet_data = hex::decode("8000000000000000000F9EC400000000000000050000000144BEA60D000005DC00002000FFFFFFFF3D6936B6E3E405DD0100007F00000000000000000000000000010003000103010000002F00780000").unwrap();
1636 let packet = ControlPacket::parse(&mut Cursor::new(&packet_data[..]), false).unwrap();
1637 assert_eq!(
1638 packet,
1639 ControlPacket {
1640 timestamp: TimeStamp::from_micros(1_023_684),
1641 dest_sockid: SocketId(0),
1642 control_type: ControlTypes::Handshake(HandshakeControlInfo {
1643 init_seq_num: SeqNumber(1_153_345_037),
1644 max_packet_size: PacketSize(1500),
1645 max_flow_size: PacketCount(8192),
1646 shake_type: ShakeType::Conclusion,
1647 socket_id: SocketId(1_030_305_462),
1648 syn_cookie: -471_595_555,
1649 peer_addr: "127.0.0.1".parse().unwrap(),
1650 info: HandshakeVsInfo::V5(HsV5Info {
1651 key_size: KeySize::Unspecified,
1652 ext_hs: Some(SrtControlPacket::HandshakeRequest(SrtHandshake {
1653 version: SrtVersion::new(1, 3, 1),
1654 flags: SrtShakeFlags::TSBPDSND
1655 | SrtShakeFlags::TSBPDRCV
1656 | SrtShakeFlags::HAICRYPT
1657 | SrtShakeFlags::TLPKTDROP
1658 | SrtShakeFlags::REXMITFLG,
1659 send_latency: Duration::from_millis(120),
1660 recv_latency: Duration::new(0, 0)
1661 })),
1662 ext_km: None,
1663 ext_group: None,
1664 sid: None,
1665 })
1666 })
1667 }
1668 );
1669
1670 let mut buf = vec![];
1672 packet.serialize(&mut buf);
1673
1674 assert_eq!(&buf[..], &packet_data[..]);
1675 }
1676
1677 #[test]
1678 fn raw_handshake_sid() {
1679 let packet_data = hex::decode("800000000000000000000b1400000000000000050000000563444b2e000005dc00002000ffffffff37eb0ee52154fbd60100007f0000000000000000000000000001000300010401000000bf0014001400050003646362616867666500006a69").unwrap();
1682 let packet = ControlPacket::parse(&mut Cursor::new(&packet_data[..]), false).unwrap();
1683 assert_eq!(
1684 packet,
1685 ControlPacket {
1686 timestamp: TimeStamp::from_micros(2836),
1687 dest_sockid: SocketId(0),
1688 control_type: ControlTypes::Handshake(HandshakeControlInfo {
1689 init_seq_num: SeqNumber(1_665_420_078),
1690 max_packet_size: PacketSize(1500),
1691 max_flow_size: PacketCount(8192),
1692 shake_type: ShakeType::Conclusion,
1693 socket_id: SocketId(0x37eb0ee5),
1694 syn_cookie: 559_217_622,
1695 peer_addr: "127.0.0.1".parse().unwrap(),
1696 info: HandshakeVsInfo::V5(HsV5Info {
1697 key_size: KeySize::Unspecified,
1698 ext_hs: Some(SrtControlPacket::HandshakeRequest(SrtHandshake {
1699 version: SrtVersion::new(1, 4, 1),
1700 flags: SrtShakeFlags::TSBPDSND
1701 | SrtShakeFlags::TSBPDRCV
1702 | SrtShakeFlags::HAICRYPT
1703 | SrtShakeFlags::REXMITFLG
1704 | SrtShakeFlags::TLPKTDROP
1705 | SrtShakeFlags::NAKREPORT
1706 | SrtShakeFlags::PACKET_FILTER,
1707 send_latency: Duration::from_millis(20),
1708 recv_latency: Duration::from_millis(20)
1709 })),
1710 ext_km: None,
1711 ext_group: None,
1712 sid: Some(String::from("abcdefghij")),
1713 })
1714 })
1715 }
1716 );
1717
1718 let mut buf = vec![];
1720 packet.serialize(&mut buf);
1721
1722 assert_eq!(&buf[..], &packet_data[..]);
1723 }
1724
1725 #[test]
1726 fn raw_handshake_crypto() {
1727 let packet_data = hex::decode("800000000000000000175E8A0000000000000005000000036FEFB8D8000005DC00002000FFFFFFFF35E790ED5D16CCEA0100007F00000000000000000000000000010003000103010000002F01F401F40003000E122029010000000002000200000004049D75B0AC924C6E4C9EC40FEB4FE973DB1D215D426C18A2871EBF77E2646D9BAB15DBD7689AEF60EC").unwrap();
1729 let packet = ControlPacket::parse(&mut Cursor::new(&packet_data[..]), false).unwrap();
1730
1731 assert_eq!(
1732 packet,
1733 ControlPacket {
1734 timestamp: TimeStamp::from_micros(1_531_530),
1735 dest_sockid: SocketId(0),
1736 control_type: ControlTypes::Handshake(HandshakeControlInfo {
1737 init_seq_num: SeqNumber(1_877_981_400),
1738 max_packet_size: PacketSize(1_500),
1739 max_flow_size: PacketCount(8_192),
1740 shake_type: ShakeType::Conclusion,
1741 socket_id: SocketId(904_368_365),
1742 syn_cookie: 1_561_775_338,
1743 peer_addr: "127.0.0.1".parse().unwrap(),
1744 info: HandshakeVsInfo::V5(HsV5Info {
1745 key_size: KeySize::Unspecified,
1746 ext_hs: Some(SrtControlPacket::HandshakeRequest(SrtHandshake {
1747 version: SrtVersion::new(1, 3, 1),
1748 flags: SrtShakeFlags::TSBPDSND
1749 | SrtShakeFlags::TSBPDRCV
1750 | SrtShakeFlags::HAICRYPT
1751 | SrtShakeFlags::TLPKTDROP
1752 | SrtShakeFlags::REXMITFLG,
1753 send_latency: Duration::from_millis(500),
1754 recv_latency: Duration::from_millis(500)
1755 })),
1756 ext_km: Some(SrtControlPacket::KeyRefreshRequest(KeyingMaterialMessage {
1757 pt: PacketType::KeyingMaterial,
1758 key_flags: KeyFlags::EVEN,
1759 keki: 0,
1760 cipher: CipherType::Ctr,
1761 auth: Auth::None,
1762 salt: hex::decode("9D75B0AC924C6E4C9EC40FEB4FE973DB").unwrap(),
1763 wrapped_keys: hex::decode(
1764 "1D215D426C18A2871EBF77E2646D9BAB15DBD7689AEF60EC"
1765 )
1766 .unwrap()
1767 })),
1768 ext_group: None,
1769 sid: None,
1770 })
1771 })
1772 }
1773 );
1774
1775 let mut buf = vec![];
1776 packet.serialize(&mut buf);
1777
1778 assert_eq!(&buf[..], &packet_data[..])
1779 }
1780
1781 #[test]
1782 fn raw_handshake_crypto_pt2() {
1783 let packet_data = hex::decode("8000000000000000000000000C110D94000000050000000374B7526E000005DC00002000FFFFFFFF18C1CED1F3819B720100007F00000000000000000000000000020003000103010000003F03E803E80004000E12202901000000000200020000000404D3B3D84BE1188A4EBDA4DA16EA65D522D82DE544E1BE06B6ED8128BF15AA4E18EC50EAA95546B101").unwrap();
1784 let _packet = ControlPacket::parse(&mut Cursor::new(&packet_data[..]), false).unwrap();
1785 dbg!(&_packet);
1786 }
1787
1788 #[test]
1789 fn short_ack() {
1790 let packet_data =
1792 hex::decode("800200000000000e000246e5d96d5e1a389c24780000452900007bb000001fa9")
1793 .unwrap();
1794
1795 let _cp = ControlPacket::parse(&mut Cursor::new(packet_data), false).unwrap();
1796 }
1797
1798 #[test]
1799 fn test_reject_reason_deser_ser() {
1800 assert_eq!(
1801 Ok(RejectReason::Server(ServerRejectReason::Unimplemented)),
1802 <i32 as TryInto<RejectReason>>::try_into(
1803 RejectReason::Server(ServerRejectReason::Unimplemented).into()
1804 )
1805 );
1806 }
1807
1808 #[test]
1809 fn test_unordered_hs_extensions() {
1810 let packet_data = hex::decode(concat!(
1812 "80000000000000000000dea800000000",
1813 "000000050004000751dca3b8000005b8",
1814 "00002000ffffffff025c84b8da7ee4e7",
1815 "0100007f000000000000000000000000",
1816 "0001000300010402000000bf003c003c",
1817 "000500033a3a212365683d7500000078",
1818 "00030012122029010000000002000200",
1819 "00000408437937d8c23ce2090754c5a7",
1820 "a9e608c14631aef7ac0b8a46b77b8c0b",
1821 "97d4061e565dcb86e4c5cc3701e1f992",
1822 "a5b2de3651c937c94f3333a6"
1823 ))
1824 .unwrap();
1825
1826 let packet = ControlPacket::parse(&mut Cursor::new(packet_data), false).unwrap();
1827 let reference = ControlPacket {
1828 timestamp: TimeStamp::from_micros(57000),
1829 dest_sockid: SocketId(0),
1830 control_type: ControlTypes::Handshake(HandshakeControlInfo {
1831 init_seq_num: SeqNumber(1373414328),
1832 max_packet_size: PacketSize(1464),
1833 max_flow_size: PacketCount(8192),
1834 shake_type: ShakeType::Conclusion,
1835 socket_id: SocketId(0x025C84B8),
1836 syn_cookie: 0xda7ee4e7u32 as i32,
1837 peer_addr: [127, 0, 0, 1].into(),
1838 info: HandshakeVsInfo::V5(HsV5Info {
1839 key_size: KeySize::AES256,
1840 ext_hs: Some(SrtControlPacket::HandshakeRequest(SrtHandshake {
1841 version: SrtVersion::new(1, 4, 2),
1842 flags: SrtShakeFlags::TSBPDSND
1843 | SrtShakeFlags::TSBPDRCV
1844 | SrtShakeFlags::HAICRYPT
1845 | SrtShakeFlags::TLPKTDROP
1846 | SrtShakeFlags::NAKREPORT
1847 | SrtShakeFlags::REXMITFLG
1848 | SrtShakeFlags::PACKET_FILTER,
1849 send_latency: Duration::from_millis(60),
1850 recv_latency: Duration::from_millis(60)
1851 })),
1852 ext_km: Some(SrtControlPacket::KeyRefreshRequest(KeyingMaterialMessage {
1853 pt: PacketType::KeyingMaterial,
1854 key_flags: KeyFlags::EVEN,
1855 keki: 0,
1856 cipher: CipherType::Ctr,
1857 auth: Auth::None,
1858 salt: hex::decode("437937d8c23ce2090754c5a7a9e608c1").unwrap(),
1859 wrapped_keys: hex::decode(
1860 "4631aef7ac0b8a46b77b8c0b97d4061e565dcb86e4c5cc3701e1f992a5b2de3651c937c94f3333a6"
1861 )
1862 .unwrap()
1863 })),
1864 ext_group: None,
1865 sid: Some("#!::u=hex".into()),
1866 }),
1867 }),
1868 };
1869
1870 assert_eq!(packet, reference);
1871 }
1872
    // NOTE(review): this test has an empty body and asserts nothing. A
    // `DropRequest` round trip is already exercised by
    // `drop_request_ser_des_test` in this module; consider filling this in
    // or removing it.
    #[test]
    fn drop_request() {}
1875}