rtmp/chunk/unpacketizer.rs
1use {
2 super::{
3 define,
4 errors::{UnpackError, UnpackErrorValue},
5 ChunkBasicHeader, ChunkInfo, ChunkMessageHeader, ExtendTimestampType,
6 },
7 crate::messages::define::msg_type_id,
8 byteorder::{BigEndian, LittleEndian},
9 bytes::{BufMut, BytesMut},
10 bytesio::bytes_reader::BytesReader,
11 std::{cmp::min, collections::HashMap, fmt, vec::Vec},
12};
13
14const PARSE_ERROR_NUMVER: usize = 5;
15
16#[derive(Eq, PartialEq, Debug)]
17pub enum UnpackResult {
18 ChunkBasicHeaderResult(ChunkBasicHeader),
19 ChunkMessageHeaderResult(ChunkMessageHeader),
20 ChunkInfo(ChunkInfo),
21 Chunks(Vec<ChunkInfo>),
22 Success,
23 NotEnoughBytes,
24 Empty,
25}
26
27#[derive(Copy, Clone, Debug)]
28enum ChunkReadState {
29 ReadBasicHeader = 1,
30 ReadMessageHeader = 2,
31 ReadExtendedTimestamp = 3,
32 ReadMessagePayload = 4,
33 Finish = 5,
34}
35
36impl fmt::Display for ChunkReadState {
37 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
38 match self {
39 ChunkReadState::ReadBasicHeader => {
40 write!(f, "ReadBasicHeader",)
41 }
42 ChunkReadState::ReadMessageHeader => {
43 write!(f, "ReadMessageHeader",)
44 }
45 ChunkReadState::ReadExtendedTimestamp => {
46 write!(f, "ReadExtendedTimestamp",)
47 }
48 ChunkReadState::ReadMessagePayload => {
49 write!(f, "ReadMessagePayload",)
50 }
51 ChunkReadState::Finish => {
52 write!(f, "Finish",)
53 }
54 }
55 }
56}
57
58#[derive(Copy, Clone, Debug)]
59enum MessageHeaderReadState {
60 ReadTimeStamp = 1,
61 ReadMsgLength = 2,
62 ReadMsgTypeID = 3,
63 ReadMsgStreamID = 4,
64}
65
66pub struct ChunkUnpacketizer {
67 pub reader: BytesReader,
68
69 //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html
70 //https://zhuanlan.zhihu.com/p/165976086
71 //We use this member to generate a complete message:
72 // - basic_header: the 2 fields will be updated from each chunk.
73 // - message_header: whose fields need to be updated for current chunk
74 // depends on the format id from basic header.
75 // Each field can inherit the value from the previous chunk.
76 // - payload: If the message's payload size is longger than the max chunk size,
77 // the whole payload will be splitted into several chunks.
78 //
79 pub current_chunk_info: ChunkInfo,
80 chunk_message_headers: HashMap<u32, ChunkMessageHeader>,
81 chunk_read_state: ChunkReadState,
82 msg_header_read_state: MessageHeaderReadState,
83 max_chunk_size: usize,
84 chunk_index: u32,
85 pub session_type: u8,
86 parse_error_number: usize,
87}
88
89impl Default for ChunkUnpacketizer {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl ChunkUnpacketizer {
96 pub fn new() -> Self {
97 Self {
98 reader: BytesReader::new(BytesMut::new()),
99 current_chunk_info: ChunkInfo::default(),
100 chunk_message_headers: HashMap::new(),
101 chunk_read_state: ChunkReadState::ReadBasicHeader,
102 msg_header_read_state: MessageHeaderReadState::ReadTimeStamp,
103 max_chunk_size: define::INIT_CHUNK_SIZE as usize,
104 chunk_index: 0,
105 session_type: 0,
106 parse_error_number: 0,
107 }
108 }
109
110 pub fn extend_data(&mut self, data: &[u8]) {
111 self.reader.extend_from_slice(data);
112
113 log::trace!(
114 "extend_data length: {}: content:{:X?}",
115 self.reader.len(),
116 self.reader
117 .get_remaining_bytes()
118 .split_to(self.reader.len())
119 .to_vec()
120 );
121 }
122
123 pub fn update_max_chunk_size(&mut self, chunk_size: usize) {
124 log::trace!("update max chunk size: {}", chunk_size);
125 self.max_chunk_size = chunk_size;
126 }
127
128 pub fn read_chunks(&mut self) -> Result<UnpackResult, UnpackError> {
129 // log::trace!(
130 // "read chunks, reader remaining data: {}",
131 // self.reader.get_remaining_bytes()
132 // );
133
134 let mut chunks: Vec<ChunkInfo> = Vec::new();
135
136 loop {
137 match self.read_chunk() {
138 Ok(chunk) => {
139 match chunk {
140 UnpackResult::ChunkInfo(chunk_info) => {
141 let msg_type_id = chunk_info.message_header.msg_type_id;
142 chunks.push(chunk_info);
143
144 //if the chunk_size is changed, then break and update chunk_size
145 if msg_type_id == msg_type_id::SET_CHUNK_SIZE {
146 break;
147 }
148 }
149 _ => continue,
150 }
151 }
152 Err(err) => {
153 if let UnpackErrorValue::CannotParse = err.value {
154 return Err(err);
155 }
156 break;
157 }
158 }
159 }
160
161 if !chunks.is_empty() {
162 Ok(UnpackResult::Chunks(chunks))
163 } else {
164 Err(UnpackError {
165 value: UnpackErrorValue::EmptyChunks,
166 })
167 }
168 }
169
170 /******************************************************************************
171 * 5.3.1 Chunk Format
172 * Each chunk consists of a header and data. The header itself has three parts:
173 * +--------------+----------------+--------------------+--------------+
174 * | Basic Header | Message Header | Extended Timestamp | Chunk Data |
175 * +--------------+----------------+--------------------+--------------+
176 * |<------------------- Chunk Header ----------------->|
177 ******************************************************************************/
178 pub fn read_chunk(&mut self) -> Result<UnpackResult, UnpackError> {
179 let mut result: UnpackResult = UnpackResult::Empty;
180
181 self.chunk_index += 1;
182
183 loop {
184 result = match self.chunk_read_state {
185 ChunkReadState::ReadBasicHeader => self.read_basic_header()?,
186 ChunkReadState::ReadMessageHeader => self.read_message_header()?,
187 ChunkReadState::ReadExtendedTimestamp => self.read_extended_timestamp()?,
188 ChunkReadState::ReadMessagePayload => self.read_message_payload()?,
189 ChunkReadState::Finish => {
190 self.chunk_read_state = ChunkReadState::ReadBasicHeader;
191 break;
192 }
193 };
194 }
195
196 Ok(result)
197
198 // Ok(UnpackResult::Success)
199 }
200
201 pub fn print_current_basic_header(&mut self) {
202 log::trace!(
203 "print_current_basic_header, csid: {},format id: {}",
204 self.current_chunk_info.basic_header.chunk_stream_id,
205 self.current_chunk_info.basic_header.format
206 );
207 }
208
209 /******************************************************************
210 * 5.3.1.1. Chunk Basic Header
211 * The Chunk Basic Header encodes the chunk stream ID and the chunk
212 * type(represented by fmt field in the figure below). Chunk type
213 * determines the format of the encoded message header. Chunk Basic
214 * Header field may be 1, 2, or 3 bytes, depending on the chunk stream
215 * ID.
216 *
217 * The bits 0-5 (least significant) in the chunk basic header represent
218 * the chunk stream ID.
219 *
220 * Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
221 * field.
222 * 0 1 2 3 4 5 6 7
223 * +-+-+-+-+-+-+-+-+
224 * |fmt| cs id |
225 * +-+-+-+-+-+-+-+-+
226 * Figure 6 Chunk basic header 1
227 *
228 * Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
229 * field. ID is computed as (the second byte + 64).
230 * 0 1
231 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
232 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
233 * |fmt| 0 | cs id - 64 |
234 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
235 * Figure 7 Chunk basic header 2
236 *
237 * Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
238 * this field. ID is computed as ((the third byte)*256 + the second byte
239 * + 64).
240 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
241 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
242 * |fmt| 1 | cs id - 64 |
243 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
244 * Figure 8 Chunk basic header 3
245 *
246 * cs id: 6 bits
247 * fmt: 2 bits
248 * cs id - 64: 8 or 16 bits
249 *
250 * Chunk stream IDs with values 64-319 could be represented by both 2-
251 * byte version and 3-byte version of this field.
252 ***********************************************************************/
253
254 pub fn read_basic_header(&mut self) -> Result<UnpackResult, UnpackError> {
255 let byte = self.reader.read_u8()?;
256
257 let format_id = (byte >> 6) & 0b00000011;
258 let mut csid = (byte & 0b00111111) as u32;
259
260 match csid {
261 0 => {
262 if self.reader.is_empty() {
263 return Ok(UnpackResult::NotEnoughBytes);
264 }
265 csid = 64;
266 csid += self.reader.read_u8()? as u32;
267 }
268 1 => {
269 if self.reader.is_empty() {
270 return Ok(UnpackResult::NotEnoughBytes);
271 }
272 csid = 64;
273 csid += self.reader.read_u8()? as u32;
274 csid += self.reader.read_u8()? as u32 * 256;
275 }
276 _ => {}
277 }
278
279 //todo
280 //Only when the csid is changed, we restore the chunk message header
281 //One AV message may be splitted into serval chunks, the csid
282 //will be updated when one av message's chunks are completely
283 //sent/received??
284 if csid != self.current_chunk_info.basic_header.chunk_stream_id {
285 log::trace!(
286 "read_basic_header, chunk stream id update, new: {}, old:{}, byte: {}",
287 csid,
288 self.current_chunk_info.basic_header.chunk_stream_id,
289 byte
290 );
291 //If the chunk stream id is changed, then we should
292 //restore the cached chunk message header used for
293 //getting the correct message header fields.
294 match self.chunk_message_headers.get_mut(&csid) {
295 Some(header) => {
296 self.current_chunk_info.message_header = header.clone();
297 self.print_current_basic_header();
298 }
299 None => {
300 //The format id of the first chunk of a new chunk stream id must be zero.
301 //assert_eq!(format_id, 0);
302 if format_id != 0 {
303 log::warn!(
304 "The chunk stream id: {}'s first chunk format is {}.",
305 csid,
306 format_id
307 );
308
309 if self.parse_error_number > PARSE_ERROR_NUMVER {
310 return Err(UnpackError {
311 value: UnpackErrorValue::CannotParse,
312 });
313 }
314 self.parse_error_number += 1;
315 } else {
316 //reset
317 self.parse_error_number = 0;
318 }
319 }
320 }
321 }
322
323 if format_id == 0 {
324 self.current_message_header().timestamp_delta = 0;
325 }
326 // each chunk will read and update the csid and format id
327 self.current_chunk_info.basic_header.chunk_stream_id = csid;
328 self.current_chunk_info.basic_header.format = format_id;
329 self.print_current_basic_header();
330
331 self.chunk_read_state = ChunkReadState::ReadMessageHeader;
332
333 Ok(UnpackResult::ChunkBasicHeaderResult(ChunkBasicHeader::new(
334 format_id, csid,
335 )))
336 }
337
338 fn current_message_header(&mut self) -> &mut ChunkMessageHeader {
339 &mut self.current_chunk_info.message_header
340 }
341
342 fn print_current_message_header(&self, state: ChunkReadState) {
343 log::trace!(
344 "print_current_basic_header state {}, timestamp:{}, timestamp delta:{}, msg length: {},msg type id: {}, msg stream id:{}",
345 state,
346 self.current_chunk_info.message_header.timestamp,
347 self.current_chunk_info.message_header.timestamp_delta,
348 self.current_chunk_info.message_header.msg_length,
349 self.current_chunk_info.message_header.msg_type_id,
350 self.current_chunk_info.message_header.msg_streamd_id
351 );
352 }
353
354 pub fn read_message_header(&mut self) -> Result<UnpackResult, UnpackError> {
355 log::trace!(
356 "read_message_header, data left in buffer: {}",
357 self.reader.len(),
358 );
359
360 //Reset is_extended_timestamp for type 0 ,1 ,2 , for type 3 ,this field will
361 //inherited from the most recent chunk 0, 1, or 2.
362 //(This field is present in Type 3 chunks when the most recent Type 0,
363 //1, or 2 chunk for the same chunk stream ID indicated the presence of
364 //an extended timestamp field. 5.3.1.3)
365 if self.current_chunk_info.basic_header.format != 3 {
366 self.current_message_header().extended_timestamp_type = ExtendTimestampType::NONE;
367 }
368
369 match self.current_chunk_info.basic_header.format {
370 /*****************************************************************/
371 /* 5.3.1.2.1. Type 0 */
372 /*****************************************************************
373 0 1 2 3
374 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
375 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
376 | timestamp(3bytes) |message length |
377 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
378 | message length (cont)(3bytes) |message type id| msg stream id |
379 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
380 | message stream id (cont) (4bytes) |
381 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
382 *****************************************************************/
383 0 => {
384 loop {
385 match self.msg_header_read_state {
386 MessageHeaderReadState::ReadTimeStamp => {
387 self.current_message_header().timestamp =
388 self.reader.read_u24::<BigEndian>()?;
389 self.msg_header_read_state = MessageHeaderReadState::ReadMsgLength;
390 }
391 MessageHeaderReadState::ReadMsgLength => {
392 self.current_message_header().msg_length =
393 self.reader.read_u24::<BigEndian>()?;
394
395 log::trace!(
396 "read_message_header format 0, msg_length: {}",
397 self.current_message_header().msg_length,
398 );
399 self.msg_header_read_state = MessageHeaderReadState::ReadMsgTypeID;
400 }
401 MessageHeaderReadState::ReadMsgTypeID => {
402 self.current_message_header().msg_type_id = self.reader.read_u8()?;
403
404 log::trace!(
405 "read_message_header format 0, msg_type_id: {}",
406 self.current_message_header().msg_type_id
407 );
408 self.msg_header_read_state = MessageHeaderReadState::ReadMsgStreamID;
409 }
410 MessageHeaderReadState::ReadMsgStreamID => {
411 self.current_message_header().msg_streamd_id =
412 self.reader.read_u32::<LittleEndian>()?;
413 self.msg_header_read_state = MessageHeaderReadState::ReadTimeStamp;
414 break;
415 }
416 }
417 }
418
419 if self.current_message_header().timestamp >= 0xFFFFFF {
420 self.current_message_header().extended_timestamp_type =
421 ExtendTimestampType::FORMAT0;
422 }
423 }
424 /*****************************************************************/
425 /* 5.3.1.2.2. Type 1 */
426 /*****************************************************************
427 0 1 2 3
428 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
429 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
430 | timestamp(3bytes) |message length |
431 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
432 | message length (cont)(3bytes) |message type id|
433 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
434 *****************************************************************/
435 1 => {
436 loop {
437 match self.msg_header_read_state {
438 MessageHeaderReadState::ReadTimeStamp => {
439 self.current_message_header().timestamp_delta =
440 self.reader.read_u24::<BigEndian>()?;
441 self.msg_header_read_state = MessageHeaderReadState::ReadMsgLength;
442 }
443 MessageHeaderReadState::ReadMsgLength => {
444 self.current_message_header().msg_length =
445 self.reader.read_u24::<BigEndian>()?;
446
447 log::trace!(
448 "read_message_header format 1, msg_length: {}",
449 self.current_message_header().msg_length
450 );
451 self.msg_header_read_state = MessageHeaderReadState::ReadMsgTypeID;
452 }
453 MessageHeaderReadState::ReadMsgTypeID => {
454 self.current_message_header().msg_type_id = self.reader.read_u8()?;
455
456 log::trace!(
457 "read_message_header format 1, msg_type_id: {}",
458 self.current_message_header().msg_type_id
459 );
460 self.msg_header_read_state = MessageHeaderReadState::ReadTimeStamp;
461 break;
462 }
463 _ => {
464 log::error!("error happend when read chunk message header");
465 break;
466 }
467 }
468 }
469
470 if self.current_message_header().timestamp_delta >= 0xFFFFFF {
471 self.current_message_header().extended_timestamp_type =
472 ExtendTimestampType::FORMAT12;
473 }
474 }
475 /************************************************/
476 /* 5.3.1.2.3. Type 2 */
477 /************************************************
478 0 1 2
479 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
480 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
481 | timestamp(3bytes) |
482 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
483 ***************************************************/
484 2 => {
485 log::trace!(
486 "read_message_header format 2, msg_type_id: {}",
487 self.current_message_header().msg_type_id
488 );
489 self.current_message_header().timestamp_delta =
490 self.reader.read_u24::<BigEndian>()?;
491
492 if self.current_message_header().timestamp_delta >= 0xFFFFFF {
493 self.current_message_header().extended_timestamp_type =
494 ExtendTimestampType::FORMAT12;
495 }
496 }
497
498 _ => {}
499 }
500
501 self.chunk_read_state = ChunkReadState::ReadExtendedTimestamp;
502 self.print_current_message_header(ChunkReadState::ReadMessageHeader);
503
504 Ok(UnpackResult::Success)
505 }
506
507 pub fn read_extended_timestamp(&mut self) -> Result<UnpackResult, UnpackError> {
508 //The extended timestamp field is present in Type 3 chunks when the most recent Type 0,
509 //1, or 2 chunk for the same chunk stream ID indicated the presence of
510 //an extended timestamp field.
511 match self.current_message_header().extended_timestamp_type {
512 //the current fortmat type can be 0 or 3
513 ExtendTimestampType::FORMAT0 => {
514 self.current_message_header().timestamp = self.reader.read_u32::<BigEndian>()?;
515 }
516 //the current fortmat type can be 1,2 or 3
517 ExtendTimestampType::FORMAT12 => {
518 self.current_message_header().timestamp_delta =
519 self.reader.read_u32::<BigEndian>()?;
520 }
521 ExtendTimestampType::NONE => {}
522 }
523
524 //compute the abs timestamp
525 let cur_format_id = self.current_chunk_info.basic_header.format;
526 if cur_format_id == 1
527 || cur_format_id == 2
528 || (cur_format_id == 3 && self.current_chunk_info.payload.is_empty())
529 {
530 let timestamp = self.current_message_header().timestamp;
531 let timestamp_delta = self.current_message_header().timestamp_delta;
532
533 let (cur_abs_timestamp, is_overflow) = timestamp.overflowing_add(timestamp_delta);
534 if is_overflow {
535 log::warn!(
536 "The current timestamp is overflow, current basic header: {:?}, current message header: {:?}, payload len: {}, abs timestamp: {}",
537 self.current_chunk_info.basic_header,
538 self.current_chunk_info.message_header,
539 self.current_chunk_info.payload.len(),
540 cur_abs_timestamp
541 );
542 }
543 self.current_message_header().timestamp = cur_abs_timestamp;
544 }
545
546 self.chunk_read_state = ChunkReadState::ReadMessagePayload;
547 self.print_current_message_header(ChunkReadState::ReadExtendedTimestamp);
548
549 Ok(UnpackResult::Success)
550 }
551
552 pub fn read_message_payload(&mut self) -> Result<UnpackResult, UnpackError> {
553 let whole_msg_length = self.current_message_header().msg_length as usize;
554 let remaining_bytes = whole_msg_length - self.current_chunk_info.payload.len();
555
556 log::trace!(
557 "read_message_payload whole msg length: {} and remaining bytes need to be read: {}",
558 whole_msg_length,
559 remaining_bytes
560 );
561
562 let mut need_read_length = remaining_bytes;
563 if whole_msg_length > self.max_chunk_size {
564 need_read_length = min(remaining_bytes, self.max_chunk_size);
565 }
566
567 let remaining_mut = self.current_chunk_info.payload.remaining_mut();
568 if need_read_length > remaining_mut {
569 let additional = need_read_length - remaining_mut;
570 self.current_chunk_info.payload.reserve(additional);
571 }
572
573 log::trace!(
574 "read_message_payload buffer len:{}, need_read_length: {}",
575 self.reader.len(),
576 need_read_length
577 );
578
579 let payload_data = self.reader.read_bytes(need_read_length)?;
580 self.current_chunk_info
581 .payload
582 .extend_from_slice(&payload_data[..]);
583
584 log::trace!(
585 "read_message_payload current msg payload len:{}",
586 self.current_chunk_info.payload.len()
587 );
588
589 if self.current_chunk_info.payload.len() == whole_msg_length {
590 self.chunk_read_state = ChunkReadState::Finish;
591 //get the complete chunk and clear the current chunk payload
592 let chunk_info = self.current_chunk_info.clone();
593 self.current_chunk_info.payload.clear();
594
595 let csid = self.current_chunk_info.basic_header.chunk_stream_id;
596 self.chunk_message_headers
597 .insert(csid, self.current_chunk_info.message_header.clone());
598
599 return Ok(UnpackResult::ChunkInfo(chunk_info));
600 }
601
602 self.chunk_read_state = ChunkReadState::ReadBasicHeader;
603
604 Ok(UnpackResult::Success)
605 }
606}
607
608#[cfg(test)]
609mod tests {
610
611 use super::ChunkInfo;
612 use super::ChunkUnpacketizer;
613 use super::UnpackResult;
614 use bytes::BytesMut;
615
616 #[test]
617 fn test_set_chunk_size() {
618 let mut unpacker = ChunkUnpacketizer::new();
619
620 let data: [u8; 16] = [
621 //
622 2, //|format+csid|
623 00, 00, 00, //timestamp
624 00, 00, 4, //msg_length
625 1, //msg_type_id
626 00, 00, 00, 00, //msg_stream_id
627 00, 00, 10, 00, //body
628 ];
629
630 unpacker.extend_data(&data[..]);
631
632 let rv = unpacker.read_chunk();
633
634 let mut body = BytesMut::new();
635 body.extend_from_slice(&[00, 00, 10, 00]);
636
637 let expected = ChunkInfo::new(2, 0, 0, 4, 1, 0, body);
638
639 println!("{:?}, {:?}", expected.basic_header, expected.message_header);
640
641 assert_eq!(
642 rv.unwrap(),
643 UnpackResult::ChunkInfo(expected),
644 "not correct"
645 )
646 }
647
648 #[test]
649 fn test_overflow_add() {
650 let aa: u32 = u32::MAX;
651 println!("{}", aa);
652
653 let (_a, _b) = aa.overflowing_add(5);
654
655 let b = aa.wrapping_add(5);
656
657 println!("{}", b);
658 }
659
660 use std::collections::VecDeque;
661
662 #[test]
663 fn test_unpacketizer2() {
664 let mut queue = VecDeque::new();
665 queue.push_back(2);
666 queue.push_back(3);
667 queue.push_back(4);
668
669 for data in queue.iter() {
670 println!("{}", data);
671 }
672 }
673
674 // #[test]
675 // fn test_window_acknowlage_size_set_peer_bandwidth() {
676 // let mut unpacker = ChunkUnpacketizer::new();
677
678 // let data: [u8; 33] = [
679 // 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
680 // 0x10, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x06, 0x00, 0x00, 0x00, 0x00,
681 // 0x00, 0x00, 0x10, 0x00, 0x02,
682 // ];
683
684 // unpacker.extend_data(&data[..]);
685
686 // let rv = unpacker.read_chunk();
687
688 // let rv2 = unpacker.read_chunk();
689
690 // let mut body = BytesMut::new();
691 // body.extend_from_slice(&[00, 00, 10, 00]);
692
693 // let expected = ChunkInfo::new(2, 0, 0, 4, 1, 0, body);
694
695 // assert_eq!(
696 // rv.unwrap(),
697 // UnpackResult::ChunkInfo(expected),
698 // "not correct"
699 // )
700 // }
701
702 // #[test]
703 // fn test_on_connect() {
704 // // 0000 03 00 00 00 00 00 b1 14 00 00 00 00 02 00 07 63 ...............c
705 // // 0010 6f 6e 6e 65 63 74 00 3f f0 00 00 00 00 00 00 03 onnect.?........
706 // // 0020 00 03 61 70 70 02 00 06 68 61 72 6c 61 6e 00 04 ..app...harlan..
707 // // 0030 74 79 70 65 02 00 0a 6e 6f 6e 70 72 69 76 61 74 type...nonprivat
708 // // 0040 65 00 08 66 6c 61 73 68 56 65 72 02 00 1f 46 4d e..flashVer...FM
709 // // 0050 4c 45 2f 33 2e 30 20 28 63 6f 6d 70 61 74 69 62 LE/3.0 (compatib
710 // // 0060 6c 65 3b 20 46 4d 53 63 2f 31 2e 30 29 00 06 73 le; FMSc/1.0)..s
711 // // 0070 77 66 55 72 6c 02 00 1c 72 74 6d 70 3a 2f 2f 6c wfUrl...rtmp://l
712 // // 0080 6f 63 61 6c 68 6f 73 74 3a 31 39 33 35 2f 68 61 ocalhost:1935/ha
713 // // 0090 72 6c 61 6e 00 05 74 63 55 72 6c 02 00 1c 72 74 rlan..tcUrl...rt
714 // // 00a0 6d 70 3a 2f 2f 6c 6f 63 61 6c 68 6f 73 74 3a 31 mp://localhost:1
715 // // 00b0 39 33 35 2f 68 61 72 6c 61 6e 00 00 09 935/harlan...
716 // // let data: [u8; 189] = [
717 // // 3, //|format+csid|
718 // // 0, 0, 0, //timestamp
719 // // 0, 0, 177, //msg_length
720 // // 20, //msg_type_id 0x14
721 // // 0, 0, 0, 0, //msg_stream_id
722 // // 2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
723 // // 3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
724 // // 2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
725 // // 104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
726 // // 97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
727 // // 102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
728 // // 104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
729 // // 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
730 // // 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
731 // // ];
732
733 // let data: [u8; 189] = [
734 // 0x03,
735 // 0x00, 0x00, 0x00,
736 // 0x00, 0x00, 0xb1,
737 // 0x14,
738 // 0x00, 0x00, 0x00, 0x00,
739 // 0x02, 0x00,
740 // 0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00,
741 // 0x00, 0x00, 0x00, 0x03, 0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x06, 0x68, 0x61,
742 // 0x72, 0x6c, 0x61, 0x6e, 0x00, 0x04, 0x74, 0x79, 0x70, 0x65, 0x02, 0x00, 0x0a, 0x6e,
743 // 0x6f, 0x6e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x00, 0x08, 0x66, 0x6c, 0x61,
744 // 0x73, 0x68, 0x56, 0x65, 0x72, 0x02, 0x00, 0x1f, 0x46, 0x4d, 0x4c, 0x45, 0x2f, 0x33,
745 // 0x2e, 0x30, 0x20, 0x28, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x69, 0x62, 0x6c, 0x65,
746 // 0x3b, 0x20, 0x46, 0x4d, 0x53, 0x63, 0x2f, 0x31, 0x2e, 0x30, 0x29, 0x00, 0x06, 0x73,
747 // 0x77, 0x66, 0x55, 0x72, 0x6c, 0x02, 0x00, 0x1c, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f,
748 // 0x2f, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x31, 0x39, 0x33,
749 // 0x35, 0x2f, 0x68, 0x61, 0x72, 0x6c, 0x61, 0x6e, 0x00, 0x05, 0x74, 0x63, 0x55, 0x72,
750 // 0x6c, 0x02, 0x00, 0x1c, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x6c, 0x6f, 0x63,
751 // 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x31, 0x39, 0x33, 0x35, 0x2f, 0x68, 0x61,
752 // 0x72, 0x6c, 0x61, 0x6e, 0x00, 0x00, 0x09,
753 // ];
754
755 // let mut unpacker = ChunkUnpacketizer::new();
756 // unpacker.extend_data(&data[..]);
757
758 // let rv = unpacker.read_chunk();
759 // match &rv {
760 // Err(err) => {
761 // println!("==={}===", err);
762 // }
763 // _ => {}
764 // }
765
766 // let mut body = BytesMut::new();
767 // body.extend_from_slice(&[
768 // 2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
769 // 3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
770 // 2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
771 // 104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
772 // 97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
773 // 102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
774 // 104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
775 // 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
776 // 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
777 // ]);
778
779 // let expected = ChunkInfo::new(3, 0, 0, 177, 20, 0, body);
780
781 // assert_eq!(
782 // rv.unwrap(),
783 // UnpackResult::ChunkInfo(expected),
784 // "not correct"
785 // )
786 // }
787}