rtmp_rs/protocol/
chunk.rs

1//! RTMP chunk stream codec
2//!
3//! RTMP messages are split into chunks for multiplexing. Each chunk has a header
4//! that identifies the chunk stream and message being sent.
5//!
6//! ```text
7//! Chunk Format:
8//! +-------------+----------------+-------------------+
9//! | Basic Header| Message Header | Chunk Data        |
10//! | (1-3 bytes) | (0,3,7,11 bytes)| (variable)       |
11//! +-------------+----------------+-------------------+
12//!
13//! Basic Header formats:
14//! - 1 byte:  fmt(2) + csid(6)        for csid 2-63
15//! - 2 bytes: fmt(2) + 0 + csid(8)    for csid 64-319
16//! - 3 bytes: fmt(2) + 1 + csid(16)   for csid 64-65599
17//!
18//! Message Header formats (based on fmt):
19//! - Type 0 (11 bytes): timestamp(3) + length(3) + type(1) + stream_id(4)
20//! - Type 1 (7 bytes):  timestamp_delta(3) + length(3) + type(1)
21//! - Type 2 (3 bytes):  timestamp_delta(3)
22//! - Type 3 (0 bytes):  (use previous chunk's values)
23//!
24//! Extended timestamp (4 bytes) is appended when timestamp >= 0xFFFFFF
25//! ```
26//!
27//! Reference: RTMP Specification Section 5.3
28
29use bytes::{Buf, BufMut, Bytes, BytesMut};
30use std::collections::HashMap;
31
32use crate::error::{ProtocolError, Result};
33use crate::protocol::constants::*;
34
35/// A complete RTMP message (reassembled from chunks)
36#[derive(Debug, Clone)]
37pub struct RtmpChunk {
38    /// Chunk stream ID (for multiplexing)
39    pub csid: u32,
40    /// Message timestamp (milliseconds)
41    pub timestamp: u32,
42    /// Message type ID
43    pub message_type: u8,
44    /// Message stream ID
45    pub stream_id: u32,
46    /// Message payload
47    pub payload: Bytes,
48}
49
50/// Per-chunk-stream state for reassembly
51#[derive(Debug, Clone, Default)]
52struct ChunkStreamState {
53    /// Last timestamp (absolute)
54    timestamp: u32,
55    /// Last timestamp delta
56    timestamp_delta: u32,
57    /// Last message length
58    message_length: u32,
59    /// Last message type
60    message_type: u8,
61    /// Last message stream ID
62    stream_id: u32,
63    /// Whether we've received extended timestamp
64    has_extended_timestamp: bool,
65    /// Buffer for partial message reassembly
66    partial_message: BytesMut,
67    /// Expected total length of current message
68    expected_length: u32,
69}
70
71/// Chunk stream decoder
72///
73/// Handles chunk demultiplexing and message reassembly.
74pub struct ChunkDecoder {
75    /// Maximum incoming chunk size
76    chunk_size: u32,
77    /// Per-chunk-stream state
78    streams: HashMap<u32, ChunkStreamState>,
79    /// Maximum message size (sanity limit)
80    max_message_size: u32,
81}
82
83impl ChunkDecoder {
84    /// Create a new decoder with default chunk size
85    pub fn new() -> Self {
86        Self {
87            chunk_size: DEFAULT_CHUNK_SIZE,
88            streams: HashMap::new(),
89            max_message_size: MAX_MESSAGE_SIZE,
90        }
91    }
92
93    /// Set the chunk size (called when receiving SetChunkSize message)
94    pub fn set_chunk_size(&mut self, size: u32) {
95        self.chunk_size = size.min(MAX_CHUNK_SIZE);
96    }
97
98    /// Get current chunk size
99    pub fn chunk_size(&self) -> u32 {
100        self.chunk_size
101    }
102
103    /// Try to decode a complete message from the buffer
104    ///
105    /// Returns Ok(Some(chunk)) if a complete message was decoded,
106    /// Ok(None) if more data is needed, or Err on protocol error.
107    pub fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<RtmpChunk>> {
108        if buf.is_empty() {
109            return Ok(None);
110        }
111
112        // Parse basic header to get csid and fmt (peek only, don't advance)
113        let (fmt, csid, header_len) = match self.parse_basic_header(buf)? {
114            Some(v) => v,
115            None => return Ok(None),
116        };
117
118        tracing::trace!(
119            fmt = fmt,
120            csid = csid,
121            header_len = header_len,
122            first_byte = format!("0x{:02x}", buf[0]),
123            "Parsing chunk"
124        );
125
126        // Get or create chunk stream state
127        let state = self.streams.entry(csid).or_default();
128
129        // Calculate message header size based on fmt
130        let msg_header_size = match fmt {
131            0 => 11,
132            1 => 7,
133            2 => 3,
134            3 => 0,
135            _ => return Err(ProtocolError::InvalidChunkHeader.into()),
136        };
137
138        // Check if we have enough data for headers
139        let needs_extended = if fmt == 3 {
140            state.has_extended_timestamp
141        } else if buf.len() > header_len + 2 {
142            // Peek at timestamp field to check for extended timestamp
143            let ts_bytes = &buf[header_len..header_len + 3];
144            let ts =
145                ((ts_bytes[0] as u32) << 16) | ((ts_bytes[1] as u32) << 8) | (ts_bytes[2] as u32);
146            ts >= EXTENDED_TIMESTAMP_THRESHOLD
147        } else {
148            false
149        };
150
151        let extended_size = if needs_extended { 4 } else { 0 };
152        let total_header_size = header_len + msg_header_size + extended_size;
153
154        if buf.len() < total_header_size {
155            return Ok(None); // Need more header data
156        }
157
158        // PEEK at message header to determine chunk data length BEFORE consuming anything
159        // For fmt 3, we use state values; for others, we peek at the buffer
160        let (_peeked_message_length, peeked_expected_length) = match fmt {
161            0 | 1 => {
162                // Message length is at offset: header_len + 3 (timestamp bytes)
163                let len_offset = header_len + 3;
164                let len_bytes = &buf[len_offset..len_offset + 3];
165                let len = ((len_bytes[0] as u32) << 16)
166                    | ((len_bytes[1] as u32) << 8)
167                    | (len_bytes[2] as u32);
168                (len, len)
169            }
170            2 | 3 => {
171                // Use state for message length
172                let msg_len = state.message_length;
173                let expected = if state.partial_message.is_empty() {
174                    msg_len
175                } else {
176                    state.expected_length
177                };
178                (msg_len, expected)
179            }
180            _ => unreachable!(),
181        };
182
183        // Calculate chunk data length
184        let partial_len = state.partial_message.len() as u32;
185        let remaining = peeked_expected_length.saturating_sub(partial_len);
186        let chunk_data_len = remaining.min(self.chunk_size) as usize;
187
188        // Now check if we have enough data for header + payload
189        let total_chunk_size = total_header_size + chunk_data_len;
190        if buf.len() < total_chunk_size {
191            return Ok(None); // Need more data - don't consume anything
192        }
193
194        // NOW we can safely consume the data since we have enough for the entire chunk
195        buf.advance(header_len);
196
197        let (timestamp_field, message_length, message_type, stream_id) = match fmt {
198            0 => {
199                // Full header
200                let ts = buf.get_uint(3) as u32;
201                let len = buf.get_uint(3) as u32;
202                let typ = buf.get_u8();
203                let sid = buf.get_u32_le(); // Stream ID is little-endian!
204                (ts, len, typ, sid)
205            }
206            1 => {
207                // No stream ID
208                let ts = buf.get_uint(3) as u32;
209                let len = buf.get_uint(3) as u32;
210                let typ = buf.get_u8();
211                (ts, len, typ, state.stream_id)
212            }
213            2 => {
214                // Timestamp delta only
215                let ts = buf.get_uint(3) as u32;
216                (
217                    ts,
218                    state.message_length,
219                    state.message_type,
220                    state.stream_id,
221                )
222            }
223            3 => {
224                // No header, use previous values
225                (
226                    state.timestamp_delta,
227                    state.message_length,
228                    state.message_type,
229                    state.stream_id,
230                )
231            }
232            _ => unreachable!(),
233        };
234
235        // Handle extended timestamp (we already checked we have enough bytes)
236        let timestamp = if timestamp_field >= EXTENDED_TIMESTAMP_THRESHOLD
237            || (fmt == 3 && state.has_extended_timestamp)
238        {
239            state.has_extended_timestamp = true;
240            buf.get_u32()
241        } else {
242            state.has_extended_timestamp = false;
243            timestamp_field
244        };
245
246        // Update state
247        let absolute_timestamp = if fmt == 0 {
248            timestamp
249        } else if fmt == 3 && !state.partial_message.is_empty() {
250            // continuation chunk, timestamp stays the same
251            state.timestamp
252        } else {
253            state.timestamp.wrapping_add(timestamp)
254        };
255
256        state.timestamp_delta = timestamp;
257        state.message_length = message_length;
258        state.message_type = message_type;
259        state.stream_id = stream_id;
260        state.timestamp = absolute_timestamp;
261
262        // Validate message size
263        if message_length > self.max_message_size {
264            return Err(ProtocolError::MessageTooLarge {
265                size: message_length,
266                max: self.max_message_size,
267            }
268            .into());
269        }
270
271        // Initialize reassembly buffer if this is a new message
272        if state.partial_message.is_empty() {
273            state.expected_length = message_length;
274            state.partial_message.reserve(message_length as usize);
275        }
276
277        // Read chunk data (we already verified we have enough)
278        state.partial_message.put_slice(&buf[..chunk_data_len]);
279        buf.advance(chunk_data_len);
280
281        // Check if message is complete
282        if state.partial_message.len() as u32 >= state.expected_length {
283            let payload = state.partial_message.split().freeze();
284            state.expected_length = 0;
285
286            Ok(Some(RtmpChunk {
287                csid,
288                timestamp: state.timestamp,
289                message_type: state.message_type,
290                stream_id: state.stream_id,
291                payload,
292            }))
293        } else {
294            Ok(None) // Message not yet complete
295        }
296    }
297
298    /// Parse basic header and return (fmt, csid, header_length)
299    fn parse_basic_header(&self, buf: &[u8]) -> Result<Option<(u8, u32, usize)>> {
300        if buf.is_empty() {
301            return Ok(None);
302        }
303
304        let first = buf[0];
305        let fmt = (first >> 6) & 0x03;
306        let csid_low = first & 0x3F;
307
308        match csid_low {
309            0 => {
310                // 2-byte header: csid = 64 + second byte
311                if buf.len() < 2 {
312                    return Ok(None);
313                }
314                let csid = 64 + buf[1] as u32;
315                Ok(Some((fmt, csid, 2)))
316            }
317            1 => {
318                // 3-byte header: csid = 64 + second + third*256
319                if buf.len() < 3 {
320                    return Ok(None);
321                }
322                let csid = 64 + buf[1] as u32 + (buf[2] as u32) * 256;
323                Ok(Some((fmt, csid, 3)))
324            }
325            _ => {
326                // 1-byte header: csid = 2-63
327                Ok(Some((fmt, csid_low as u32, 1)))
328            }
329        }
330    }
331
332    /// Abort a message on a chunk stream (when receiving Abort message)
333    pub fn abort(&mut self, csid: u32) {
334        if let Some(state) = self.streams.get_mut(&csid) {
335            state.partial_message.clear();
336            state.expected_length = 0;
337        }
338    }
339}
340
341impl Default for ChunkDecoder {
342    fn default() -> Self {
343        Self::new()
344    }
345}
346
347/// Chunk stream encoder
348///
349/// Encodes messages into chunks for transmission.
350pub struct ChunkEncoder {
351    /// Outgoing chunk size
352    chunk_size: u32,
353    /// Per-chunk-stream state for compression
354    streams: HashMap<u32, ChunkStreamState>,
355}
356
357impl ChunkEncoder {
358    /// Create a new encoder with default chunk size
359    pub fn new() -> Self {
360        Self {
361            chunk_size: DEFAULT_CHUNK_SIZE,
362            streams: HashMap::new(),
363        }
364    }
365
366    /// Set the chunk size (call before encoding to use larger chunks)
367    pub fn set_chunk_size(&mut self, size: u32) {
368        self.chunk_size = size.min(MAX_CHUNK_SIZE);
369    }
370
371    /// Get current chunk size
372    pub fn chunk_size(&self) -> u32 {
373        self.chunk_size
374    }
375
376    /// Encode a message into chunks
377    pub fn encode(&mut self, chunk: &RtmpChunk, buf: &mut BytesMut) {
378        let csid = chunk.csid;
379        let chunk_size = self.chunk_size;
380
381        // Get or create state, and compute format based on current state
382        let state = self.streams.entry(csid).or_default();
383
384        // Compute format based on state comparison
385        let fmt = select_format(chunk, state);
386
387        // Determine if we need extended timestamp
388        let needs_extended = chunk.timestamp >= EXTENDED_TIMESTAMP_THRESHOLD;
389        let timestamp_field = if needs_extended {
390            EXTENDED_TIMESTAMP_THRESHOLD
391        } else {
392            chunk.timestamp
393        };
394
395        let timestamp_delta = chunk.timestamp.wrapping_sub(state.timestamp);
396        let delta_field = if needs_extended {
397            EXTENDED_TIMESTAMP_THRESHOLD
398        } else {
399            timestamp_delta
400        };
401
402        let had_extended_timestamp = state.has_extended_timestamp;
403
404        // Update state before encoding
405        state.timestamp = chunk.timestamp;
406        state.timestamp_delta = timestamp_delta;
407        state.message_length = chunk.payload.len() as u32;
408        state.message_type = chunk.message_type;
409        state.stream_id = chunk.stream_id;
410        state.has_extended_timestamp = needs_extended;
411
412        // Encode chunks
413        let mut offset = 0;
414        let payload_len = chunk.payload.len();
415        let mut first_chunk = true;
416
417        while offset < payload_len {
418            let chunk_data_len = (payload_len - offset).min(chunk_size as usize);
419
420            // Write basic header
421            write_basic_header(csid, if first_chunk { fmt } else { 3 }, buf);
422
423            // Write message header based on format
424            if first_chunk {
425                match fmt {
426                    0 => {
427                        // Full header
428                        write_u24(timestamp_field, buf);
429                        write_u24(payload_len as u32, buf);
430                        buf.put_u8(chunk.message_type);
431                        buf.put_u32_le(chunk.stream_id);
432                    }
433                    1 => {
434                        // No stream ID
435                        write_u24(delta_field, buf);
436                        write_u24(payload_len as u32, buf);
437                        buf.put_u8(chunk.message_type);
438                    }
439                    2 => {
440                        // Timestamp delta only
441                        write_u24(delta_field, buf);
442                    }
443                    3 => {
444                        // No header
445                    }
446                    _ => unreachable!(),
447                }
448            }
449
450            // Write extended timestamp if needed
451            if needs_extended && (first_chunk || had_extended_timestamp) {
452                buf.put_u32(chunk.timestamp);
453            }
454
455            // Write chunk data
456            buf.put_slice(&chunk.payload[offset..offset + chunk_data_len]);
457            offset += chunk_data_len;
458            first_chunk = false;
459        }
460    }
461}
462
463/// Select the best header format for compression
464fn select_format(chunk: &RtmpChunk, state: &ChunkStreamState) -> u8 {
465    // First message on this stream must use format 0
466    if state.message_type == 0 && state.stream_id == 0 {
467        return 0;
468    }
469
470    // If stream ID differs, must use format 0
471    if chunk.stream_id != state.stream_id {
472        return 0;
473    }
474
475    // If message type or length differs, use format 1
476    if chunk.message_type != state.message_type
477        || chunk.payload.len() as u32 != state.message_length
478    {
479        return 1;
480    }
481
482    // If timestamp delta matches previous, use format 3
483    let delta = chunk.timestamp.wrapping_sub(state.timestamp);
484    if delta == state.timestamp_delta {
485        return 3;
486    }
487
488    // Otherwise use format 2 (timestamp delta only)
489    2
490}
491
492/// Write basic header
493fn write_basic_header(csid: u32, fmt: u8, buf: &mut BytesMut) {
494    if csid >= 64 + 256 {
495        // 3-byte header
496        buf.put_u8((fmt << 6) | 1);
497        let csid_offset = csid - 64;
498        buf.put_u8((csid_offset & 0xFF) as u8);
499        buf.put_u8(((csid_offset >> 8) & 0xFF) as u8);
500    } else if csid >= 64 {
501        // 2-byte header
502        buf.put_u8((fmt << 6) | 0);
503        buf.put_u8((csid - 64) as u8);
504    } else {
505        // 1-byte header
506        buf.put_u8((fmt << 6) | (csid as u8));
507    }
508}
509
510/// Write 24-bit big-endian value
511fn write_u24(value: u32, buf: &mut BytesMut) {
512    buf.put_u8(((value >> 16) & 0xFF) as u8);
513    buf.put_u8(((value >> 8) & 0xFF) as u8);
514    buf.put_u8((value & 0xFF) as u8);
515}
516
517impl Default for ChunkEncoder {
518    fn default() -> Self {
519        Self::new()
520    }
521}
522
523#[cfg(test)]
524mod tests {
525    use super::*;
526
527    #[test]
528    fn test_basic_header_parsing() {
529        let decoder = ChunkDecoder::new();
530
531        // 1-byte header (csid 2-63)
532        let buf = [0x03]; // fmt=0, csid=3
533        let result = decoder.parse_basic_header(&buf).unwrap().unwrap();
534        assert_eq!(result, (0, 3, 1));
535
536        // 2-byte header (csid 64-319)
537        let buf = [0x00, 0x00]; // fmt=0, csid=64
538        let result = decoder.parse_basic_header(&buf).unwrap().unwrap();
539        assert_eq!(result, (0, 64, 2));
540
541        // 3-byte header (csid 64-65599)
542        let buf = [0x01, 0x00, 0x01]; // fmt=0, csid=64+256
543        let result = decoder.parse_basic_header(&buf).unwrap().unwrap();
544        assert_eq!(result, (0, 320, 3));
545    }
546
547    #[test]
548    fn test_encode_decode_roundtrip() {
549        let original = RtmpChunk {
550            csid: CSID_COMMAND,
551            timestamp: 1000,
552            message_type: MSG_COMMAND_AMF0,
553            stream_id: 0,
554            payload: Bytes::from_static(b"test payload data"),
555        };
556
557        let mut encoder = ChunkEncoder::new();
558        let mut decoder = ChunkDecoder::new();
559
560        let mut encoded = BytesMut::new();
561        encoder.encode(&original, &mut encoded);
562
563        let decoded = decoder.decode(&mut encoded).unwrap().unwrap();
564
565        assert_eq!(decoded.csid, original.csid);
566        assert_eq!(decoded.timestamp, original.timestamp);
567        assert_eq!(decoded.message_type, original.message_type);
568        assert_eq!(decoded.stream_id, original.stream_id);
569        assert_eq!(decoded.payload, original.payload);
570    }
571
572    #[test]
573    fn test_large_message_chunking() {
574        let large_payload = vec![0u8; 500]; // Larger than default chunk size (128)
575
576        let original = RtmpChunk {
577            csid: CSID_VIDEO,
578            timestamp: 0,
579            message_type: MSG_VIDEO,
580            stream_id: 1,
581            payload: Bytes::from(large_payload.clone()),
582        };
583
584        let mut encoder = ChunkEncoder::new();
585        let mut decoder = ChunkDecoder::new();
586
587        let mut encoded = BytesMut::new();
588        encoder.encode(&original, &mut encoded);
589
590        // Should produce multiple chunks (500 bytes / 128 = ~4 chunks)
591        assert!(encoded.len() > 500);
592
593        // Decode all chunks until we get a complete message
594        let decoded = loop {
595            if let Some(chunk) = decoder.decode(&mut encoded).unwrap() {
596                break chunk;
597            }
598        };
599        assert_eq!(decoded.payload.len(), 500);
600    }
601
602    #[test]
603    fn test_chunk_size_configuration() {
604        let mut encoder = ChunkEncoder::new();
605        let mut decoder = ChunkDecoder::new();
606
607        // Check default chunk size
608        assert_eq!(encoder.chunk_size(), DEFAULT_CHUNK_SIZE);
609        assert_eq!(decoder.chunk_size(), DEFAULT_CHUNK_SIZE);
610
611        // Set new chunk size
612        encoder.set_chunk_size(4096);
613        decoder.set_chunk_size(4096);
614
615        assert_eq!(encoder.chunk_size(), 4096);
616        assert_eq!(decoder.chunk_size(), 4096);
617    }
618
619    #[test]
620    fn test_chunk_size_capped_at_max() {
621        let mut encoder = ChunkEncoder::new();
622        let mut decoder = ChunkDecoder::new();
623
624        // Try to set chunk size larger than MAX
625        encoder.set_chunk_size(u32::MAX);
626        decoder.set_chunk_size(u32::MAX);
627
628        assert_eq!(encoder.chunk_size(), MAX_CHUNK_SIZE);
629        assert_eq!(decoder.chunk_size(), MAX_CHUNK_SIZE);
630    }
631
632    #[test]
633    fn test_different_chunk_stream_ids() {
634        let mut encoder = ChunkEncoder::new();
635        let mut decoder = ChunkDecoder::new();
636
637        // Test various CSIDs
638        let csids = [2, 3, 63, 64, 100, 319, 320, 1000];
639
640        for &csid in &csids {
641            let chunk = RtmpChunk {
642                csid,
643                timestamp: 1000,
644                message_type: MSG_COMMAND_AMF0,
645                stream_id: 0,
646                payload: Bytes::from_static(b"test"),
647            };
648
649            let mut encoded = BytesMut::new();
650            encoder.encode(&chunk, &mut encoded);
651
652            let decoded = decoder.decode(&mut encoded).unwrap().unwrap();
653            assert_eq!(decoded.csid, csid);
654        }
655    }
656
657    #[test]
658    fn test_extended_timestamp() {
659        let mut encoder = ChunkEncoder::new();
660        let mut decoder = ChunkDecoder::new();
661
662        // Create a chunk with timestamp >= EXTENDED_TIMESTAMP_THRESHOLD
663        let chunk = RtmpChunk {
664            csid: CSID_COMMAND,
665            timestamp: 0xFFFFFF + 1000, // Requires extended timestamp
666            message_type: MSG_COMMAND_AMF0,
667            stream_id: 0,
668            payload: Bytes::from_static(b"extended timestamp test"),
669        };
670
671        let mut encoded = BytesMut::new();
672        encoder.encode(&chunk, &mut encoded);
673
674        let decoded = decoder.decode(&mut encoded).unwrap().unwrap();
675        assert_eq!(decoded.timestamp, 0xFFFFFF + 1000);
676    }
677
678    #[test]
679    fn test_extended_timestamp_boundary() {
680        let mut encoder = ChunkEncoder::new();
681        let mut decoder = ChunkDecoder::new();
682
683        // Test exactly at the threshold
684        let chunk = RtmpChunk {
685            csid: CSID_COMMAND,
686            timestamp: EXTENDED_TIMESTAMP_THRESHOLD,
687            message_type: MSG_COMMAND_AMF0,
688            stream_id: 0,
689            payload: Bytes::from_static(b"boundary test"),
690        };
691
692        let mut encoded = BytesMut::new();
693        encoder.encode(&chunk, &mut encoded);
694
695        let decoded = decoder.decode(&mut encoded).unwrap().unwrap();
696        assert_eq!(decoded.timestamp, EXTENDED_TIMESTAMP_THRESHOLD);
697    }
698
699    #[test]
700    fn test_zero_timestamp() {
701        let mut encoder = ChunkEncoder::new();
702        let mut decoder = ChunkDecoder::new();
703
704        let chunk = RtmpChunk {
705            csid: CSID_VIDEO,
706            timestamp: 0,
707            message_type: MSG_VIDEO,
708            stream_id: 1,
709            payload: Bytes::from_static(b"zero ts"),
710        };
711
712        let mut encoded = BytesMut::new();
713        encoder.encode(&chunk, &mut encoded);
714
715        let decoded = decoder.decode(&mut encoded).unwrap().unwrap();
716        assert_eq!(decoded.timestamp, 0);
717    }
718
719    #[test]
720    fn test_abort_clears_partial_message() {
721        let mut decoder = ChunkDecoder::new();
722
723        // Simulate receiving part of a large message
724        let large_chunk = RtmpChunk {
725            csid: CSID_VIDEO,
726            timestamp: 0,
727            message_type: MSG_VIDEO,
728            stream_id: 1,
729            payload: Bytes::from(vec![0u8; 500]),
730        };
731
732        let mut encoder = ChunkEncoder::new();
733        let mut encoded = BytesMut::new();
734        encoder.encode(&large_chunk, &mut encoded);
735
736        // Take only the first 200 bytes (partial message)
737        let mut partial = encoded.split_to(200);
738
739        // Try to decode - should return None (incomplete)
740        let result = decoder.decode(&mut partial).unwrap();
741        assert!(result.is_none());
742
743        // Abort the message on this CSID
744        decoder.abort(CSID_VIDEO);
745
746        // Now send a new complete small message on the same CSID
747        let small_chunk = RtmpChunk {
748            csid: CSID_VIDEO,
749            timestamp: 100,
750            message_type: MSG_VIDEO,
751            stream_id: 1,
752            payload: Bytes::from_static(b"new message"),
753        };
754
755        let mut new_encoded = BytesMut::new();
756        encoder.encode(&small_chunk, &mut new_encoded);
757
758        let decoded = decoder.decode(&mut new_encoded).unwrap().unwrap();
759        assert_eq!(decoded.payload.as_ref(), b"new message");
760    }
761
762    #[test]
763    fn test_decode_empty_buffer() {
764        let mut decoder = ChunkDecoder::new();
765        let mut buf = BytesMut::new();
766
767        let result = decoder.decode(&mut buf).unwrap();
768        assert!(result.is_none());
769    }
770
771    #[test]
772    fn test_multiple_messages_same_csid() {
773        let mut encoder = ChunkEncoder::new();
774        let mut decoder = ChunkDecoder::new();
775
776        // First message
777        let chunk1 = RtmpChunk {
778            csid: CSID_COMMAND,
779            timestamp: 0,
780            message_type: MSG_COMMAND_AMF0,
781            stream_id: 0,
782            payload: Bytes::from_static(b"first"),
783        };
784
785        // Second message (same CSID, uses header compression)
786        let chunk2 = RtmpChunk {
787            csid: CSID_COMMAND,
788            timestamp: 100,
789            message_type: MSG_COMMAND_AMF0,
790            stream_id: 0,
791            payload: Bytes::from_static(b"second"),
792        };
793
794        let mut encoded = BytesMut::new();
795        encoder.encode(&chunk1, &mut encoded);
796        encoder.encode(&chunk2, &mut encoded);
797
798        let decoded1 = decoder.decode(&mut encoded).unwrap().unwrap();
799        assert_eq!(decoded1.payload.as_ref(), b"first");
800        assert_eq!(decoded1.timestamp, 0);
801
802        let decoded2 = decoder.decode(&mut encoded).unwrap().unwrap();
803        assert_eq!(decoded2.payload.as_ref(), b"second");
804        assert_eq!(decoded2.timestamp, 100);
805    }
806
807    #[test]
808    fn test_interleaved_chunk_streams() {
809        let mut encoder = ChunkEncoder::new();
810        let mut decoder = ChunkDecoder::new();
811
812        // Interleave chunks from different streams
813        let video_chunk = RtmpChunk {
814            csid: CSID_VIDEO,
815            timestamp: 0,
816            message_type: MSG_VIDEO,
817            stream_id: 1,
818            payload: Bytes::from_static(b"video"),
819        };
820
821        let audio_chunk = RtmpChunk {
822            csid: CSID_AUDIO,
823            timestamp: 0,
824            message_type: MSG_AUDIO,
825            stream_id: 1,
826            payload: Bytes::from_static(b"audio"),
827        };
828
829        let command_chunk = RtmpChunk {
830            csid: CSID_COMMAND,
831            timestamp: 0,
832            message_type: MSG_COMMAND_AMF0,
833            stream_id: 0,
834            payload: Bytes::from_static(b"command"),
835        };
836
837        let mut encoded = BytesMut::new();
838        encoder.encode(&video_chunk, &mut encoded);
839        encoder.encode(&audio_chunk, &mut encoded);
840        encoder.encode(&command_chunk, &mut encoded);
841
842        let decoded_video = decoder.decode(&mut encoded).unwrap().unwrap();
843        assert_eq!(decoded_video.csid, CSID_VIDEO);
844        assert_eq!(decoded_video.message_type, MSG_VIDEO);
845
846        let decoded_audio = decoder.decode(&mut encoded).unwrap().unwrap();
847        assert_eq!(decoded_audio.csid, CSID_AUDIO);
848        assert_eq!(decoded_audio.message_type, MSG_AUDIO);
849
850        let decoded_cmd = decoder.decode(&mut encoded).unwrap().unwrap();
851        assert_eq!(decoded_cmd.csid, CSID_COMMAND);
852        assert_eq!(decoded_cmd.message_type, MSG_COMMAND_AMF0);
853    }
854
855    #[test]
856    fn test_message_too_large_error() {
857        let mut decoder = ChunkDecoder::new();
858        decoder.max_message_size = 100; // Set a small limit
859
860        // Create an encoded chunk with message length > 100
861        let mut encoder = ChunkEncoder::new();
862        let chunk = RtmpChunk {
863            csid: CSID_VIDEO,
864            timestamp: 0,
865            message_type: MSG_VIDEO,
866            stream_id: 1,
867            payload: Bytes::from(vec![0u8; 200]),
868        };
869
870        let mut encoded = BytesMut::new();
871        encoder.encode(&chunk, &mut encoded);
872
873        // Decoding should fail with MessageTooLarge
874        let result = decoder.decode(&mut encoded);
875        assert!(result.is_err());
876        if let Err(e) = result {
877            assert!(e.to_string().contains("too large"));
878        }
879    }
880
881    #[test]
882    fn test_header_format_selection() {
883        let mut encoder = ChunkEncoder::new();
884        let mut buf = BytesMut::new();
885
886        // First message uses fmt 0 (full header)
887        let chunk1 = RtmpChunk {
888            csid: CSID_COMMAND,
889            timestamp: 0,
890            message_type: MSG_COMMAND_AMF0,
891            stream_id: 0,
892            payload: Bytes::from_static(b"test"),
893        };
894        encoder.encode(&chunk1, &mut buf);
895
896        let first_byte = buf[0];
897        let fmt = (first_byte >> 6) & 0x03;
898        assert_eq!(fmt, 0, "First message should use fmt 0");
899
900        buf.clear();
901
902        // Same stream_id, same type, same length - should use compressed header
903        let chunk2 = RtmpChunk {
904            csid: CSID_COMMAND,
905            timestamp: 100,
906            message_type: MSG_COMMAND_AMF0,
907            stream_id: 0,
908            payload: Bytes::from_static(b"test"), // Same length
909        };
910        encoder.encode(&chunk2, &mut buf);
911
912        let first_byte = buf[0];
913        let fmt = (first_byte >> 6) & 0x03;
914        // Should use fmt 2 (timestamp delta only) or fmt 3 (if delta matches)
915        assert!(fmt >= 2, "Subsequent message should use compressed header");
916    }
917
918    #[test]
919    fn test_timestamp_delta_wrapping() {
920        let mut encoder = ChunkEncoder::new();
921        let mut decoder = ChunkDecoder::new();
922
923        // Start near the timestamp wrap point
924        let chunk1 = RtmpChunk {
925            csid: CSID_VIDEO,
926            timestamp: 0xFFFFFFF0,
927            message_type: MSG_VIDEO,
928            stream_id: 1,
929            payload: Bytes::from_static(b"near wrap"),
930        };
931
932        // Timestamp wraps around
933        let chunk2 = RtmpChunk {
934            csid: CSID_VIDEO,
935            timestamp: 0x00000010, // Wrapped around
936            message_type: MSG_VIDEO,
937            stream_id: 1,
938            payload: Bytes::from_static(b"after wrap"),
939        };
940
941        let mut encoded = BytesMut::new();
942        encoder.encode(&chunk1, &mut encoded);
943        encoder.encode(&chunk2, &mut encoded);
944
945        let decoded1 = decoder.decode(&mut encoded).unwrap().unwrap();
946        assert_eq!(decoded1.timestamp, 0xFFFFFFF0);
947
948        // The second timestamp calculation involves wrapping arithmetic
949        let decoded2 = decoder.decode(&mut encoded).unwrap().unwrap();
950        // Just verify it decoded without error; exact value depends on implementation
951        assert!(decoded2.timestamp != 0xFFFFFFF0);
952    }
953
954    #[test]
955    fn test_custom_chunk_size_encoding_decoding() {
956        let mut encoder = ChunkEncoder::new();
957        let mut decoder = ChunkDecoder::new();
958
959        // Use a larger chunk size to reduce fragmentation
960        encoder.set_chunk_size(1024);
961        decoder.set_chunk_size(1024);
962
963        let payload = vec![0u8; 2000]; // Would be 16 chunks at 128, but only 2 at 1024
964        let chunk = RtmpChunk {
965            csid: CSID_VIDEO,
966            timestamp: 0,
967            message_type: MSG_VIDEO,
968            stream_id: 1,
969            payload: Bytes::from(payload),
970        };
971
972        let mut encoded = BytesMut::new();
973        encoder.encode(&chunk, &mut encoded);
974
975        // Should be smaller than with default chunk size due to fewer headers
976        let decoded = loop {
977            if let Some(c) = decoder.decode(&mut encoded).unwrap() {
978                break c;
979            }
980        };
981        assert_eq!(decoded.payload.len(), 2000);
982    }
983
984    #[test]
985    fn test_incremental_decoding() {
986        let mut encoder = ChunkEncoder::new();
987        let mut decoder = ChunkDecoder::new();
988
989        let chunk = RtmpChunk {
990            csid: CSID_COMMAND,
991            timestamp: 500,
992            message_type: MSG_COMMAND_AMF0,
993            stream_id: 0,
994            payload: Bytes::from_static(b"test payload for incremental decode"),
995        };
996
997        let mut encoded = BytesMut::new();
998        encoder.encode(&chunk, &mut encoded);
999
1000        // Feed one byte at a time to test incremental decoding
1001        let mut partial = BytesMut::new();
1002        let mut decoded_chunk = None;
1003
1004        for byte in encoded.iter() {
1005            partial.put_u8(*byte);
1006            if let Some(c) = decoder.decode(&mut partial).unwrap() {
1007                decoded_chunk = Some(c);
1008                break;
1009            }
1010        }
1011
1012        assert!(decoded_chunk.is_some());
1013        let decoded = decoded_chunk.unwrap();
1014        assert_eq!(decoded.timestamp, 500);
1015        assert_eq!(
1016            decoded.payload.as_ref(),
1017            b"test payload for incremental decode"
1018        );
1019    }
1020}