rtmp/chunk/
unpacketizer.rs

1use {
2    super::{
3        define,
4        errors::{UnpackError, UnpackErrorValue},
5        ChunkBasicHeader, ChunkInfo, ChunkMessageHeader, ExtendTimestampType,
6    },
7    crate::messages::define::msg_type_id,
8    byteorder::{BigEndian, LittleEndian},
9    bytes::{BufMut, BytesMut},
10    bytesio::bytes_reader::BytesReader,
11    std::{cmp::min, collections::HashMap, fmt, vec::Vec},
12};
13
14const PARSE_ERROR_NUMVER: usize = 5;
15
16#[derive(Eq, PartialEq, Debug)]
17pub enum UnpackResult {
18    ChunkBasicHeaderResult(ChunkBasicHeader),
19    ChunkMessageHeaderResult(ChunkMessageHeader),
20    ChunkInfo(ChunkInfo),
21    Chunks(Vec<ChunkInfo>),
22    Success,
23    NotEnoughBytes,
24    Empty,
25}
26
27#[derive(Copy, Clone, Debug)]
28enum ChunkReadState {
29    ReadBasicHeader = 1,
30    ReadMessageHeader = 2,
31    ReadExtendedTimestamp = 3,
32    ReadMessagePayload = 4,
33    Finish = 5,
34}
35
36impl fmt::Display for ChunkReadState {
37    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
38        match self {
39            ChunkReadState::ReadBasicHeader => {
40                write!(f, "ReadBasicHeader",)
41            }
42            ChunkReadState::ReadMessageHeader => {
43                write!(f, "ReadMessageHeader",)
44            }
45            ChunkReadState::ReadExtendedTimestamp => {
46                write!(f, "ReadExtendedTimestamp",)
47            }
48            ChunkReadState::ReadMessagePayload => {
49                write!(f, "ReadMessagePayload",)
50            }
51            ChunkReadState::Finish => {
52                write!(f, "Finish",)
53            }
54        }
55    }
56}
57
58#[derive(Copy, Clone, Debug)]
59enum MessageHeaderReadState {
60    ReadTimeStamp = 1,
61    ReadMsgLength = 2,
62    ReadMsgTypeID = 3,
63    ReadMsgStreamID = 4,
64}
65
66pub struct ChunkUnpacketizer {
67    pub reader: BytesReader,
68
69    //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html
70    //https://zhuanlan.zhihu.com/p/165976086
71    //We use this member to generate a complete message:
72    // - basic_header:   the 2 fields will be updated from each chunk.
73    // - message_header: whose fields need to be updated for current chunk
74    //                   depends on the format id from basic header.
75    //                   Each field can inherit the value from the previous chunk.
76    // - payload:        If the message's payload size is longger than the max chunk size,
77    //                   the whole payload will be splitted into several chunks.
78    //
79    pub current_chunk_info: ChunkInfo,
80    chunk_message_headers: HashMap<u32, ChunkMessageHeader>,
81    chunk_read_state: ChunkReadState,
82    msg_header_read_state: MessageHeaderReadState,
83    max_chunk_size: usize,
84    chunk_index: u32,
85    pub session_type: u8,
86    parse_error_number: usize,
87}
88
89impl Default for ChunkUnpacketizer {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl ChunkUnpacketizer {
96    pub fn new() -> Self {
97        Self {
98            reader: BytesReader::new(BytesMut::new()),
99            current_chunk_info: ChunkInfo::default(),
100            chunk_message_headers: HashMap::new(),
101            chunk_read_state: ChunkReadState::ReadBasicHeader,
102            msg_header_read_state: MessageHeaderReadState::ReadTimeStamp,
103            max_chunk_size: define::INIT_CHUNK_SIZE as usize,
104            chunk_index: 0,
105            session_type: 0,
106            parse_error_number: 0,
107        }
108    }
109
110    pub fn extend_data(&mut self, data: &[u8]) {
111        self.reader.extend_from_slice(data);
112
113        log::trace!(
114            "extend_data length: {}: content:{:X?}",
115            self.reader.len(),
116            self.reader
117                .get_remaining_bytes()
118                .split_to(self.reader.len())
119                .to_vec()
120        );
121    }
122
123    pub fn update_max_chunk_size(&mut self, chunk_size: usize) {
124        log::trace!("update max chunk size: {}", chunk_size);
125        self.max_chunk_size = chunk_size;
126    }
127
128    pub fn read_chunks(&mut self) -> Result<UnpackResult, UnpackError> {
129        // log::trace!(
130        //     "read chunks, reader remaining data: {}",
131        //     self.reader.get_remaining_bytes()
132        // );
133
134        let mut chunks: Vec<ChunkInfo> = Vec::new();
135
136        loop {
137            match self.read_chunk() {
138                Ok(chunk) => {
139                    match chunk {
140                        UnpackResult::ChunkInfo(chunk_info) => {
141                            let msg_type_id = chunk_info.message_header.msg_type_id;
142                            chunks.push(chunk_info);
143
144                            //if the chunk_size is changed, then break and update chunk_size
145                            if msg_type_id == msg_type_id::SET_CHUNK_SIZE {
146                                break;
147                            }
148                        }
149                        _ => continue,
150                    }
151                }
152                Err(err) => {
153                    if let UnpackErrorValue::CannotParse = err.value {
154                        return Err(err);
155                    }
156                    break;
157                }
158            }
159        }
160
161        if !chunks.is_empty() {
162            Ok(UnpackResult::Chunks(chunks))
163        } else {
164            Err(UnpackError {
165                value: UnpackErrorValue::EmptyChunks,
166            })
167        }
168    }
169
170    /******************************************************************************
171     * 5.3.1 Chunk Format
172     * Each chunk consists of a header and data. The header itself has three parts:
173     * +--------------+----------------+--------------------+--------------+
174     * | Basic Header | Message Header | Extended Timestamp | Chunk Data |
175     * +--------------+----------------+--------------------+--------------+
176     * |<------------------- Chunk Header ----------------->|
177     ******************************************************************************/
178    pub fn read_chunk(&mut self) -> Result<UnpackResult, UnpackError> {
179        let mut result: UnpackResult = UnpackResult::Empty;
180
181        self.chunk_index += 1;
182
183        loop {
184            result = match self.chunk_read_state {
185                ChunkReadState::ReadBasicHeader => self.read_basic_header()?,
186                ChunkReadState::ReadMessageHeader => self.read_message_header()?,
187                ChunkReadState::ReadExtendedTimestamp => self.read_extended_timestamp()?,
188                ChunkReadState::ReadMessagePayload => self.read_message_payload()?,
189                ChunkReadState::Finish => {
190                    self.chunk_read_state = ChunkReadState::ReadBasicHeader;
191                    break;
192                }
193            };
194        }
195
196        Ok(result)
197
198        // Ok(UnpackResult::Success)
199    }
200
201    pub fn print_current_basic_header(&mut self) {
202        log::trace!(
203            "print_current_basic_header, csid: {},format id: {}",
204            self.current_chunk_info.basic_header.chunk_stream_id,
205            self.current_chunk_info.basic_header.format
206        );
207    }
208
209    /******************************************************************
210     * 5.3.1.1. Chunk Basic Header
211     * The Chunk Basic Header encodes the chunk stream ID and the chunk
212     * type(represented by fmt field in the figure below). Chunk type
213     * determines the format of the encoded message header. Chunk Basic
214     * Header field may be 1, 2, or 3 bytes, depending on the chunk stream
215     * ID.
216     *
217     * The bits 0-5 (least significant) in the chunk basic header represent
218     * the chunk stream ID.
219     *
220     * Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
221     * field.
222     *    0 1 2 3 4 5 6 7
223     *   +-+-+-+-+-+-+-+-+
224     *   |fmt|   cs id   |
225     *   +-+-+-+-+-+-+-+-+
226     *   Figure 6 Chunk basic header 1
227     *
228     * Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
229     * field. ID is computed as (the second byte + 64).
230     *   0                   1
231     *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
232     *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
233     *   |fmt|    0      | cs id - 64    |
234     *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
235     *   Figure 7 Chunk basic header 2
236     *
237     * Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
238     * this field. ID is computed as ((the third byte)*256 + the second byte
239     * + 64).
240     *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
241     *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
242     *   |fmt|     1     |         cs id - 64            |
243     *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
244     *   Figure 8 Chunk basic header 3
245     *
246     * cs id: 6 bits
247     * fmt: 2 bits
248     * cs id - 64: 8 or 16 bits
249     *
250     * Chunk stream IDs with values 64-319 could be represented by both 2-
251     * byte version and 3-byte version of this field.
252     ***********************************************************************/
253
254    pub fn read_basic_header(&mut self) -> Result<UnpackResult, UnpackError> {
255        let byte = self.reader.read_u8()?;
256
257        let format_id = (byte >> 6) & 0b00000011;
258        let mut csid = (byte & 0b00111111) as u32;
259
260        match csid {
261            0 => {
262                if self.reader.is_empty() {
263                    return Ok(UnpackResult::NotEnoughBytes);
264                }
265                csid = 64;
266                csid += self.reader.read_u8()? as u32;
267            }
268            1 => {
269                if self.reader.is_empty() {
270                    return Ok(UnpackResult::NotEnoughBytes);
271                }
272                csid = 64;
273                csid += self.reader.read_u8()? as u32;
274                csid += self.reader.read_u8()? as u32 * 256;
275            }
276            _ => {}
277        }
278
279        //todo
280        //Only when the csid is changed, we restore the chunk message header
281        //One AV message may be splitted into serval chunks, the csid
282        //will be updated when one av message's chunks are completely
283        //sent/received??
284        if csid != self.current_chunk_info.basic_header.chunk_stream_id {
285            log::trace!(
286                "read_basic_header, chunk stream id update, new: {}, old:{}, byte: {}",
287                csid,
288                self.current_chunk_info.basic_header.chunk_stream_id,
289                byte
290            );
291            //If the chunk stream id is changed, then we should
292            //restore the cached chunk message header used for
293            //getting the correct message header fields.
294            match self.chunk_message_headers.get_mut(&csid) {
295                Some(header) => {
296                    self.current_chunk_info.message_header = header.clone();
297                    self.print_current_basic_header();
298                }
299                None => {
300                    //The format id of the first chunk of a new chunk stream id must be zero.
301                    //assert_eq!(format_id, 0);
302                    if format_id != 0 {
303                        log::warn!(
304                            "The chunk stream id: {}'s first chunk format is {}.",
305                            csid,
306                            format_id
307                        );
308
309                        if self.parse_error_number > PARSE_ERROR_NUMVER {
310                            return Err(UnpackError {
311                                value: UnpackErrorValue::CannotParse,
312                            });
313                        }
314                        self.parse_error_number += 1;
315                    } else {
316                        //reset
317                        self.parse_error_number = 0;
318                    }
319                }
320            }
321        }
322
323        if format_id == 0 {
324            self.current_message_header().timestamp_delta = 0;
325        }
326        // each chunk will read and update the csid and format id
327        self.current_chunk_info.basic_header.chunk_stream_id = csid;
328        self.current_chunk_info.basic_header.format = format_id;
329        self.print_current_basic_header();
330
331        self.chunk_read_state = ChunkReadState::ReadMessageHeader;
332
333        Ok(UnpackResult::ChunkBasicHeaderResult(ChunkBasicHeader::new(
334            format_id, csid,
335        )))
336    }
337
338    fn current_message_header(&mut self) -> &mut ChunkMessageHeader {
339        &mut self.current_chunk_info.message_header
340    }
341
342    fn print_current_message_header(&self, state: ChunkReadState) {
343        log::trace!(
344            "print_current_basic_header state {}, timestamp:{}, timestamp delta:{}, msg length: {},msg type id: {}, msg stream id:{}",
345            state,
346            self.current_chunk_info.message_header.timestamp,
347            self.current_chunk_info.message_header.timestamp_delta,
348            self.current_chunk_info.message_header.msg_length,
349            self.current_chunk_info.message_header.msg_type_id,
350            self.current_chunk_info.message_header.msg_streamd_id
351        );
352    }
353
354    pub fn read_message_header(&mut self) -> Result<UnpackResult, UnpackError> {
355        log::trace!(
356            "read_message_header, data left in buffer: {}",
357            self.reader.len(),
358        );
359
360        //Reset is_extended_timestamp for type 0 ,1 ,2 , for type 3 ,this field will
361        //inherited from the most recent chunk 0, 1, or 2.
362        //(This field is present in Type 3 chunks when the most recent Type 0,
363        //1, or 2 chunk for the same chunk stream ID indicated the presence of
364        //an extended timestamp field. 5.3.1.3)
365        if self.current_chunk_info.basic_header.format != 3 {
366            self.current_message_header().extended_timestamp_type = ExtendTimestampType::NONE;
367        }
368
369        match self.current_chunk_info.basic_header.format {
370            /*****************************************************************/
371            /*      5.3.1.2.1. Type 0                                        */
372            /*****************************************************************
373             0                   1                   2                   3
374             0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
375            +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
376            |                timestamp(3bytes)              |message length |
377            +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
378            | message length (cont)(3bytes) |message type id| msg stream id |
379            +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
380            |       message stream id (cont) (4bytes)       |
381            +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
382            *****************************************************************/
383            0 => {
384                loop {
385                    match self.msg_header_read_state {
386                        MessageHeaderReadState::ReadTimeStamp => {
387                            self.current_message_header().timestamp =
388                                self.reader.read_u24::<BigEndian>()?;
389                            self.msg_header_read_state = MessageHeaderReadState::ReadMsgLength;
390                        }
391                        MessageHeaderReadState::ReadMsgLength => {
392                            self.current_message_header().msg_length =
393                                self.reader.read_u24::<BigEndian>()?;
394
395                            log::trace!(
396                                "read_message_header format 0, msg_length: {}",
397                                self.current_message_header().msg_length,
398                            );
399                            self.msg_header_read_state = MessageHeaderReadState::ReadMsgTypeID;
400                        }
401                        MessageHeaderReadState::ReadMsgTypeID => {
402                            self.current_message_header().msg_type_id = self.reader.read_u8()?;
403
404                            log::trace!(
405                                "read_message_header format 0, msg_type_id: {}",
406                                self.current_message_header().msg_type_id
407                            );
408                            self.msg_header_read_state = MessageHeaderReadState::ReadMsgStreamID;
409                        }
410                        MessageHeaderReadState::ReadMsgStreamID => {
411                            self.current_message_header().msg_streamd_id =
412                                self.reader.read_u32::<LittleEndian>()?;
413                            self.msg_header_read_state = MessageHeaderReadState::ReadTimeStamp;
414                            break;
415                        }
416                    }
417                }
418
419                if self.current_message_header().timestamp >= 0xFFFFFF {
420                    self.current_message_header().extended_timestamp_type =
421                        ExtendTimestampType::FORMAT0;
422                }
423            }
424            /*****************************************************************/
425            /*      5.3.1.2.2. Type 1                                        */
426            /*****************************************************************
427             0                   1                   2                   3
428             0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
429            +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
430            |                timestamp(3bytes)              |message length |
431            +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
432            | message length (cont)(3bytes) |message type id|
433            +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
434            *****************************************************************/
435            1 => {
436                loop {
437                    match self.msg_header_read_state {
438                        MessageHeaderReadState::ReadTimeStamp => {
439                            self.current_message_header().timestamp_delta =
440                                self.reader.read_u24::<BigEndian>()?;
441                            self.msg_header_read_state = MessageHeaderReadState::ReadMsgLength;
442                        }
443                        MessageHeaderReadState::ReadMsgLength => {
444                            self.current_message_header().msg_length =
445                                self.reader.read_u24::<BigEndian>()?;
446
447                            log::trace!(
448                                "read_message_header format 1, msg_length: {}",
449                                self.current_message_header().msg_length
450                            );
451                            self.msg_header_read_state = MessageHeaderReadState::ReadMsgTypeID;
452                        }
453                        MessageHeaderReadState::ReadMsgTypeID => {
454                            self.current_message_header().msg_type_id = self.reader.read_u8()?;
455
456                            log::trace!(
457                                "read_message_header format 1, msg_type_id: {}",
458                                self.current_message_header().msg_type_id
459                            );
460                            self.msg_header_read_state = MessageHeaderReadState::ReadTimeStamp;
461                            break;
462                        }
463                        _ => {
464                            log::error!("error happend when read chunk message header");
465                            break;
466                        }
467                    }
468                }
469
470                if self.current_message_header().timestamp_delta >= 0xFFFFFF {
471                    self.current_message_header().extended_timestamp_type =
472                        ExtendTimestampType::FORMAT12;
473                }
474            }
475            /************************************************/
476            /*      5.3.1.2.3. Type 2                       */
477            /************************************************
478             0                   1                   2
479             0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
480            +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
481            |                timestamp(3bytes)              |
482            +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
483            ***************************************************/
484            2 => {
485                log::trace!(
486                    "read_message_header format 2, msg_type_id: {}",
487                    self.current_message_header().msg_type_id
488                );
489                self.current_message_header().timestamp_delta =
490                    self.reader.read_u24::<BigEndian>()?;
491
492                if self.current_message_header().timestamp_delta >= 0xFFFFFF {
493                    self.current_message_header().extended_timestamp_type =
494                        ExtendTimestampType::FORMAT12;
495                }
496            }
497
498            _ => {}
499        }
500
501        self.chunk_read_state = ChunkReadState::ReadExtendedTimestamp;
502        self.print_current_message_header(ChunkReadState::ReadMessageHeader);
503
504        Ok(UnpackResult::Success)
505    }
506
507    pub fn read_extended_timestamp(&mut self) -> Result<UnpackResult, UnpackError> {
508        //The extended timestamp field is present in Type 3 chunks when the most recent Type 0,
509        //1, or 2 chunk for the same chunk stream ID indicated the presence of
510        //an extended timestamp field.
511        match self.current_message_header().extended_timestamp_type {
512            //the current fortmat type can be 0 or 3
513            ExtendTimestampType::FORMAT0 => {
514                self.current_message_header().timestamp = self.reader.read_u32::<BigEndian>()?;
515            }
516            //the current fortmat type can be 1,2 or 3
517            ExtendTimestampType::FORMAT12 => {
518                self.current_message_header().timestamp_delta =
519                    self.reader.read_u32::<BigEndian>()?;
520            }
521            ExtendTimestampType::NONE => {}
522        }
523
524        //compute the abs timestamp
525        let cur_format_id = self.current_chunk_info.basic_header.format;
526        if cur_format_id == 1
527            || cur_format_id == 2
528            || (cur_format_id == 3 && self.current_chunk_info.payload.is_empty())
529        {
530            let timestamp = self.current_message_header().timestamp;
531            let timestamp_delta = self.current_message_header().timestamp_delta;
532
533            let (cur_abs_timestamp, is_overflow) = timestamp.overflowing_add(timestamp_delta);
534            if is_overflow {
535                log::warn!(
536                    "The current timestamp is overflow, current basic header: {:?}, current message header: {:?}, payload len: {}, abs timestamp: {}",
537                    self.current_chunk_info.basic_header,
538                    self.current_chunk_info.message_header,
539                    self.current_chunk_info.payload.len(),
540                    cur_abs_timestamp
541                );
542            }
543            self.current_message_header().timestamp = cur_abs_timestamp;
544        }
545
546        self.chunk_read_state = ChunkReadState::ReadMessagePayload;
547        self.print_current_message_header(ChunkReadState::ReadExtendedTimestamp);
548
549        Ok(UnpackResult::Success)
550    }
551
552    pub fn read_message_payload(&mut self) -> Result<UnpackResult, UnpackError> {
553        let whole_msg_length = self.current_message_header().msg_length as usize;
554        let remaining_bytes = whole_msg_length - self.current_chunk_info.payload.len();
555
556        log::trace!(
557            "read_message_payload whole msg length: {} and remaining bytes need to be read: {}",
558            whole_msg_length,
559            remaining_bytes
560        );
561
562        let mut need_read_length = remaining_bytes;
563        if whole_msg_length > self.max_chunk_size {
564            need_read_length = min(remaining_bytes, self.max_chunk_size);
565        }
566
567        let remaining_mut = self.current_chunk_info.payload.remaining_mut();
568        if need_read_length > remaining_mut {
569            let additional = need_read_length - remaining_mut;
570            self.current_chunk_info.payload.reserve(additional);
571        }
572
573        log::trace!(
574            "read_message_payload buffer len:{}, need_read_length: {}",
575            self.reader.len(),
576            need_read_length
577        );
578
579        let payload_data = self.reader.read_bytes(need_read_length)?;
580        self.current_chunk_info
581            .payload
582            .extend_from_slice(&payload_data[..]);
583
584        log::trace!(
585            "read_message_payload current msg payload len:{}",
586            self.current_chunk_info.payload.len()
587        );
588
589        if self.current_chunk_info.payload.len() == whole_msg_length {
590            self.chunk_read_state = ChunkReadState::Finish;
591            //get the complete chunk and clear the current chunk payload
592            let chunk_info = self.current_chunk_info.clone();
593            self.current_chunk_info.payload.clear();
594
595            let csid = self.current_chunk_info.basic_header.chunk_stream_id;
596            self.chunk_message_headers
597                .insert(csid, self.current_chunk_info.message_header.clone());
598
599            return Ok(UnpackResult::ChunkInfo(chunk_info));
600        }
601
602        self.chunk_read_state = ChunkReadState::ReadBasicHeader;
603
604        Ok(UnpackResult::Success)
605    }
606}
607
608#[cfg(test)]
609mod tests {
610
611    use super::ChunkInfo;
612    use super::ChunkUnpacketizer;
613    use super::UnpackResult;
614    use bytes::BytesMut;
615
616    #[test]
617    fn test_set_chunk_size() {
618        let mut unpacker = ChunkUnpacketizer::new();
619
620        let data: [u8; 16] = [
621            //
622            2, //|format+csid|
623            00, 00, 00, //timestamp
624            00, 00, 4, //msg_length
625            1, //msg_type_id
626            00, 00, 00, 00, //msg_stream_id
627            00, 00, 10, 00, //body
628        ];
629
630        unpacker.extend_data(&data[..]);
631
632        let rv = unpacker.read_chunk();
633
634        let mut body = BytesMut::new();
635        body.extend_from_slice(&[00, 00, 10, 00]);
636
637        let expected = ChunkInfo::new(2, 0, 0, 4, 1, 0, body);
638
639        println!("{:?}, {:?}", expected.basic_header, expected.message_header);
640
641        assert_eq!(
642            rv.unwrap(),
643            UnpackResult::ChunkInfo(expected),
644            "not correct"
645        )
646    }
647
648    #[test]
649    fn test_overflow_add() {
650        let aa: u32 = u32::MAX;
651        println!("{}", aa);
652
653        let (_a, _b) = aa.overflowing_add(5);
654
655        let b = aa.wrapping_add(5);
656
657        println!("{}", b);
658    }
659
660    use std::collections::VecDeque;
661
662    #[test]
663    fn test_unpacketizer2() {
664        let mut queue = VecDeque::new();
665        queue.push_back(2);
666        queue.push_back(3);
667        queue.push_back(4);
668
669        for data in queue.iter() {
670            println!("{}", data);
671        }
672    }
673
674    // #[test]
675    // fn test_window_acknowlage_size_set_peer_bandwidth() {
676    //     let mut unpacker = ChunkUnpacketizer::new();
677
678    //     let data: [u8; 33] = [
679    //         0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
680    //         0x10, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x06, 0x00, 0x00, 0x00, 0x00,
681    //         0x00, 0x00, 0x10, 0x00, 0x02,
682    //     ];
683
684    //     unpacker.extend_data(&data[..]);
685
686    //     let rv = unpacker.read_chunk();
687
688    //     let rv2 = unpacker.read_chunk();
689
690    //     let mut body = BytesMut::new();
691    //     body.extend_from_slice(&[00, 00, 10, 00]);
692
693    //     let expected = ChunkInfo::new(2, 0, 0, 4, 1, 0, body);
694
695    //     assert_eq!(
696    //         rv.unwrap(),
697    //         UnpackResult::ChunkInfo(expected),
698    //         "not correct"
699    //     )
700    // }
701
702    // #[test]
703    // fn test_on_connect() {
704    //     // 0000   03 00 00 00 00 00 b1 14 00 00 00 00 02 00 07 63  ...............c
705    //     // 0010   6f 6e 6e 65 63 74 00 3f f0 00 00 00 00 00 00 03  onnect.?........
706    //     // 0020   00 03 61 70 70 02 00 06 68 61 72 6c 61 6e 00 04  ..app...harlan..
707    //     // 0030   74 79 70 65 02 00 0a 6e 6f 6e 70 72 69 76 61 74  type...nonprivat
708    //     // 0040   65 00 08 66 6c 61 73 68 56 65 72 02 00 1f 46 4d  e..flashVer...FM
709    //     // 0050   4c 45 2f 33 2e 30 20 28 63 6f 6d 70 61 74 69 62  LE/3.0 (compatib
710    //     // 0060   6c 65 3b 20 46 4d 53 63 2f 31 2e 30 29 00 06 73  le; FMSc/1.0)..s
711    //     // 0070   77 66 55 72 6c 02 00 1c 72 74 6d 70 3a 2f 2f 6c  wfUrl...rtmp://l
712    //     // 0080   6f 63 61 6c 68 6f 73 74 3a 31 39 33 35 2f 68 61  ocalhost:1935/ha
713    //     // 0090   72 6c 61 6e 00 05 74 63 55 72 6c 02 00 1c 72 74  rlan..tcUrl...rt
714    //     // 00a0   6d 70 3a 2f 2f 6c 6f 63 61 6c 68 6f 73 74 3a 31  mp://localhost:1
715    //     // 00b0   39 33 35 2f 68 61 72 6c 61 6e 00 00 09           935/harlan...
716    //     // let data: [u8; 189] = [
717    //     //     3, //|format+csid|
718    //     //     0, 0, 0, //timestamp
719    //     //     0, 0, 177, //msg_length
720    //     //     20,  //msg_type_id 0x14
721    //     //     0, 0, 0, 0, //msg_stream_id
722    //     //     2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
723    //     //     3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
724    //     //     2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
725    //     //     104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
726    //     //     97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
727    //     //     102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
728    //     //     104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
729    //     //     85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
730    //     //     111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
731    //     // ];
732
733    //     let data: [u8; 189] = [
734    //         0x03,
735    //         0x00, 0x00, 0x00,
736    //         0x00, 0x00, 0xb1,
737    //         0x14,
738    //         0x00, 0x00, 0x00, 0x00,
739    //         0x02, 0x00,
740    //         0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00,
741    //         0x00, 0x00, 0x00, 0x03, 0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x06, 0x68, 0x61,
742    //         0x72, 0x6c, 0x61, 0x6e, 0x00, 0x04, 0x74, 0x79, 0x70, 0x65, 0x02, 0x00, 0x0a, 0x6e,
743    //         0x6f, 0x6e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x00, 0x08, 0x66, 0x6c, 0x61,
744    //         0x73, 0x68, 0x56, 0x65, 0x72, 0x02, 0x00, 0x1f, 0x46, 0x4d, 0x4c, 0x45, 0x2f, 0x33,
745    //         0x2e, 0x30, 0x20, 0x28, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x69, 0x62, 0x6c, 0x65,
746    //         0x3b, 0x20, 0x46, 0x4d, 0x53, 0x63, 0x2f, 0x31, 0x2e, 0x30, 0x29, 0x00, 0x06, 0x73,
747    //         0x77, 0x66, 0x55, 0x72, 0x6c, 0x02, 0x00, 0x1c, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f,
748    //         0x2f, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x31, 0x39, 0x33,
749    //         0x35, 0x2f, 0x68, 0x61, 0x72, 0x6c, 0x61, 0x6e, 0x00, 0x05, 0x74, 0x63, 0x55, 0x72,
750    //         0x6c, 0x02, 0x00, 0x1c, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x6c, 0x6f, 0x63,
751    //         0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x31, 0x39, 0x33, 0x35, 0x2f, 0x68, 0x61,
752    //         0x72, 0x6c, 0x61, 0x6e, 0x00, 0x00, 0x09,
753    //     ];
754
755    //     let mut unpacker = ChunkUnpacketizer::new();
756    //     unpacker.extend_data(&data[..]);
757
758    //     let rv = unpacker.read_chunk();
759    //     match &rv {
760    //         Err(err) => {
761    //             println!("==={}===", err);
762    //         }
763    //         _ => {}
764    //     }
765
766    //     let mut body = BytesMut::new();
767    //     body.extend_from_slice(&[
768    //         2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
769    //         3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
770    //         2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
771    //         104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
772    //         97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
773    //         102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
774    //         104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
775    //         85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
776    //         111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
777    //     ]);
778
779    //     let expected = ChunkInfo::new(3, 0, 0, 177, 20, 0, body);
780
781    //     assert_eq!(
782    //         rv.unwrap(),
783    //         UnpackResult::ChunkInfo(expected),
784    //         "not correct"
785    //     )
786    // }
787}