1use super::bbframe::BBFrame;
9use super::bbheader::BBHeader;
10use super::gseheader::{GSEHeader, Label};
11use bytes::{Bytes, BytesMut};
12use crc::Digest;
13use std::collections::HashMap;
14use thiserror::Error;
15
16#[derive(Debug, Clone, Eq, PartialEq, Hash)]
20pub struct GSEPacket {
21 header: GSEHeader,
22 data: Bytes,
23}
24
25#[derive(Error, Debug, Copy, Clone, Eq, PartialEq, Hash)]
27pub enum GSEError {
28 #[error("the BBFRAME is shorter than the BBHEADER length")]
30 BBFrameShort,
31 #[error("the SYNCD field of the GSE-HEM BBFRAME is not a multiple of 8 bits")]
33 SyncdNotMultiple,
34 #[error("The SYNCD field of the GSE-HEM BBFRAME points beyond the end of the BBFRAME")]
36 SyncdTooLarge,
37}
38
39lazy_static::lazy_static! {
40 static ref CRC32: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_MPEG_2);
41}
42
43impl GSEPacket {
44 pub fn from_bytes(bytes: &Bytes, re_used_label: Option<&Label>) -> Option<GSEPacket> {
54 Self::try_from_bytes(bytes, re_used_label, true)
55 }
56
57 fn try_from_bytes(
58 bytes: &Bytes,
59 re_used_label: Option<&Label>,
60 not_contained_is_error: bool,
61 ) -> Option<GSEPacket> {
62 let header = GSEHeader::from_slice(bytes, re_used_label)?;
63 let header_len = header.len();
64 let total_len = 2 + usize::from(header.gse_length());
65 if total_len > bytes.len() {
66 if not_contained_is_error {
67 log::error!("GSE Packet not fully contained inside bytes");
68 }
69 return None;
70 }
71 if total_len < header_len {
72 log::error!("GSE Packet total length is smaller than header length");
73 return None;
74 }
75 let data = bytes.slice(header_len..total_len);
76 Some(GSEPacket { header, data })
77 }
78
79 pub fn split_bytes(bytes: &Bytes) -> impl Iterator<Item = GSEPacket> {
86 Self::try_split_bytes(bytes, true)
87 }
88
89 fn try_split_bytes(
90 bytes: &Bytes,
91 not_contained_is_error: bool,
92 ) -> impl Iterator<Item = GSEPacket> {
93 let mut remain = bytes.slice(..);
94 let mut label = None;
95 std::iter::from_fn(move || {
96 if let Some(packet) =
97 GSEPacket::try_from_bytes(&remain, label.as_ref(), not_contained_is_error)
98 {
99 log::debug!("extracted GSE Packet with header {}", packet.header());
100 log::trace!(
101 "GSE Packet data field {}",
102 faster_hex::hex_string(packet.data())
103 );
104 remain = remain.slice(packet.len()..);
105 if let Some(l) = packet.header.label() {
106 label = Some(l.clone());
107 }
108 Some(packet)
109 } else {
110 log::debug!("no more GSE Packets in BBFRAME");
111 None
112 }
113 })
114 }
115
116 pub fn split_bbframe(bbframe: &BBFrame) -> Result<impl Iterator<Item = GSEPacket>, GSEError> {
125 if bbframe.len() < BBHeader::LEN {
126 return Err(GSEError::BBFrameShort);
127 }
128 Ok(GSEPacket::split_bytes(&bbframe.slice(BBHeader::LEN..)))
129 }
130
131 pub fn len(&self) -> usize {
133 self.header.len() + self.data.len()
134 }
135
136 pub fn is_empty(&self) -> bool {
142 false
143 }
144
145 pub fn header(&self) -> &GSEHeader {
147 &self.header
148 }
149
150 pub fn data(&self) -> &Bytes {
152 &self.data
153 }
154}
155
156#[derive(Debug)]
161pub struct GSEPacketDefrag {
162 defragger: Defragger,
163 hem_leftover: BytesMut,
165 hem_last_label: Option<Label>,
167}
168
169#[derive(Debug)]
172struct Defragger {
173 defrags: HashMap<u8, Defrag>,
174 skip_total_length_check: bool,
175}
176
177struct Defrag {
178 total_length: usize,
179 protocol_type: u16,
180 label: Label,
181 current_length: usize,
182 fragments: Vec<Bytes>,
183 digest: Digest<'static, u32>,
184 skip_total_length_check: bool,
187}
188
189#[derive(Debug, Clone, Eq, PartialEq, Hash)]
194pub struct PDU {
195 data: Bytes,
196 protocol_type: u16,
197 label: Label,
198}
199
200impl PDU {
201 fn from_single_fragment(packet: &GSEPacket) -> Option<PDU> {
202 Some(PDU {
203 protocol_type: packet.header().protocol_type()?,
204 label: packet.header().label()?.clone(),
205 data: packet.data().clone(),
206 })
207 }
208
209 pub fn data(&self) -> &Bytes {
211 &self.data
212 }
213
214 pub fn protocol_type(&self) -> u16 {
216 self.protocol_type
217 }
218
219 pub fn label(&self) -> &Label {
221 &self.label
222 }
223}
224
225enum EitherIter<AIterType, BIterType> {
228 A(AIterType),
229 B(BIterType),
230}
231
232impl<AIterType, BIterType> Iterator for EitherIter<AIterType, BIterType>
233where
234 AIterType: Iterator,
235 BIterType: Iterator<Item = AIterType::Item>,
236{
237 type Item = AIterType::Item;
238 fn next(&mut self) -> Option<<Self as Iterator>::Item> {
239 match self {
240 EitherIter::A(it) => it.next(),
241 EitherIter::B(it) => it.next(),
242 }
243 }
244}
245
246impl GSEPacketDefrag {
247 pub fn new() -> GSEPacketDefrag {
249 GSEPacketDefrag {
250 defragger: Defragger {
251 defrags: HashMap::new(),
252 skip_total_length_check: false,
253 },
254 hem_leftover: BytesMut::new(),
255 hem_last_label: None,
256 }
257 }
258
259 pub fn set_skip_total_length_check(&mut self, value: bool) {
268 self.defragger.skip_total_length_check = value;
269 }
270
271 pub fn defragment<'a>(
279 &'a mut self,
280 bbframe: &'a BBFrame,
281 ) -> Result<impl Iterator<Item = PDU> + 'a, GSEError> {
282 if bbframe.len() < BBHeader::LEN {
283 return Err(GSEError::BBFrameShort);
284 }
285 let bbheader = bbframe[..BBHeader::LEN].try_into().unwrap();
286 let bbheader = BBHeader::new(&bbheader);
287 if bbheader.is_gse_hem() {
288 let syncd_bits = bbheader.syncd();
289 if syncd_bits % 8 != 0 {
290 return Err(GSEError::SyncdNotMultiple);
291 }
292 let syncd_bytes = usize::from(syncd_bits / 8);
293 let remaining_start = BBHeader::LEN + syncd_bytes;
294 if remaining_start >= bbframe.len() {
295 return Err(GSEError::SyncdTooLarge);
296 }
297 let first_packet = match (self.hem_leftover.is_empty(), syncd_bytes == 0) {
298 (true, false) => {
299 log::warn!(
300 "GSE-HEM SYNCD is not zero but we have no leftovers from previous BBFRAME"
301 );
302 None
303 }
304 (false, true) => {
305 log::warn!(
306 "GSE-HEM SYNCD is zero but we have leftovers from previous BBFRAME; \
307 dropping leftovers"
308 );
309 self.hem_leftover.truncate(0);
310 None
311 }
312 (true, true) => None,
313 (false, false) => {
314 self.hem_leftover
315 .extend_from_slice(&bbframe[BBHeader::LEN..remaining_start]);
316 let concat = self.hem_leftover.split_off(0).freeze();
317 let hem_last_label = self.hem_last_label.clone();
318 GSEPacket::from_bytes(&concat, hem_last_label.as_ref()).and_then(|packet| {
319 if packet.len() == concat.len() {
320 Some(packet)
321 } else {
322 log::warn!("GSE packet recovered from GSE-HEM leftovers does not match leftovers length; \
323 dropping packet");
324 None
325 }
326 })
327 }
328 };
329 let remaining = bbframe.slice(remaining_start..);
330 let remaining_packets = GSEPacket::try_split_bytes(&remaining, false);
331 let remaining_packets = remaining_packets
334 .scan(remaining_start, |end, packet| {
335 *end += packet.len();
336 if let Some(l) = packet.header.label() {
337 self.hem_last_label = Some(l.clone());
338 }
339 Some(Some((*end, packet)))
340 })
341 .chain(std::iter::once(None))
342 .scan(remaining_start, |prev_end, packet| {
343 if let Some((end, packet)) = packet {
344 *prev_end = end;
345 Some(packet)
346 } else {
347 assert!(self.hem_leftover.is_empty());
348 self.hem_leftover.extend_from_slice(&bbframe[*prev_end..]);
349 None
350 }
351 });
352 Ok(EitherIter::A(
353 first_packet
354 .into_iter()
355 .chain(remaining_packets)
356 .flat_map(|packet| self.defragger.defrag_packet(&packet)),
357 ))
358 } else {
359 if !self.hem_leftover.is_empty() {
360 log::warn!(
361 "defragmenting non-HEM BBFRAME, but have leftovers from previous HEM BBFRAME; \
362 dropping leftovers"
363 );
364 self.hem_leftover.truncate(0);
365 }
366 Ok(EitherIter::B(
367 GSEPacket::split_bbframe(bbframe)?
368 .flat_map(|packet| self.defragger.defrag_packet(&packet)),
369 ))
370 }
371 }
372}
373
374impl Defragger {
375 fn defrag_packet(&mut self, packet: &GSEPacket) -> Option<PDU> {
376 if packet.header().is_single_fragment() {
377 log::debug!("defragmented GSE Packet as a single fragment");
378 return Some(PDU::from_single_fragment(packet).unwrap());
379 }
380 let frag_id = packet.header().fragment_id().unwrap();
381 if packet.header().start() {
382 log::debug!("start of GSE fragment ID = {}", frag_id);
383 let mut defrag = Defrag::new(packet.header()).unwrap();
384 defrag.set_skip_total_length_check(self.skip_total_length_check);
385 defrag.push(packet);
386 self.defrags.insert(frag_id, defrag);
387 } else if let Some(defrag) = self.defrags.get_mut(&frag_id) {
388 log::debug!("pushing non-start GSE fragment ID = {}", frag_id);
389 defrag.push(packet);
390 }
391 if packet.header.end() {
392 if let Some(defrag) = self.defrags.remove(&frag_id) {
393 log::debug!("end of GSE fragment ID = {}", frag_id);
394 return defrag.reconstruct(frag_id);
395 }
396 }
397 None
398 }
399}
400
401impl Defrag {
402 fn new(header: &GSEHeader) -> Option<Defrag> {
403 Some(Defrag {
404 total_length: usize::from(header.total_length()?),
405 protocol_type: header.protocol_type()?,
406 label: header.label()?.clone(),
407 current_length: 0,
408 fragments: Vec::new(),
409 digest: CRC32.digest(),
410 skip_total_length_check: false,
411 })
412 }
413
414 fn set_skip_total_length_check(&mut self, value: bool) {
415 self.skip_total_length_check = value;
416 }
417
418 fn push(&mut self, packet: &GSEPacket) {
419 self.fragments.push(packet.data().clone());
420 if let Some(total_length) = packet.header().total_length() {
421 self.digest.update(&total_length.to_be_bytes());
422 }
423 if let Some(protocol_type) = packet.header().protocol_type() {
424 self.digest.update(&protocol_type.to_be_bytes());
425 self.current_length += std::mem::size_of::<u16>();
426 }
427 if let Some(label) = packet.header().label() {
428 self.digest.update(label.as_slice());
429 self.current_length += label.len();
430 }
431 if packet.header.end() {
432 let data = packet.data();
433 let crc_size = std::mem::size_of::<u32>();
434 if data.len() >= crc_size {
435 self.digest
436 .update(&packet.data()[..packet.data().len() - crc_size]);
437 self.current_length += packet.data().len() - crc_size;
438 } else {
439 log::error!(
440 "data size of last GSE fragment is {} bytes, \
441 which is less than the CRC-32 length",
442 data.len()
443 );
444 }
445 } else {
446 self.digest.update(packet.data());
447 self.current_length += packet.data().len();
448 }
449 }
450
451 fn reconstruct(self, frag_id: u8) -> Option<PDU> {
452 if !self.skip_total_length_check && self.total_length != self.current_length {
453 log::debug!(
454 "defragmented length {} does not match total length {}",
455 self.current_length,
456 self.total_length
457 );
458 return None;
459 }
460 let data = self.fragments.iter().flatten().copied().collect::<Bytes>();
461 let crc_size = std::mem::size_of::<u32>();
462 if data.len() < crc_size {
463 log::error!("defragmented data is shorter than CRC-32 size");
464 return None;
465 }
466 let crc_calc = self.digest.finalize();
467 let crc_data = u32::from_be_bytes(data[data.len() - crc_size..].try_into().unwrap());
468 if crc_calc != crc_data {
469 log::debug!("invalid CRC-32 for fragment ID = {}", frag_id);
470 return None;
471 }
472 log::debug!("valid CRC-32 for fragment ID = {}", frag_id);
473 Some(PDU {
474 data: data.slice(..data.len() - crc_size),
475 protocol_type: self.protocol_type,
476 label: self.label,
477 })
478 }
479}
480
481impl std::fmt::Debug for Defrag {
482 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
483 f.debug_struct("Defrag")
484 .field("total_length", &self.total_length)
485 .field("fragments", &self.fragments)
486 .finish()
487 }
488}
489
490impl Default for GSEPacketDefrag {
491 fn default() -> GSEPacketDefrag {
492 GSEPacketDefrag::new()
493 }
494}
495
496#[cfg(test)]
497mod test {
498 use super::*;
499 use hex_literal::hex;
500
501 const SINGLE_PACKET: [u8; 104] = hex!(
502 "72 00 00 00 02 f0 00 00 00 15 c0 5c 08 00 02 00
503 48 55 4c 4b 45 00 00 54 6f aa 40 00 40 01 72 fc
504 2c 00 00 01 2c 00 00 02 08 00 4e 94 00 3b 00 04
505 19 7d 6b 63 00 00 00 00 5d 79 08 00 00 00 00 00
506 10 11 12 13 14 15 16 17 18 19 1a 1b 1c 1d 1e 1f
507 20 21 22 23 24 25 26 27 28 29 2a 2b 2c 2d 2e 2f
508 30 31 32 33 34 35 36 37"
509 );
510
511 #[test]
512 fn defrag_single_packet() {
513 let bbframe = Bytes::copy_from_slice(&SINGLE_PACKET);
514 let mut defrag = GSEPacketDefrag::new();
515 let pdus: Vec<_> = defrag.defragment(&bbframe).unwrap().collect();
516 assert_eq!(pdus.len(), 1);
517 let pdu = &pdus[0];
518 assert_eq!(&pdu.data()[..], &SINGLE_PACKET[20..]);
519 assert_eq!(pdu.protocol_type(), 0x0800);
520 assert_eq!(pdu.label().as_slice(), hex!("02 00 48 55 4c 4b"));
521 }
522
523 #[test]
524 fn test_hem_defrag_multiple() {
525 let dfl_bytes = 400;
527 let packet_size_bytes = 75;
528 let num_packets = 100;
529 let bbheader_template = hex!("ba 00 00 00 0c 80 00 00 00 00");
531 let bbheader = BBHeader::new(&bbheader_template);
532 assert_eq!(usize::from(bbheader.dfl()), dfl_bytes * 8);
533 let packets = (0..num_packets)
534 .map(|n| {
535 let gse_length = packet_size_bytes + 2;
537 let mut packet = Vec::with_capacity(gse_length + 2);
538 packet.push(0xe0);
539 packet.push(u8::try_from(gse_length).unwrap());
540 packet.push(0x12);
542 packet.push(0x34);
543 for j in 0..packet_size_bytes {
544 packet.push((j + n) as u8);
545 }
546 packet
547 })
548 .collect::<Vec<Vec<u8>>>();
549 let mut bbframes = Vec::new();
550 let mut bbframe = BytesMut::new();
551 let mut remain = BytesMut::new();
552 let mut packets_total = 0;
553 let mut packets_in_bbframe = 0;
554 for packet in &packets {
555 if bbframe.is_empty() {
556 let syncd = remain.len() * 8;
557 let mut bbheader = bbheader_template;
558 bbheader[7] = ((syncd >> 8) & 0xff) as u8;
559 bbheader[8] = (syncd & 0xff) as u8;
560 let crc = BBHeader::new(&bbheader).compute_crc8();
561 bbheader[9] = crc;
562 assert!(BBHeader::new(&bbheader).crc_is_valid());
563 bbframe.extend_from_slice(&bbheader);
564 bbframe.extend_from_slice(&remain);
565 packets_in_bbframe = if remain.is_empty() { 0 } else { 1 };
566 remain.truncate(0);
567 }
568 let to_take = (dfl_bytes - (bbframe.len() - BBHeader::LEN)).min(packet.len());
569 bbframe.extend_from_slice(&packet[..to_take]);
570 if to_take < packet.len() {
571 bbframes.push(bbframe.split_off(0).freeze());
572 packets_total += packets_in_bbframe;
573 assert!(remain.is_empty());
574 remain.extend_from_slice(&packet[to_take..]);
575 } else {
576 packets_in_bbframe += 1;
577 }
578 }
579 assert!(packets_total > 75);
581 assert!(bbframes.len() > 10);
582
583 let mut defrag = GSEPacketDefrag::new();
585 let mut pdus = Vec::with_capacity(packets_total);
586 for bbframe in &bbframes {
587 for packet in defrag.defragment(bbframe).unwrap() {
588 pdus.push(packet);
589 }
590 }
591 assert_eq!(pdus.len(), packets_total);
592 for (n, pdu) in pdus.iter().enumerate() {
593 let expected = (0..packet_size_bytes)
594 .map(|j| (j + n) as u8)
595 .collect::<Vec<u8>>();
596 assert_eq!(pdu.data(), &expected);
597 assert_eq!(pdu.protocol_type(), 0x1234);
598 }
599 }
600}
601
602#[cfg(test)]
603mod proptests {
604 use super::*;
605 use proptest::prelude::*;
606
607 prop_compose! {
608 fn garbage()
609 (g in proptest::collection::vec(
610 proptest::collection::vec(any::<u8>(), 0..10000), 0..100))
611 -> Vec<BBFrame> {
612 g.into_iter().map(|v| Bytes::copy_from_slice(&v)).collect::<Vec<BBFrame>>()
613 }
614 }
615
616 proptest! {
617 #[test]
618 fn defrag_garbage(garbage_bbframes in garbage()) {
619 let mut defrag = GSEPacketDefrag::new();
620 for bbframe in &garbage_bbframes {
621 if let Ok(pdus) = defrag.defragment(bbframe) {
622 for pdu in pdus {
623 pdu.data();
624 pdu.protocol_type();
625 pdu.label();
626 }
627 }
628 }
629 }
630 }
631}