srt/packet/control.rs
1use std::fmt::{self, Debug, Formatter};
2use std::net::{IpAddr, Ipv4Addr};
3
4use bitflags::bitflags;
5use bytes::{Buf, BufMut};
6use log::warn;
7
8use crate::protocol::{TimeSpan, TimeStamp};
9use crate::{MsgNumber, SeqNumber, SocketID};
10
11mod srt;
12
13pub use self::srt::{CipherType, SrtControlPacket, SrtHandshake, SrtKeyMessage, SrtShakeFlags};
14use super::PacketParseError;
15
16/// A UDP packet carrying control information
17///
18/// ```ignore,
19/// 0 1 2 3
20/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
21/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
22/// |1| Type | Reserved |
23/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
24/// | | Additional Info |
25/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
26/// | Time Stamp |
27/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
28/// | Destination Socket ID |
29/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
30/// | |
31/// ~ Control Information Field ~
32/// | |
33/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
34/// ```
35/// (from <https://tools.ietf.org/html/draft-gg-udt-03#page-5>)
36#[derive(Clone, PartialEq, Eq)]
37pub struct ControlPacket {
38 /// The timestamp, relative to the socket start time (wrapping every 2^32 microseconds)
39 pub timestamp: TimeStamp,
40
41 /// The dest socket ID, used for multiplexing
42 pub dest_sockid: SocketID,
43
44 /// The extra data
45 pub control_type: ControlTypes,
46}
47
48/// The different kind of control packets
49#[derive(Clone, PartialEq, Eq)]
50#[allow(clippy::large_enum_variant)]
51pub enum ControlTypes {
52 /// The control packet for initiating connections, type 0x0
53 /// Does not use Additional Info
54 Handshake(HandshakeControlInfo),
55
56 /// To keep a connection alive
57 /// Does not use Additional Info or Control Info, type 0x1
58 KeepAlive,
59
60 /// ACK packet, type 0x2
61 Ack(AckControlInfo),
62
63 /// NAK packet, type 0x3
64 /// Additional Info isn't used
65 /// The information is stored in the loss compression format, specified in the loss_compression module.
66 Nak(Vec<u32>),
67
68 /// Shutdown packet, type 0x5
69 Shutdown,
70
71 /// Acknowledgement of Acknowledgement (ACK2) 0x6
72 /// Additional Info (the i32) is the ACK sequence number to acknowldege
73 Ack2(i32),
74
75 /// Drop request, type 0x7
76 DropRequest {
77 /// The message to drop
78 /// Stored in the "addditional info" field of the packet.
79 msg_to_drop: MsgNumber,
80
81 /// The first sequence number in the message to drop
82 first: SeqNumber,
83
84 /// The last sequence number in the message to drop
85 last: SeqNumber,
86 },
87
88 /// Srt control packets
89 /// These use the UDT extension type 0xFF
90 Srt(SrtControlPacket),
91}
92
93bitflags! {
94 /// Used to describe the extension types in the packet
95 struct ExtFlags: u16 {
96 /// The packet has a handshake extension
97 const HS = 0b1;
98 /// The packet has a kmreq extension
99 const KM = 0b10;
100 /// The packet has a config extension (SID or smoother)
101 const CONFIG = 0b100;
102 }
103}
104
105/// HS-version dependenent data
106#[derive(Debug, Clone, PartialEq, Eq)]
107#[allow(clippy::large_enum_variant)]
108pub enum HandshakeVSInfo {
109 V4(SocketType),
110 V5 {
111 /// the crypto size in bytes, either 0 (no encryption), 16, 24, or 32
112 /// source: https://github.com/Haivision/srt/blob/master/docs/stransmit.md#medium-srt
113 crypto_size: u8,
114
115 /// The extension HSReq/HSResp
116 ext_hs: Option<SrtControlPacket>,
117
118 /// The extension KMREQ/KMRESP
119 ext_km: Option<SrtControlPacket>,
120
121 /// The extension config (SID, smoother)
122 ext_config: Option<SrtControlPacket>,
123 },
124}
125
126/// The control info for handshake packets
127#[derive(Clone, PartialEq, Eq)]
128pub struct HandshakeControlInfo {
129 /// The initial sequence number, usually randomly initialized
130 pub init_seq_num: SeqNumber,
131
132 /// Max packet size, including UDP/IP headers. 1500 by default
133 pub max_packet_size: u32,
134
135 /// Max flow window size, by default 25600
136 pub max_flow_size: u32,
137
138 /// Designates where in the handshake process this packet lies
139 pub shake_type: ShakeType,
140
141 /// The socket ID that this request is originating from
142 pub socket_id: SocketID,
143
144 /// SYN cookie
145 ///
146 /// "generates a cookie value according to the client address and a
147 /// secret key and sends it back to the client. The client must then send
148 /// back the same cookie to the server."
149 pub syn_cookie: i32,
150
151 /// The IP address of the connecting client
152 pub peer_addr: IpAddr,
153
154 /// The rest of the data, which is HS version specific
155 pub info: HandshakeVSInfo,
156}
157
158#[derive(Clone, PartialEq, Eq)]
159pub struct AckControlInfo {
160 /// The ack sequence number of this ack, increments for each ack sent.
161 /// Stored in additional info
162 pub ack_seq_num: i32,
163
164 /// The packet sequence number that all packets have been recieved until (excluding)
165 pub ack_number: SeqNumber,
166
167 /// Round trip time
168 pub rtt: Option<TimeSpan>,
169
170 /// RTT variance
171 pub rtt_variance: Option<TimeSpan>,
172
173 /// available buffer
174 pub buffer_available: Option<i32>,
175
176 /// receive rate, in packets/sec
177 pub packet_recv_rate: Option<u32>,
178
179 /// Estimated Link capacity
180 pub est_link_cap: Option<i32>,
181}
182
183/// The socket type for a handshake.
184#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185pub enum SocketType {
186 /// A stream socket, 1 when serialized
187 Stream = 1,
188
189 /// A datagram socket, 2 when serialied
190 Datagram = 2,
191}
192
193/// See <https://tools.ietf.org/html/draft-gg-udt-03#page-10>
194///
195/// More applicably,
196///
197/// Note: the client-server connection uses:
198/// --> INDUCTION (empty)
199/// <-- INDUCTION (cookie)
200/// --> CONCLUSION (cookie)
201/// <-- CONCLUSION (ok)
202///
203/// The rendezvous HSv4 (legacy):
204/// --> WAVEAHAND (effective only if peer is also connecting)
205/// <-- CONCLUSION (empty) (consider yourself connected upon reception)
206/// --> AGREEMENT (sent as a response for conclusion, requires no response)
207///
208/// The rendezvous HSv5 (using SRT extensions):
209/// --> WAVEAHAND (with cookie)
210/// --- (selecting INITIATOR/RESPONDER by cookie contest - comparing one another's cookie)
211/// <-- CONCLUSION (without extensions, if RESPONDER, with extensions, if INITIATOR)
212/// --> CONCLUSION (with response extensions, if RESPONDER)
213/// <-- AGREEMENT (sent exclusively by INITIATOR upon reception of CONCLUSIOn with response extensions)
214#[derive(Debug, Clone, Copy, PartialEq, Eq)]
215pub enum ShakeType {
216 /// First handshake exchange in client-server connection
217 Induction = 1,
218
219 /// A rendezvous connection, initial connect request, 0
220 Waveahand = 0,
221
222 /// A rendezvous connection, response to initial connect request, -1
223 /// Also a regular connection client response to the second handshake
224 Conclusion = -1,
225
226 /// Final rendezvous check, -2
227 Agreement = -2,
228}
229
230impl HandshakeVSInfo {
231 /// Get the type (V4) or ext flags (V5)
232 /// the shake_type is required to decide to encode the magic code
233 fn type_flags(&self, shake_type: ShakeType) -> u32 {
234 match self {
235 HandshakeVSInfo::V4(ty) => *ty as u32,
236 HandshakeVSInfo::V5 {
237 crypto_size,
238 ext_hs,
239 ext_km,
240 ext_config,
241 } => {
242 if shake_type == ShakeType::Induction
243 && (ext_hs.is_some() || ext_km.is_some() || ext_config.is_some())
244 {
245 // induction does not include any extensions, and instead has the
246 // magic code. this is an incompatialbe place to be.
247 panic!("Handshake is both induction and has SRT extensions, not valid");
248 }
249
250 let mut flags = ExtFlags::empty();
251
252 if ext_hs.is_some() {
253 flags |= ExtFlags::HS;
254 }
255 if ext_km.is_some() {
256 flags |= ExtFlags::KM;
257 }
258 if ext_config.is_some() {
259 flags |= ExtFlags::CONFIG;
260 }
261 // take the crypto size, get rid of the frist three (garunteed zero) bits, then shift it into the
262 // most significant 2-byte word
263 (u32::from(*crypto_size) >> 3 << 16)
264 // when this is an induction packet, includ the magic code instead of flags
265 | if shake_type == ShakeType::Induction {
266 u32::from(SRT_MAGIC_CODE)
267 } else {
268 u32::from(flags.bits())
269 }
270 }
271 }
272 }
273
274 /// Get the UDT version
275 pub fn version(&self) -> u32 {
276 match self {
277 HandshakeVSInfo::V4(_) => 4,
278 HandshakeVSInfo::V5 { .. } => 5,
279 }
280 }
281}
282
283impl SocketType {
284 /// Turns a u32 into a SocketType. If the u32 wasn't valid (only 1 and 2 are valid), than it returns Err(num)
285 pub fn from_u16(num: u16) -> Result<SocketType, u16> {
286 match num {
287 1 => Ok(SocketType::Stream),
288 2 => Ok(SocketType::Datagram),
289 i => Err(i),
290 }
291 }
292}
293
294impl ControlPacket {
295 pub fn parse(buf: &mut impl Buf) -> Result<ControlPacket, PacketParseError> {
296 let control_type = buf.get_u16() << 1 >> 1; // clear first bit
297
298 // get reserved data, which is the last two bytes of the first four bytes
299 let reserved = buf.get_u16();
300 let add_info = buf.get_i32();
301 let timestamp = TimeStamp::from_micros(buf.get_u32());
302 let dest_sockid = buf.get_u32();
303
304 Ok(ControlPacket {
305 timestamp,
306 dest_sockid: SocketID(dest_sockid),
307 // just match against the second byte, as everything is in that
308 control_type: ControlTypes::deserialize(control_type, reserved, add_info, buf)?,
309 })
310 }
311
312 pub fn serialize<T: BufMut>(&self, into: &mut T) {
313 // first half of first row, the control type and the 1st bit which is a one
314 into.put_u16(self.control_type.id_byte() | (0b1 << 15));
315
316 // finish that row, which is reserved
317 into.put_u16(self.control_type.reserved());
318
319 // the additonal info line
320 into.put_i32(self.control_type.additional_info());
321
322 // timestamp
323 into.put_u32(self.timestamp.as_micros());
324
325 // dest sock id
326 into.put_u32(self.dest_sockid.0);
327
328 // the rest of the info
329 self.control_type.serialize(into);
330 }
331}
332
333impl Debug for ControlPacket {
334 fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
335 write!(
336 f,
337 "{{{:?} ts={:.4}s dst={:?}}}",
338 self.control_type,
339 self.timestamp.as_secs_f64(),
340 self.dest_sockid,
341 )
342 }
343}
344
345// I definitely don't totally understand this yet.
346// Points of interest: handshake.h:wrapFlags
347// core.cpp:8176 (processConnectionRequest -> if INDUCTION)
348const SRT_MAGIC_CODE: u16 = 0x4A17;
349
350impl ControlTypes {
351 /// Deserialize a control info
352 /// * `packet_type` - The packet ID byte, the second byte in the first row
353 /// * `reserved` - the second 16 bytes of the first row, reserved for custom packets
354 fn deserialize<T: Buf>(
355 packet_type: u16,
356 reserved: u16,
357 extra_info: i32,
358 mut buf: T,
359 ) -> Result<ControlTypes, PacketParseError> {
360 match packet_type {
361 0x0 => {
362 // Handshake
363 // make sure the packet is large enough -- 8 32-bit words, 1 128 (ip)
364 if buf.remaining() < 8 * 4 + 16 {
365 return Err(PacketParseError::NotEnoughData);
366 }
367
368 let udt_version = buf.get_i32();
369 if udt_version != 4 && udt_version != 5 {
370 return Err(PacketParseError::BadUDTVersion(udt_version));
371 }
372
373 // the second 32 bit word is always socket type under UDT4
374 // under SRT HSv5, it is a bit more complex:
375 //
376 // byte 1-2: the crypto key size, rightshifted by three. For example 0b11 would translate to a crypto size of 24
377 // source: https://github.com/Haivision/srt/blob/4f7f2beb2e1e306111b9b11402049a90cb6d3787/srtcore/handshake.h#L123-L125
378 let crypto_size = buf.get_u16() << 3;
379 // byte 3-4: the SRT_MAGIC_CODE, to make sure a client is HSv5 or the ExtFlags if this is an induction response
380 // else, this is the extension flags
381 //
382 // it's ok to only have the lower 16 bits here for the socket type because socket types always have a zero upper 16 bits
383 let type_ext_socket_type = buf.get_u16();
384
385 let init_seq_num = SeqNumber::new_truncate(buf.get_u32()); // TODO: should this truncate?
386 let max_packet_size = buf.get_u32();
387 let max_flow_size = buf.get_u32();
388 let shake_type = match ShakeType::from_i32(buf.get_i32()) {
389 Ok(ct) => ct,
390 Err(err_ct) => return Err(PacketParseError::BadConnectionType(err_ct)),
391 };
392 let socket_id = SocketID(buf.get_u32());
393 let syn_cookie = buf.get_i32();
394
395 // get the IP
396 let mut ip_buf: [u8; 16] = [0; 16];
397 buf.copy_to_slice(&mut ip_buf);
398
399 // TODO: this is probably really wrong, so fix it
400 let peer_addr = if ip_buf[4..] == [0; 12][..] {
401 IpAddr::from(Ipv4Addr::new(ip_buf[3], ip_buf[2], ip_buf[1], ip_buf[0]))
402 } else {
403 IpAddr::from(ip_buf)
404 };
405
406 let info = match udt_version {
407 4 => HandshakeVSInfo::V4(match SocketType::from_u16(type_ext_socket_type) {
408 Ok(t) => t,
409 Err(e) => return Err(PacketParseError::BadSocketType(e)),
410 }),
411 5 => {
412 // make sure crypto size is of a valid variant
413 let crypto_size = match crypto_size {
414 0 | 16 | 24 | 32 => crypto_size as u8,
415 c => {
416 warn!(
417 "Unrecognized crypto key length: {}, disabling encryption. Should be 16, 24, or 32 bytes",
418 c
419 );
420 0
421 }
422 };
423
424 if shake_type == ShakeType::Induction {
425 if type_ext_socket_type != SRT_MAGIC_CODE {
426 // TODO: should this bail? What does the reference implementation do?
427 warn!("HSv5 induction response did not have SRT_MAGIC_CODE, which is suspicious")
428 }
429
430 HandshakeVSInfo::V5 {
431 crypto_size,
432 ext_hs: None,
433 ext_km: None,
434 ext_config: None,
435 }
436 } else {
437 // if this is not induction, this is the extension flags
438 let extensions = match ExtFlags::from_bits(type_ext_socket_type) {
439 Some(i) => i,
440 None => {
441 warn!(
442 "Unnecessary bits in extensions flags: {:b}",
443 type_ext_socket_type
444 );
445
446 ExtFlags::from_bits_truncate(type_ext_socket_type)
447 }
448 };
449
450 // parse out extensions
451 let ext_hs = if extensions.contains(ExtFlags::HS) {
452 if buf.remaining() < 4 {
453 return Err(PacketParseError::NotEnoughData);
454 }
455 let pack_type = buf.get_u16();
456 let _pack_size = buf.get_u16(); // TODO: why exactly is this needed?
457 match pack_type {
458 // 1 and 2 are handshake response and requests
459 1 | 2 => Some(SrtControlPacket::parse(pack_type, &mut buf)?),
460 e => return Err(PacketParseError::BadSRTHsExtensionType(e)),
461 }
462 } else {
463 None
464 };
465 let ext_km = if extensions.contains(ExtFlags::KM) {
466 if buf.remaining() < 4 {
467 return Err(PacketParseError::NotEnoughData);
468 }
469 let pack_type = buf.get_u16();
470 let _pack_size = buf.get_u16(); // TODO: why exactly is this needed?
471 match pack_type {
472 // 3 and 4 are km packets
473 3 | 4 => Some(SrtControlPacket::parse(pack_type, &mut buf)?),
474 e => return Err(PacketParseError::BadSRTKmExtensionType(e)),
475 }
476 } else {
477 None
478 };
479 let ext_config = if extensions.contains(ExtFlags::CONFIG) {
480 if buf.remaining() < 4 {
481 return Err(PacketParseError::NotEnoughData);
482 }
483 let pack_type = buf.get_u16();
484 let _pack_size = buf.get_u16(); // TODO: why exactly is this needed?
485 match pack_type {
486 // 5 is sid 6 is smoother
487 5 | 6 => Some(SrtControlPacket::parse(pack_type, &mut buf)?),
488 e => {
489 return Err(PacketParseError::BadSRTConfigExtensionType(e))
490 }
491 }
492 } else {
493 None
494 };
495 HandshakeVSInfo::V5 {
496 crypto_size,
497 ext_hs,
498 ext_km,
499 ext_config,
500 }
501 }
502 }
503 _ => unreachable!(), // this is already checked for above
504 };
505
506 Ok(ControlTypes::Handshake(HandshakeControlInfo {
507 init_seq_num,
508 max_packet_size,
509 max_flow_size,
510 shake_type,
511 socket_id,
512 syn_cookie,
513 peer_addr,
514 info,
515 }))
516 }
517 0x1 => Ok(ControlTypes::KeepAlive),
518 0x2 => {
519 // ACK
520
521 // make sure there are enough bytes -- only one required field
522 if buf.remaining() < 4 {
523 return Err(PacketParseError::NotEnoughData);
524 }
525
526 // read control info
527 let ack_number = SeqNumber::new_truncate(buf.get_u32());
528
529 // if there is more data, use it. However, it's optional
530 let opt_read_next_u32 = |buf: &mut T| {
531 if buf.remaining() >= 4 {
532 Some(buf.get_u32())
533 } else {
534 None
535 }
536 };
537 let opt_read_next_i32 = |buf: &mut T| {
538 if buf.remaining() >= 4 {
539 Some(buf.get_i32())
540 } else {
541 None
542 }
543 };
544 let rtt = opt_read_next_i32(&mut buf).map(TimeSpan::from_micros);
545 let rtt_variance = opt_read_next_i32(&mut buf).map(TimeSpan::from_micros);
546 let buffer_available = opt_read_next_i32(&mut buf);
547 let packet_recv_rate = opt_read_next_u32(&mut buf);
548 let est_link_cap = opt_read_next_i32(&mut buf);
549
550 Ok(ControlTypes::Ack(AckControlInfo {
551 ack_seq_num: extra_info,
552 ack_number,
553 rtt,
554 rtt_variance,
555 buffer_available,
556 packet_recv_rate,
557 est_link_cap,
558 }))
559 }
560 0x3 => {
561 // NAK
562
563 let mut loss_info = Vec::new();
564 while buf.remaining() >= 4 {
565 loss_info.push(buf.get_u32());
566 }
567
568 Ok(ControlTypes::Nak(loss_info))
569 }
570 0x5 => Ok(ControlTypes::Shutdown),
571 0x6 => {
572 // ACK2
573 Ok(ControlTypes::Ack2(extra_info))
574 }
575 0x7 => {
576 // Drop request
577 if buf.remaining() < 2 * 4 {
578 return Err(PacketParseError::NotEnoughData);
579 }
580
581 Ok(ControlTypes::DropRequest {
582 msg_to_drop: MsgNumber::new_truncate(extra_info as u32), // cast is safe, just reinterpret
583 first: SeqNumber::new_truncate(buf.get_u32()),
584 last: SeqNumber::new_truncate(buf.get_u32()),
585 })
586 }
587 0x7FFF => {
588 // Srt
589 Ok(ControlTypes::Srt(SrtControlPacket::parse(
590 reserved, &mut buf,
591 )?))
592 }
593 x => Err(PacketParseError::BadControlType(x)),
594 }
595 }
596
597 fn id_byte(&self) -> u16 {
598 match *self {
599 ControlTypes::Handshake(_) => 0x0,
600 ControlTypes::KeepAlive => 0x1,
601 ControlTypes::Ack { .. } => 0x2,
602 ControlTypes::Nak(_) => 0x3,
603 ControlTypes::Shutdown => 0x5,
604 ControlTypes::Ack2(_) => 0x6,
605 ControlTypes::DropRequest { .. } => 0x7,
606 ControlTypes::Srt(_) => 0x7FFF,
607 }
608 }
609
610 fn additional_info(&self) -> i32 {
611 match self {
612 // These types have additional info
613 ControlTypes::DropRequest { msg_to_drop: a, .. } => a.as_raw() as i32,
614 ControlTypes::Ack2(a) | ControlTypes::Ack(AckControlInfo { ack_seq_num: a, .. }) => *a,
615 // These do not, just use zero
616 _ => 0,
617 }
618 }
619
620 fn reserved(&self) -> u16 {
621 match self {
622 ControlTypes::Srt(srt) => srt.type_id(),
623 _ => 0,
624 }
625 }
626
627 fn serialize<T: BufMut>(&self, into: &mut T) {
628 match self {
629 ControlTypes::Handshake(ref c) => {
630 into.put_u32(c.info.version());
631 into.put_u32(c.info.type_flags(c.shake_type));
632 into.put_u32(c.init_seq_num.as_raw());
633 into.put_u32(c.max_packet_size);
634 into.put_u32(c.max_flow_size);
635 into.put_i32(c.shake_type as i32);
636 into.put_u32(c.socket_id.0);
637 into.put_i32(c.syn_cookie);
638
639 match c.peer_addr {
640 IpAddr::V4(four) => {
641 let mut v = Vec::from(&four.octets()[..]);
642 v.reverse(); // reverse bytes
643 into.put(&v[..]);
644
645 // the data structure reuiqres enough space for an ipv6, so pad the end with 16 - 4 = 12 bytes
646 into.put(&[0; 12][..]);
647 }
648 IpAddr::V6(six) => {
649 let mut v = Vec::from(&six.octets()[..]);
650 v.reverse();
651
652 into.put(&v[..]);
653 }
654 }
655
656 // serialzie extensions
657 if let HandshakeVSInfo::V5 {
658 ref ext_hs,
659 ref ext_km,
660 ref ext_config,
661 ..
662 } = c.info
663 {
664 for ext in [ext_hs, ext_km, ext_config]
665 .iter()
666 .filter_map(|&s| s.as_ref())
667 {
668 into.put_u16(ext.type_id());
669 // put the size in 32-bit integers
670 into.put_u16(ext.size_words());
671 ext.serialize(into);
672 }
673 }
674 }
675 ControlTypes::Ack(AckControlInfo {
676 ack_number,
677 rtt,
678 rtt_variance,
679 buffer_available,
680 packet_recv_rate,
681 est_link_cap,
682 ..
683 }) => {
684 into.put_u32(ack_number.as_raw());
685 into.put_i32(rtt.map(|t| t.as_micros()).unwrap_or(10_000));
686 into.put_i32(rtt_variance.map(|t| t.as_micros()).unwrap_or(50_000));
687 into.put_i32(buffer_available.unwrap_or(8175)); // TODO: better defaults
688 into.put_u32(packet_recv_rate.unwrap_or(10_000));
689 into.put_i32(est_link_cap.unwrap_or(1_000));
690 }
691 ControlTypes::Nak(ref n) => {
692 for &loss in n {
693 into.put_u32(loss);
694 }
695 }
696 ControlTypes::DropRequest { .. } => unimplemented!(),
697 ControlTypes::Ack2(_) | ControlTypes::Shutdown | ControlTypes::KeepAlive => {
698 // The reference implementation appends one (4 byte) word at the end of these packets, which wireshark labels as 'Unused'
699 // I have no idea why, but wireshark reports it as a "malformed packet" without it. For the record,
700 // this is NOT in the UDT specification. I wonder if this was carried over from the original UDT implementation.
701 // https://github.com/Haivision/srt/blob/86013826b5e0c4d8e531cf18a30c6ad4b16c1b3b/srtcore/packet.cpp#L309
702 into.put_u32(0x0);
703 }
704 ControlTypes::Srt(srt) => {
705 srt.serialize(into);
706 }
707 };
708 }
709}
710
711impl Debug for ControlTypes {
712 fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
713 match self {
714 ControlTypes::Handshake(hs) => write!(f, "{:?}", hs),
715 ControlTypes::KeepAlive => write!(f, "KeepAlive"),
716 ControlTypes::Ack(AckControlInfo {
717 ack_seq_num,
718 ack_number,
719 rtt,
720 rtt_variance,
721 buffer_available,
722 packet_recv_rate,
723 est_link_cap,
724 }) => {
725 write!(f, "Ack(asn={} an={}", ack_seq_num, ack_number,)?;
726 if let Some(rtt) = rtt {
727 write!(f, " rtt={}", rtt.as_micros())?;
728 }
729 if let Some(rttvar) = rtt_variance {
730 write!(f, " rttvar={}", rttvar.as_micros())?;
731 }
732 if let Some(buf) = buffer_available {
733 write!(f, " buf_av={}", buf)?;
734 }
735 if let Some(prr) = packet_recv_rate {
736 write!(f, " pack_rr={}", prr)?;
737 }
738 if let Some(link_cap) = est_link_cap {
739 write!(f, " link_cap={}", link_cap)?;
740 }
741 write!(f, ")")?;
742 Ok(())
743 }
744 ControlTypes::Nak(nak) => {
745 write!(f, "Nak({:?})", nak) // TODO could be better, show ranges
746 }
747 ControlTypes::Shutdown => write!(f, "Shutdown"),
748 ControlTypes::Ack2(ackno) => write!(f, "Ack2({})", ackno),
749 ControlTypes::DropRequest {
750 msg_to_drop,
751 first,
752 last,
753 } => write!(f, "DropReq(msg={} {}-{})", msg_to_drop, first, last),
754 ControlTypes::Srt(srt) => write!(f, "{:?}", srt),
755 }
756 }
757}
758
759// pub init_seq_num: SeqNumber,
760
761// /// Max packet size, including UDP/IP headers. 1500 by default
762// pub max_packet_size: u32,
763
764// /// Max flow window size, by default 25600
765// pub max_flow_size: u32,
766
767// /// Designates where in the handshake process this packet lies
768// pub shake_type: ShakeType,
769
770// /// The socket ID that this request is originating from
771// pub socket_id: SocketID,
772
773// /// SYN cookie
774// ///
775// /// "generates a cookie value according to the client address and a
776// /// secret key and sends it back to the client. The client must then send
777// /// back the same cookie to the server."
778// pub syn_cookie: i32,
779
780// /// The IP address of the connecting client
781// pub peer_addr: IpAddr,
782
783// /// The rest of the data, which is HS version specific
784// pub info: HandshakeVSInfo,
785impl Debug for HandshakeControlInfo {
786 fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
787 write!(f, "HS {:?} from({:?})", self.shake_type, self.socket_id)
788 }
789}
790
791impl ShakeType {
792 /// Turns an i32 into a `ConnectionType`, returning Err(num) if no valid one was passed.
793 pub fn from_i32(num: i32) -> Result<ShakeType, i32> {
794 match num {
795 1 => Ok(ShakeType::Induction),
796 0 => Ok(ShakeType::Waveahand),
797 -1 => Ok(ShakeType::Conclusion),
798 -2 => Ok(ShakeType::Agreement),
799 i => Err(i),
800 }
801 }
802}
803
804#[cfg(test)]
805mod test {
806
807 use super::*;
808 use crate::{SeqNumber, SocketID, SrtVersion};
809 use std::io::Cursor;
810 use std::time::Duration;
811
812 #[test]
813 fn handshake_ser_des_test() {
814 let pack = ControlPacket {
815 timestamp: TimeStamp::from_micros(0),
816 dest_sockid: SocketID(0),
817 control_type: ControlTypes::Handshake(HandshakeControlInfo {
818 init_seq_num: SeqNumber::new_truncate(1_827_131),
819 max_packet_size: 1500,
820 max_flow_size: 25600,
821 shake_type: ShakeType::Conclusion,
822 socket_id: SocketID(1231),
823 syn_cookie: 0,
824 peer_addr: "127.0.0.1".parse().unwrap(),
825 info: HandshakeVSInfo::V5 {
826 crypto_size: 0, // TODO: implement
827 ext_hs: Some(SrtControlPacket::HandshakeResponse(SrtHandshake {
828 version: SrtVersion::CURRENT,
829 flags: SrtShakeFlags::NAKREPORT | SrtShakeFlags::TSBPDSND,
830 peer_latency: Duration::from_millis(3000),
831 latency: Duration::from_millis(12345),
832 })),
833 ext_km: None,
834 ext_config: None,
835 },
836 }),
837 };
838
839 let mut buf = vec![];
840 pack.serialize(&mut buf);
841
842 let des = ControlPacket::parse(&mut Cursor::new(buf)).unwrap();
843
844 assert_eq!(pack, des);
845 }
846
847 #[test]
848 fn ack_ser_des_test() {
849 let pack = ControlPacket {
850 timestamp: TimeStamp::from_micros(113_703),
851 dest_sockid: SocketID(2_453_706_529),
852 control_type: ControlTypes::Ack(AckControlInfo {
853 ack_seq_num: 1,
854 ack_number: SeqNumber::new_truncate(282_049_186),
855 rtt: Some(TimeSpan::from_micros(10_002)),
856 rtt_variance: Some(TimeSpan::from_micros(1000)),
857 buffer_available: Some(1314),
858 packet_recv_rate: Some(0),
859 est_link_cap: Some(0),
860 }),
861 };
862
863 let mut buf = vec![];
864 pack.serialize(&mut buf);
865
866 let des = ControlPacket::parse(&mut Cursor::new(buf)).unwrap();
867
868 assert_eq!(pack, des);
869 }
870
871 #[test]
872 fn ack2_ser_des_test() {
873 let pack = ControlPacket {
874 timestamp: TimeStamp::from_micros(125_812),
875 dest_sockid: SocketID(8313),
876 control_type: ControlTypes::Ack2(831),
877 };
878 assert_eq!(pack.control_type.additional_info(), 831);
879
880 let mut buf = vec![];
881 pack.serialize(&mut buf);
882
883 // dword 2 should have 831 in big endian, so the last two bits of the second dword
884 assert_eq!((u32::from(buf[6]) << 8) + u32::from(buf[7]), 831);
885
886 let des = ControlPacket::parse(&mut Cursor::new(buf)).unwrap();
887
888 assert_eq!(pack, des);
889 }
890
891 #[test]
892 fn raw_srt_packet_test() {
893 // this was taken from wireshark on a packet from stransmit that crashed
894 // it is a SRT reject message
895 let packet_data =
896 hex::decode("FFFF000000000000000189702BFFEFF2000103010000001E00000078").unwrap();
897
898 let packet = ControlPacket::parse(&mut Cursor::new(packet_data)).unwrap();
899
900 assert_eq!(
901 packet,
902 ControlPacket {
903 timestamp: TimeStamp::from_micros(100_720),
904 dest_sockid: SocketID(738_193_394),
905 control_type: ControlTypes::Srt(SrtControlPacket::Reject)
906 }
907 )
908 }
909
910 #[test]
911 fn raw_handshake_srt() {
912 // this is a example HSv5 conclusion packet from the reference implementation
913 let packet_data = hex::decode("8000000000000000000F9EC400000000000000050000000144BEA60D000005DC00002000FFFFFFFF3D6936B6E3E405DD0100007F00000000000000000000000000010003000103010000002F00780000").unwrap();
914 let packet = ControlPacket::parse(&mut Cursor::new(&packet_data[..])).unwrap();
915 assert_eq!(
916 packet,
917 ControlPacket {
918 timestamp: TimeStamp::from_micros(1_023_684),
919 dest_sockid: SocketID(0),
920 control_type: ControlTypes::Handshake(HandshakeControlInfo {
921 init_seq_num: SeqNumber(1_153_345_037),
922 max_packet_size: 1500,
923 max_flow_size: 8192,
924 shake_type: ShakeType::Conclusion,
925 socket_id: SocketID(1_030_305_462),
926 syn_cookie: -471_595_555,
927 peer_addr: "127.0.0.1".parse().unwrap(),
928 info: HandshakeVSInfo::V5 {
929 crypto_size: 0,
930 ext_hs: Some(SrtControlPacket::HandshakeRequest(SrtHandshake {
931 version: SrtVersion::new(1, 3, 1),
932 flags: SrtShakeFlags::TSBPDSND
933 | SrtShakeFlags::TSBPDRCV
934 | SrtShakeFlags::HAICRYPT
935 | SrtShakeFlags::TLPKTDROP
936 | SrtShakeFlags::REXMITFLG,
937 peer_latency: Duration::from_millis(120),
938 latency: Duration::new(0, 0)
939 })),
940 ext_km: None,
941 ext_config: None
942 }
943 })
944 }
945 );
946
947 // reserialize it
948 let mut buf = vec![];
949 packet.serialize(&mut buf);
950
951 assert_eq!(&buf[..], &packet_data[..]);
952 }
953
954 #[test]
955 fn raw_handshake_crypto() {
956 // this is an example HSv5 conclusion packet from the reference implementation that has crypto data embedded.
957 let packet_data = hex::decode("800000000000000000175E8A0000000000000005000000036FEFB8D8000005DC00002000FFFFFFFF35E790ED5D16CCEA0100007F00000000000000000000000000010003000103010000002F01F401F40003000E122029010000000002000200000004049D75B0AC924C6E4C9EC40FEB4FE973DB1D215D426C18A2871EBF77E2646D9BAB15DBD7689AEF60EC").unwrap();
958 let packet = ControlPacket::parse(&mut Cursor::new(&packet_data[..])).unwrap();
959
960 assert_eq!(
961 packet,
962 ControlPacket {
963 timestamp: TimeStamp::from_micros(1_531_530),
964 dest_sockid: SocketID(0),
965 control_type: ControlTypes::Handshake(HandshakeControlInfo {
966 init_seq_num: SeqNumber(1_877_981_400),
967 max_packet_size: 1_500,
968 max_flow_size: 8_192,
969 shake_type: ShakeType::Conclusion,
970 socket_id: SocketID(904_368_365),
971 syn_cookie: 1_561_775_338,
972 peer_addr: "127.0.0.1".parse().unwrap(),
973 info: HandshakeVSInfo::V5 {
974 crypto_size: 0,
975 ext_hs: Some(SrtControlPacket::HandshakeRequest(SrtHandshake {
976 version: SrtVersion::new(1, 3, 1),
977 flags: SrtShakeFlags::TSBPDSND
978 | SrtShakeFlags::TSBPDRCV
979 | SrtShakeFlags::HAICRYPT
980 | SrtShakeFlags::TLPKTDROP
981 | SrtShakeFlags::REXMITFLG,
982 peer_latency: Duration::from_millis(500),
983 latency: Duration::from_millis(500)
984 })),
985 ext_km: Some(SrtControlPacket::KeyManagerRequest(SrtKeyMessage {
986 pt: 2,
987 sign: 8_233,
988 keki: 0,
989 cipher: CipherType::CTR,
990 auth: 0,
991 se: 2,
992 salt: hex::decode("9D75B0AC924C6E4C9EC40FEB4FE973DB").unwrap(),
993 even_key: Some(
994 hex::decode("1D215D426C18A2871EBF77E2646D9BAB").unwrap()
995 ),
996 odd_key: None,
997 wrap_data: *b"\x15\xDB\xD7\x68\x9A\xEF\x60\xEC",
998 })),
999 ext_config: None
1000 }
1001 })
1002 }
1003 );
1004
1005 let mut buf = vec![];
1006 packet.serialize(&mut buf);
1007
1008 assert_eq!(&buf[..], &packet_data[..])
1009 }
1010
1011 #[test]
1012 fn raw_handshake_crypto_pt2() {
1013 let packet_data = hex::decode("8000000000000000000000000C110D94000000050000000374B7526E000005DC00002000FFFFFFFF18C1CED1F3819B720100007F00000000000000000000000000020003000103010000003F03E803E80004000E12202901000000000200020000000404D3B3D84BE1188A4EBDA4DA16EA65D522D82DE544E1BE06B6ED8128BF15AA4E18EC50EAA95546B101").unwrap();
1014 let _packet = ControlPacket::parse(&mut Cursor::new(&packet_data[..])).unwrap();
1015 }
1016
1017 #[test]
1018 fn short_ack() {
1019 // this is a packet received from the reference implementation that crashed the parser
1020 let packet_data =
1021 hex::decode("800200000000000e000246e5d96d5e1a389c24780000452900007bb000001fa9")
1022 .unwrap();
1023
1024 let _cp = ControlPacket::parse(&mut Cursor::new(packet_data)).unwrap();
1025 }
1026}