lnmp_codec/binary/
streaming.rs

1//! Streaming Frame Layer (SFL) for LNMP v0.5
2//!
3//! This module provides chunked transmission support for large LNMP payloads,
4//! enabling streaming with backpressure control and integrity validation.
5
6use super::error::BinaryError;
7
/// Tunable parameters for the streaming layer.
#[derive(Clone, Debug, PartialEq)]
pub struct StreamingConfig {
    /// Chunk size in bytes; `4096` unless overridden.
    pub chunk_size: usize,
    /// Flag requesting payload compression; `false` unless overridden.
    pub enable_compression: bool,
    /// Flag requesting checksum handling for chunks; `true` unless overridden.
    pub enable_checksums: bool,
}

impl StreamingConfig {
    /// Returns a configuration holding the default settings
    /// (4 KiB chunks, compression off, checksums on).
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns this configuration with `chunk_size` replaced by `size`.
    pub fn with_chunk_size(self, size: usize) -> Self {
        Self {
            chunk_size: size,
            ..self
        }
    }

    /// Returns this configuration with the compression flag set to `enabled`.
    pub fn with_compression(self, enabled: bool) -> Self {
        Self {
            enable_compression: enabled,
            ..self
        }
    }

    /// Returns this configuration with the checksum flag set to `enabled`.
    pub fn with_checksums(self, enabled: bool) -> Self {
        Self {
            enable_checksums: enabled,
            ..self
        }
    }
}

impl Default for StreamingConfig {
    /// Same values as [`StreamingConfig::new`].
    fn default() -> Self {
        Self {
            chunk_size: 4096, // 4KB default
            enable_compression: false,
            enable_checksums: true,
        }
    }
}
53
/// Lifecycle of a stream, tracked by both the encoder and the decoder.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StreamingState {
    /// No stream has been opened.
    Idle,
    /// A stream is open and chunks may be exchanged.
    Streaming {
        /// Running total of payload bytes handled in this stream.
        bytes_sent: usize,
        /// Running count of chunks handled in this stream.
        chunks_sent: usize,
    },
    /// The stream was closed normally.
    Complete,
    /// The stream was aborted; carries the error message.
    Error(String),
}
71
72/// Streaming encoder for chunked transmission
73pub struct StreamingEncoder {
74    config: StreamingConfig,
75    state: StreamingState,
76}
77
78impl StreamingEncoder {
79    /// Creates a new StreamingEncoder with default configuration
80    pub fn new() -> Self {
81        Self {
82            config: StreamingConfig::new(),
83            state: StreamingState::Idle,
84        }
85    }
86
87    /// Creates a new StreamingEncoder with custom configuration
88    pub fn with_config(config: StreamingConfig) -> Self {
89        Self {
90            config,
91            state: StreamingState::Idle,
92        }
93    }
94
95    /// Emits a BEGIN frame to start the stream
96    pub fn begin_stream(&mut self) -> Result<Vec<u8>, StreamingError> {
97        match &self.state {
98            StreamingState::Idle | StreamingState::Complete | StreamingState::Error(_) => {
99                self.state = StreamingState::Streaming {
100                    bytes_sent: 0,
101                    chunks_sent: 0,
102                };
103                let frame = StreamingFrame::begin();
104                self.encode_frame(&frame)
105            }
106            StreamingState::Streaming { .. } => Err(StreamingError::UnexpectedFrame {
107                expected: FrameType::Chunk,
108                found: FrameType::Begin,
109            }),
110        }
111    }
112
113    /// Segments data and emits CHUNK frames
114    pub fn write_chunk(&mut self, data: &[u8]) -> Result<Vec<u8>, StreamingError> {
115        match &self.state {
116            StreamingState::Streaming {
117                bytes_sent,
118                chunks_sent,
119            } => {
120                if data.len() > self.config.chunk_size {
121                    return Err(StreamingError::ChunkSizeExceeded {
122                        size: data.len(),
123                        max: self.config.chunk_size,
124                    });
125                }
126
127                let has_more = false; // Caller determines if more chunks follow
128                let frame = StreamingFrame::chunk(data.to_vec(), has_more);
129
130                // Update state
131                self.state = StreamingState::Streaming {
132                    bytes_sent: bytes_sent + data.len(),
133                    chunks_sent: chunks_sent + 1,
134                };
135
136                self.encode_frame(&frame)
137            }
138            StreamingState::Idle => Err(StreamingError::StreamNotStarted),
139            StreamingState::Complete => Err(StreamingError::StreamAlreadyComplete),
140            StreamingState::Error(_msg) => Err(StreamingError::UnexpectedFrame {
141                expected: FrameType::Chunk,
142                found: FrameType::Error,
143            }),
144        }
145    }
146
147    /// Emits an END frame to complete the stream
148    pub fn end_stream(&mut self) -> Result<Vec<u8>, StreamingError> {
149        match &self.state {
150            StreamingState::Streaming { .. } => {
151                self.state = StreamingState::Complete;
152                let frame = StreamingFrame::end();
153                self.encode_frame(&frame)
154            }
155            StreamingState::Idle => Err(StreamingError::StreamNotStarted),
156            StreamingState::Complete => Err(StreamingError::StreamAlreadyComplete),
157            StreamingState::Error(_) => Err(StreamingError::UnexpectedFrame {
158                expected: FrameType::End,
159                found: FrameType::Error,
160            }),
161        }
162    }
163
164    /// Emits an ERROR frame
165    pub fn error_frame(&mut self, error: &str) -> Result<Vec<u8>, StreamingError> {
166        self.state = StreamingState::Error(error.to_string());
167        let frame = StreamingFrame::error(error.to_string());
168        self.encode_frame(&frame)
169    }
170
171    /// Returns the current streaming state
172    pub fn state(&self) -> &StreamingState {
173        &self.state
174    }
175
176    /// Encodes a frame to bytes
177    fn encode_frame(&self, frame: &StreamingFrame) -> Result<Vec<u8>, StreamingError> {
178        let mut bytes = Vec::new();
179
180        // FRAME_ID (1 byte)
181        bytes.push(frame.frame_type.to_u8());
182
183        // FLAGS (1 byte)
184        bytes.push(frame.flags.to_u8());
185
186        // CHUNK_SIZE (VarInt)
187        bytes.extend(super::varint::encode(frame.chunk_size as i64));
188
189        // CHECKSUM (4 bytes) - only for CHUNK frames with checksums enabled
190        if frame.frame_type == FrameType::Chunk && self.config.enable_checksums {
191            bytes.extend(&frame.checksum.to_le_bytes());
192        } else {
193            bytes.extend(&[0u8; 4]);
194        }
195
196        // PAYLOAD (variable)
197        bytes.extend(&frame.payload);
198
199        Ok(bytes)
200    }
201}
202
203impl Default for StreamingEncoder {
204    fn default() -> Self {
205        Self::new()
206    }
207}
208
/// Outcome reported by the streaming decoder for each frame it accepts.
#[derive(Clone, Debug, PartialEq)]
pub enum StreamingEvent {
    /// A BEGIN frame opened a new stream.
    StreamStarted,
    /// A CHUNK frame was accepted.
    ChunkReceived {
        /// Payload length of the accepted chunk, in bytes.
        bytes: usize,
    },
    /// An END frame closed the stream.
    StreamComplete {
        /// Sum of the payload lengths of all chunks in the stream.
        total_bytes: usize,
    },
    /// An ERROR frame aborted the stream.
    StreamError {
        /// Message carried by the ERROR frame.
        message: String,
    },
}
230
231/// Streaming decoder for receiving chunked transmissions
232pub struct StreamingDecoder {
233    config: StreamingConfig,
234    state: StreamingState,
235    buffer: Vec<u8>,
236}
237
238impl StreamingDecoder {
239    /// Creates a new StreamingDecoder with default configuration
240    pub fn new() -> Self {
241        Self {
242            config: StreamingConfig::new(),
243            state: StreamingState::Idle,
244            buffer: Vec::new(),
245        }
246    }
247
248    /// Creates a new StreamingDecoder with custom configuration
249    pub fn with_config(config: StreamingConfig) -> Self {
250        Self {
251            config,
252            state: StreamingState::Idle,
253            buffer: Vec::new(),
254        }
255    }
256
257    /// Processes an incoming frame
258    pub fn feed_frame(&mut self, frame_bytes: &[u8]) -> Result<StreamingEvent, StreamingError> {
259        let frame = self.decode_frame(frame_bytes)?;
260
261        match frame.frame_type {
262            FrameType::Begin => match &self.state {
263                StreamingState::Idle | StreamingState::Complete | StreamingState::Error(_) => {
264                    self.state = StreamingState::Streaming {
265                        bytes_sent: 0,
266                        chunks_sent: 0,
267                    };
268                    self.buffer.clear();
269                    Ok(StreamingEvent::StreamStarted)
270                }
271                StreamingState::Streaming { .. } => Err(StreamingError::UnexpectedFrame {
272                    expected: FrameType::Chunk,
273                    found: FrameType::Begin,
274                }),
275            },
276            FrameType::Chunk => {
277                match &self.state {
278                    StreamingState::Streaming {
279                        bytes_sent,
280                        chunks_sent,
281                    } => {
282                        // Validate checksum if enabled
283                        if self.config.enable_checksums {
284                            frame.validate_checksum()?;
285                        }
286
287                        // Append to buffer
288                        let chunk_size = frame.payload.len();
289                        self.buffer.extend(&frame.payload);
290
291                        // Update state
292                        self.state = StreamingState::Streaming {
293                            bytes_sent: bytes_sent + chunk_size,
294                            chunks_sent: chunks_sent + 1,
295                        };
296
297                        Ok(StreamingEvent::ChunkReceived { bytes: chunk_size })
298                    }
299                    StreamingState::Idle => Err(StreamingError::StreamNotStarted),
300                    StreamingState::Complete => Err(StreamingError::StreamAlreadyComplete),
301                    StreamingState::Error(_) => Err(StreamingError::UnexpectedFrame {
302                        expected: FrameType::Chunk,
303                        found: FrameType::Error,
304                    }),
305                }
306            }
307            FrameType::End => match &self.state {
308                StreamingState::Streaming { bytes_sent, .. } => {
309                    let total_bytes = *bytes_sent;
310                    self.state = StreamingState::Complete;
311                    Ok(StreamingEvent::StreamComplete { total_bytes })
312                }
313                StreamingState::Idle => Err(StreamingError::StreamNotStarted),
314                StreamingState::Complete => Err(StreamingError::StreamAlreadyComplete),
315                StreamingState::Error(_) => Err(StreamingError::UnexpectedFrame {
316                    expected: FrameType::End,
317                    found: FrameType::Error,
318                }),
319            },
320            FrameType::Error => {
321                let message = String::from_utf8_lossy(&frame.payload).to_string();
322                self.state = StreamingState::Error(message.clone());
323                Ok(StreamingEvent::StreamError { message })
324            }
325        }
326    }
327
328    /// Returns the complete payload if stream is complete
329    pub fn get_complete_payload(&self) -> Option<&[u8]> {
330        match &self.state {
331            StreamingState::Complete => Some(&self.buffer),
332            _ => None,
333        }
334    }
335
336    /// Returns the current streaming state
337    pub fn state(&self) -> &StreamingState {
338        &self.state
339    }
340
341    /// Decodes a frame from bytes
342    fn decode_frame(&self, bytes: &[u8]) -> Result<StreamingFrame, StreamingError> {
343        if bytes.len() < 7 {
344            return Err(StreamingError::BinaryError(BinaryError::UnexpectedEof {
345                expected: 7,
346                found: bytes.len(),
347            }));
348        }
349
350        let mut pos = 0;
351
352        // FRAME_ID (1 byte)
353        let frame_type = FrameType::from_u8(bytes[pos])?;
354        pos += 1;
355
356        // FLAGS (1 byte)
357        let flags = FrameFlags::from_u8(bytes[pos]);
358        pos += 1;
359
360        // CHUNK_SIZE (VarInt)
361        let (chunk_size_i64, varint_len) = super::varint::decode(&bytes[pos..])?;
362        let chunk_size = chunk_size_i64 as usize;
363        pos += varint_len;
364
365        // CHECKSUM (4 bytes)
366        if pos + 4 > bytes.len() {
367            return Err(StreamingError::BinaryError(BinaryError::UnexpectedEof {
368                expected: pos + 4,
369                found: bytes.len(),
370            }));
371        }
372        let checksum =
373            u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]);
374        pos += 4;
375
376        // PAYLOAD (variable)
377        if pos + chunk_size > bytes.len() {
378            return Err(StreamingError::BinaryError(BinaryError::UnexpectedEof {
379                expected: pos + chunk_size,
380                found: bytes.len(),
381            }));
382        }
383        let payload = bytes[pos..pos + chunk_size].to_vec();
384
385        Ok(StreamingFrame {
386            frame_type,
387            flags,
388            chunk_size,
389            checksum,
390            payload,
391        })
392    }
393}
394
395impl Default for StreamingDecoder {
396    fn default() -> Self {
397        Self::new()
398    }
399}
400
/// Window-based flow control: tracks how many sent bytes are still
/// unacknowledged and compares that with a fixed window.
#[derive(Clone, Debug, PartialEq)]
pub struct BackpressureController {
    /// Upper bound on the number of unacknowledged bytes.
    window_size: usize,
    /// Bytes recorded as sent and not yet acknowledged.
    bytes_in_flight: usize,
}

impl BackpressureController {
    /// Builds a controller with the default 64 KiB window and nothing in flight.
    pub fn new() -> Self {
        Self::with_window_size(65536) // 64KB default
    }

    /// Builds a controller with the given window and nothing in flight.
    pub fn with_window_size(window_size: usize) -> Self {
        Self {
            bytes_in_flight: 0,
            window_size,
        }
    }

    /// Returns `true` while the in-flight byte count is below the window.
    pub fn can_send(&self) -> bool {
        self.window_size > self.bytes_in_flight
    }

    /// Returns the remaining room in the window, or `0` once it is full or
    /// overrun.
    pub fn available_window(&self) -> usize {
        if self.can_send() {
            self.window_size - self.bytes_in_flight
        } else {
            0
        }
    }

    /// Adds `size` bytes to the in-flight count, saturating at `usize::MAX`.
    pub fn on_chunk_sent(&mut self, size: usize) {
        let in_flight = &mut self.bytes_in_flight;
        *in_flight = in_flight.saturating_add(size);
    }

    /// Removes `size` bytes from the in-flight count, saturating at zero.
    pub fn on_chunk_acked(&mut self, size: usize) {
        let in_flight = &mut self.bytes_in_flight;
        *in_flight = in_flight.saturating_sub(size);
    }

    /// Returns the configured window size.
    pub fn window_size(&self) -> usize {
        self.window_size
    }

    /// Returns the number of bytes currently recorded as in flight.
    pub fn bytes_in_flight(&self) -> usize {
        self.bytes_in_flight
    }

    /// Clears the in-flight count; the window size is left unchanged.
    pub fn reset(&mut self) {
        self.bytes_in_flight = 0;
    }
}

impl Default for BackpressureController {
    /// Same as [`BackpressureController::new`].
    fn default() -> Self {
        BackpressureController::new()
    }
}
468
469/// Frame type identifiers for streaming protocol
470#[repr(u8)]
471#[derive(Debug, Clone, Copy, PartialEq, Eq)]
472pub enum FrameType {
473    /// Begin frame - signals start of stream
474    Begin = 0xA0,
475    /// Chunk frame - contains data segment
476    Chunk = 0xA1,
477    /// End frame - signals completion of stream
478    End = 0xA2,
479    /// Error frame - signals error condition
480    Error = 0xA3,
481}
482
483impl FrameType {
484    /// Converts a byte to a FrameType
485    pub fn from_u8(byte: u8) -> Result<Self, StreamingError> {
486        match byte {
487            0xA0 => Ok(FrameType::Begin),
488            0xA1 => Ok(FrameType::Chunk),
489            0xA2 => Ok(FrameType::End),
490            0xA3 => Ok(FrameType::Error),
491            _ => Err(StreamingError::InvalidFrameType { found: byte }),
492        }
493    }
494
495    /// Converts the FrameType to a byte
496    pub fn to_u8(self) -> u8 {
497        self as u8
498    }
499}
500
/// Decoded view of a frame's FLAGS byte.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FrameFlags {
    /// Bit 0 (HAS_MORE): further chunks follow this one.
    pub has_more: bool,
    /// Bit 1 (COMPRESSED): the payload is compressed.
    pub compressed: bool,
    // Bits 2-7 are reserved for future use.
}

impl FrameFlags {
    /// Mask selecting the HAS_MORE bit.
    const HAS_MORE_MASK: u8 = 0x01;
    /// Mask selecting the COMPRESSED bit.
    const COMPRESSED_MASK: u8 = 0x02;

    /// Returns flags with every bit cleared.
    pub fn new() -> Self {
        Self::from_u8(0)
    }

    /// Decodes a FLAGS byte; the reserved bits (2-7) are ignored.
    pub fn from_u8(byte: u8) -> Self {
        let is_set = |mask: u8| byte & mask != 0;
        Self {
            has_more: is_set(Self::HAS_MORE_MASK),
            compressed: is_set(Self::COMPRESSED_MASK),
        }
    }

    /// Encodes the flags as a byte; the reserved bits are always zero.
    pub fn to_u8(self) -> u8 {
        let has_more_bit = if self.has_more {
            Self::HAS_MORE_MASK
        } else {
            0
        };
        let compressed_bit = if self.compressed {
            Self::COMPRESSED_MASK
        } else {
            0
        };
        has_more_bit | compressed_bit
    }
}

impl Default for FrameFlags {
    /// Same as [`FrameFlags::new`].
    fn default() -> Self {
        FrameFlags::new()
    }
}
546
547/// Streaming frame structure
548///
549/// Frame layout:
550/// ```text
551/// ┌──────────┬──────────┬──────────────┬──────────┬─────────────┐
552/// │ FRAME_ID │  FLAGS   │ CHUNK_SIZE   │ CHECKSUM │   PAYLOAD   │
553/// │ (1 byte) │ (1 byte) │  (VarInt)    │ (4 bytes)│  (variable) │
554/// └──────────┴──────────┴──────────────┴──────────┴─────────────┘
555/// ```
556#[derive(Debug, Clone, PartialEq)]
557pub struct StreamingFrame {
558    /// Frame type identifier
559    pub frame_type: FrameType,
560    /// Frame flags
561    pub flags: FrameFlags,
562    /// Size of the payload chunk (0 for BEGIN/END/ERROR frames)
563    pub chunk_size: usize,
564    /// XOR checksum of the payload (0 for BEGIN/END frames)
565    pub checksum: u32,
566    /// Frame payload data
567    pub payload: Vec<u8>,
568}
569
570impl StreamingFrame {
571    /// Creates a new BEGIN frame
572    pub fn begin() -> Self {
573        Self {
574            frame_type: FrameType::Begin,
575            flags: FrameFlags::new(),
576            chunk_size: 0,
577            checksum: 0,
578            payload: Vec::new(),
579        }
580    }
581
582    /// Creates a new CHUNK frame with payload
583    pub fn chunk(payload: Vec<u8>, has_more: bool) -> Self {
584        let chunk_size = payload.len();
585        let checksum = Self::compute_xor_checksum(&payload);
586        let mut flags = FrameFlags::new();
587        flags.has_more = has_more;
588
589        Self {
590            frame_type: FrameType::Chunk,
591            flags,
592            chunk_size,
593            checksum,
594            payload,
595        }
596    }
597
598    /// Creates a new END frame
599    pub fn end() -> Self {
600        Self {
601            frame_type: FrameType::End,
602            flags: FrameFlags::new(),
603            chunk_size: 0,
604            checksum: 0,
605            payload: Vec::new(),
606        }
607    }
608
609    /// Creates a new ERROR frame with error message
610    pub fn error(message: String) -> Self {
611        let payload = message.into_bytes();
612        let chunk_size = payload.len();
613
614        Self {
615            frame_type: FrameType::Error,
616            flags: FrameFlags::new(),
617            chunk_size,
618            checksum: 0,
619            payload,
620        }
621    }
622
623    /// Computes XOR checksum for payload data
624    pub fn compute_xor_checksum(data: &[u8]) -> u32 {
625        let mut checksum = 0u32;
626        for chunk in data.chunks(4) {
627            let mut word = 0u32;
628            for (i, &byte) in chunk.iter().enumerate() {
629                word |= (byte as u32) << (i * 8);
630            }
631            checksum ^= word;
632        }
633        checksum
634    }
635
636    /// Validates the checksum of this frame
637    pub fn validate_checksum(&self) -> Result<(), StreamingError> {
638        if self.frame_type == FrameType::Chunk {
639            let computed = Self::compute_xor_checksum(&self.payload);
640            if computed != self.checksum {
641                return Err(StreamingError::ChecksumMismatch {
642                    expected: self.checksum,
643                    found: computed,
644                });
645            }
646        }
647        Ok(())
648    }
649}
650
651/// Error types for streaming operations
652#[derive(Debug, Clone, PartialEq)]
653pub enum StreamingError {
654    /// Invalid frame type byte
655    InvalidFrameType {
656        /// The invalid frame type byte
657        found: u8,
658    },
659
660    /// Checksum mismatch in chunk frame
661    ChecksumMismatch {
662        /// Expected checksum value
663        expected: u32,
664        /// Computed checksum value
665        found: u32,
666    },
667
668    /// Unexpected frame type in sequence
669    UnexpectedFrame {
670        /// Expected frame type
671        expected: FrameType,
672        /// Found frame type
673        found: FrameType,
674    },
675
676    /// Stream not started (no BEGIN frame received)
677    StreamNotStarted,
678
679    /// Stream already complete (END frame already received)
680    StreamAlreadyComplete,
681
682    /// Chunk size exceeds maximum allowed
683    ChunkSizeExceeded {
684        /// Actual chunk size
685        size: usize,
686        /// Maximum allowed size
687        max: usize,
688    },
689
690    /// Binary encoding/decoding error
691    BinaryError(BinaryError),
692}
693
694impl std::fmt::Display for StreamingError {
695    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
696        match self {
697            StreamingError::InvalidFrameType { found } => {
698                write!(f, "Invalid frame type: 0x{:02X}", found)
699            }
700            StreamingError::ChecksumMismatch { expected, found } => {
701                write!(
702                    f,
703                    "Checksum mismatch: expected 0x{:08X}, found 0x{:08X}",
704                    expected, found
705                )
706            }
707            StreamingError::UnexpectedFrame { expected, found } => {
708                write!(
709                    f,
710                    "Unexpected frame: expected {:?}, found {:?}",
711                    expected, found
712                )
713            }
714            StreamingError::StreamNotStarted => {
715                write!(f, "Stream not started: no BEGIN frame received")
716            }
717            StreamingError::StreamAlreadyComplete => {
718                write!(f, "Stream already complete: END frame already received")
719            }
720            StreamingError::ChunkSizeExceeded { size, max } => {
721                write!(
722                    f,
723                    "Chunk size exceeded: size {} bytes exceeds maximum {} bytes",
724                    size, max
725                )
726            }
727            StreamingError::BinaryError(err) => {
728                write!(f, "Binary error: {}", err)
729            }
730        }
731    }
732}
733
734impl std::error::Error for StreamingError {}
735
736impl From<BinaryError> for StreamingError {
737    fn from(err: BinaryError) -> Self {
738        StreamingError::BinaryError(err)
739    }
740}
741
742#[cfg(test)]
743mod tests {
744    #![allow(clippy::approx_constant)]
745
746    use super::*;
747
748    #[test]
749    fn test_frame_type_from_u8() {
750        assert_eq!(FrameType::from_u8(0xA0).unwrap(), FrameType::Begin);
751        assert_eq!(FrameType::from_u8(0xA1).unwrap(), FrameType::Chunk);
752        assert_eq!(FrameType::from_u8(0xA2).unwrap(), FrameType::End);
753        assert_eq!(FrameType::from_u8(0xA3).unwrap(), FrameType::Error);
754    }
755
756    #[test]
757    fn test_frame_type_from_u8_invalid() {
758        assert!(FrameType::from_u8(0x00).is_err());
759        assert!(FrameType::from_u8(0xFF).is_err());
760        assert!(FrameType::from_u8(0xA4).is_err());
761    }
762
763    #[test]
764    fn test_frame_type_to_u8() {
765        assert_eq!(FrameType::Begin.to_u8(), 0xA0);
766        assert_eq!(FrameType::Chunk.to_u8(), 0xA1);
767        assert_eq!(FrameType::End.to_u8(), 0xA2);
768        assert_eq!(FrameType::Error.to_u8(), 0xA3);
769    }
770
771    #[test]
772    fn test_frame_type_round_trip() {
773        let types = vec![
774            FrameType::Begin,
775            FrameType::Chunk,
776            FrameType::End,
777            FrameType::Error,
778        ];
779
780        for frame_type in types {
781            let byte = frame_type.to_u8();
782            let parsed = FrameType::from_u8(byte).unwrap();
783            assert_eq!(parsed, frame_type);
784        }
785    }
786
787    #[test]
788    fn test_frame_flags_default() {
789        let flags = FrameFlags::new();
790        assert!(!flags.has_more);
791        assert!(!flags.compressed);
792    }
793
794    #[test]
795    fn test_frame_flags_from_u8() {
796        let flags = FrameFlags::from_u8(0x00);
797        assert!(!flags.has_more);
798        assert!(!flags.compressed);
799
800        let flags = FrameFlags::from_u8(0x01);
801        assert!(flags.has_more);
802        assert!(!flags.compressed);
803
804        let flags = FrameFlags::from_u8(0x02);
805        assert!(!flags.has_more);
806        assert!(flags.compressed);
807
808        let flags = FrameFlags::from_u8(0x03);
809        assert!(flags.has_more);
810        assert!(flags.compressed);
811    }
812
813    #[test]
814    fn test_frame_flags_to_u8() {
815        let mut flags = FrameFlags::new();
816        assert_eq!(flags.to_u8(), 0x00);
817
818        flags.has_more = true;
819        assert_eq!(flags.to_u8(), 0x01);
820
821        flags.has_more = false;
822        flags.compressed = true;
823        assert_eq!(flags.to_u8(), 0x02);
824
825        flags.has_more = true;
826        assert_eq!(flags.to_u8(), 0x03);
827    }
828
829    #[test]
830    fn test_frame_flags_round_trip() {
831        for byte in 0..=0xFF {
832            let flags = FrameFlags::from_u8(byte);
833            let back = flags.to_u8();
834            // Only bits 0 and 1 are used, so mask the result
835            assert_eq!(back, byte & 0x03);
836        }
837    }
838
839    #[test]
840    fn test_streaming_frame_begin() {
841        let frame = StreamingFrame::begin();
842        assert_eq!(frame.frame_type, FrameType::Begin);
843        assert_eq!(frame.chunk_size, 0);
844        assert_eq!(frame.checksum, 0);
845        assert!(frame.payload.is_empty());
846    }
847
848    #[test]
849    fn test_streaming_frame_chunk() {
850        let payload = vec![1, 2, 3, 4, 5];
851        let frame = StreamingFrame::chunk(payload.clone(), true);
852        assert_eq!(frame.frame_type, FrameType::Chunk);
853        assert_eq!(frame.chunk_size, 5);
854        assert!(frame.flags.has_more);
855        assert_eq!(frame.payload, payload);
856        assert_ne!(frame.checksum, 0);
857    }
858
859    #[test]
860    fn test_streaming_frame_end() {
861        let frame = StreamingFrame::end();
862        assert_eq!(frame.frame_type, FrameType::End);
863        assert_eq!(frame.chunk_size, 0);
864        assert_eq!(frame.checksum, 0);
865        assert!(frame.payload.is_empty());
866    }
867
868    #[test]
869    fn test_streaming_frame_error() {
870        let message = "Test error".to_string();
871        let frame = StreamingFrame::error(message.clone());
872        assert_eq!(frame.frame_type, FrameType::Error);
873        assert_eq!(frame.chunk_size, message.len());
874        assert_eq!(frame.payload, message.as_bytes());
875    }
876
877    #[test]
878    fn test_compute_xor_checksum_empty() {
879        let checksum = StreamingFrame::compute_xor_checksum(&[]);
880        assert_eq!(checksum, 0);
881    }
882
883    #[test]
884    fn test_compute_xor_checksum_single_byte() {
885        let checksum = StreamingFrame::compute_xor_checksum(&[0x42]);
886        assert_eq!(checksum, 0x42);
887    }
888
889    #[test]
890    fn test_compute_xor_checksum_four_bytes() {
891        let data = vec![0x01, 0x02, 0x03, 0x04];
892        let checksum = StreamingFrame::compute_xor_checksum(&data);
893        // 0x04030201 in little-endian
894        assert_eq!(checksum, 0x04030201);
895    }
896
897    #[test]
898    fn test_compute_xor_checksum_multiple_words() {
899        let data = vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
900        let checksum = StreamingFrame::compute_xor_checksum(&data);
901        // XOR of 0x04030201 and 0x08070605
902        assert_eq!(checksum, 0x04030201 ^ 0x08070605);
903    }
904
905    #[test]
906    fn test_compute_xor_checksum_partial_word() {
907        let data = vec![0x01, 0x02, 0x03, 0x04, 0x05];
908        let checksum = StreamingFrame::compute_xor_checksum(&data);
909        // XOR of 0x04030201 and 0x00000005
910        assert_eq!(checksum, 0x04030201 ^ 0x00000005);
911    }
912
913    #[test]
914    fn test_validate_checksum_chunk_valid() {
915        let payload = vec![1, 2, 3, 4, 5];
916        let frame = StreamingFrame::chunk(payload, false);
917        assert!(frame.validate_checksum().is_ok());
918    }
919
920    #[test]
921    fn test_validate_checksum_chunk_invalid() {
922        let payload = vec![1, 2, 3, 4, 5];
923        let mut frame = StreamingFrame::chunk(payload, false);
924        frame.checksum = 0xDEADBEEF; // Wrong checksum
925        assert!(frame.validate_checksum().is_err());
926    }
927
928    #[test]
929    fn test_validate_checksum_non_chunk() {
930        let frame = StreamingFrame::begin();
931        assert!(frame.validate_checksum().is_ok());
932
933        let frame = StreamingFrame::end();
934        assert!(frame.validate_checksum().is_ok());
935
936        let frame = StreamingFrame::error("test".to_string());
937        assert!(frame.validate_checksum().is_ok());
938    }
939
940    #[test]
941    fn test_streaming_error_display() {
942        let err = StreamingError::InvalidFrameType { found: 0xFF };
943        assert!(format!("{}", err).contains("0xFF"));
944
945        let err = StreamingError::ChecksumMismatch {
946            expected: 0x1234,
947            found: 0x5678,
948        };
949        assert!(format!("{}", err).contains("0x00001234"));
950        assert!(format!("{}", err).contains("0x00005678"));
951
952        let err = StreamingError::StreamNotStarted;
953        assert!(format!("{}", err).contains("not started"));
954
955        let err = StreamingError::StreamAlreadyComplete;
956        assert!(format!("{}", err).contains("already complete"));
957    }
958
959    #[test]
960    fn test_streaming_config_default() {
961        let config = StreamingConfig::new();
962        assert_eq!(config.chunk_size, 4096);
963        assert!(!config.enable_compression);
964        assert!(config.enable_checksums);
965    }
966
967    #[test]
968    fn test_streaming_config_builder() {
969        let config = StreamingConfig::new()
970            .with_chunk_size(8192)
971            .with_compression(true)
972            .with_checksums(false);
973
974        assert_eq!(config.chunk_size, 8192);
975        assert!(config.enable_compression);
976        assert!(!config.enable_checksums);
977    }
978
979    #[test]
980    fn test_streaming_config_default_trait() {
981        let config = StreamingConfig::default();
982        assert_eq!(config.chunk_size, 4096);
983        assert!(!config.enable_compression);
984        assert!(config.enable_checksums);
985    }
986
987    #[test]
988    fn test_streaming_encoder_new() {
989        let encoder = StreamingEncoder::new();
990        assert_eq!(encoder.state, StreamingState::Idle);
991    }
992
993    #[test]
994    fn test_streaming_encoder_begin_stream() {
995        let mut encoder = StreamingEncoder::new();
996        let result = encoder.begin_stream();
997        assert!(result.is_ok());
998
999        match encoder.state {
1000            StreamingState::Streaming {
1001                bytes_sent,
1002                chunks_sent,
1003            } => {
1004                assert_eq!(bytes_sent, 0);
1005                assert_eq!(chunks_sent, 0);
1006            }
1007            _ => panic!("Expected Streaming state"),
1008        }
1009    }
1010
1011    #[test]
1012    fn test_streaming_encoder_write_chunk() {
1013        let mut encoder = StreamingEncoder::new();
1014        encoder.begin_stream().unwrap();
1015
1016        let data = vec![1, 2, 3, 4, 5];
1017        let result = encoder.write_chunk(&data);
1018        assert!(result.is_ok());
1019
1020        match encoder.state {
1021            StreamingState::Streaming {
1022                bytes_sent,
1023                chunks_sent,
1024            } => {
1025                assert_eq!(bytes_sent, 5);
1026                assert_eq!(chunks_sent, 1);
1027            }
1028            _ => panic!("Expected Streaming state"),
1029        }
1030    }
1031
1032    #[test]
1033    fn test_streaming_encoder_write_chunk_without_begin() {
1034        let mut encoder = StreamingEncoder::new();
1035        let data = vec![1, 2, 3];
1036        let result = encoder.write_chunk(&data);
1037        assert!(matches!(result, Err(StreamingError::StreamNotStarted)));
1038    }
1039
1040    #[test]
1041    fn test_streaming_encoder_write_chunk_exceeds_size() {
1042        let config = StreamingConfig::new().with_chunk_size(10);
1043        let mut encoder = StreamingEncoder::with_config(config);
1044        encoder.begin_stream().unwrap();
1045
1046        let data = vec![0u8; 20]; // Exceeds chunk size
1047        let result = encoder.write_chunk(&data);
1048        assert!(matches!(
1049            result,
1050            Err(StreamingError::ChunkSizeExceeded { .. })
1051        ));
1052    }
1053
1054    #[test]
1055    fn test_streaming_encoder_end_stream() {
1056        let mut encoder = StreamingEncoder::new();
1057        encoder.begin_stream().unwrap();
1058
1059        let result = encoder.end_stream();
1060        assert!(result.is_ok());
1061        assert_eq!(encoder.state, StreamingState::Complete);
1062    }
1063
1064    #[test]
1065    fn test_streaming_encoder_end_stream_without_begin() {
1066        let mut encoder = StreamingEncoder::new();
1067        let result = encoder.end_stream();
1068        assert!(matches!(result, Err(StreamingError::StreamNotStarted)));
1069    }
1070
1071    #[test]
1072    fn test_streaming_encoder_error_frame() {
1073        let mut encoder = StreamingEncoder::new();
1074        let result = encoder.error_frame("Test error");
1075        assert!(result.is_ok());
1076
1077        match encoder.state {
1078            StreamingState::Error(msg) => {
1079                assert_eq!(msg, "Test error");
1080            }
1081            _ => panic!("Expected Error state"),
1082        }
1083    }
1084
1085    #[test]
1086    fn test_streaming_encoder_full_flow() {
1087        let mut encoder = StreamingEncoder::new();
1088
1089        // Begin
1090        let begin_bytes = encoder.begin_stream().unwrap();
1091        assert!(!begin_bytes.is_empty());
1092
1093        // Write chunks
1094        let chunk1 = vec![1, 2, 3];
1095        let chunk1_bytes = encoder.write_chunk(&chunk1).unwrap();
1096        assert!(!chunk1_bytes.is_empty());
1097
1098        let chunk2 = vec![4, 5, 6];
1099        let chunk2_bytes = encoder.write_chunk(&chunk2).unwrap();
1100        assert!(!chunk2_bytes.is_empty());
1101
1102        // End
1103        let end_bytes = encoder.end_stream().unwrap();
1104        assert!(!end_bytes.is_empty());
1105
1106        assert_eq!(encoder.state, StreamingState::Complete);
1107    }
1108
1109    #[test]
1110    fn test_streaming_encoder_encode_frame_begin() {
1111        let encoder = StreamingEncoder::new();
1112        let frame = StreamingFrame::begin();
1113        let bytes = encoder.encode_frame(&frame).unwrap();
1114
1115        // Should have: FRAME_ID (1) + FLAGS (1) + CHUNK_SIZE (VarInt) + CHECKSUM (4)
1116        assert!(bytes.len() >= 7);
1117        assert_eq!(bytes[0], FrameType::Begin.to_u8());
1118    }
1119
1120    #[test]
1121    fn test_streaming_encoder_encode_frame_chunk() {
1122        let encoder = StreamingEncoder::new();
1123        let payload = vec![1, 2, 3, 4, 5];
1124        let frame = StreamingFrame::chunk(payload.clone(), false);
1125        let bytes = encoder.encode_frame(&frame).unwrap();
1126
1127        // Should have: FRAME_ID (1) + FLAGS (1) + CHUNK_SIZE (VarInt) + CHECKSUM (4) + PAYLOAD
1128        assert!(bytes.len() >= 7 + payload.len());
1129        assert_eq!(bytes[0], FrameType::Chunk.to_u8());
1130    }
1131
1132    #[test]
1133    fn test_streaming_encoder_multiple_chunks() {
1134        let mut encoder = StreamingEncoder::new();
1135        encoder.begin_stream().unwrap();
1136
1137        for i in 0..5 {
1138            let data = vec![i; 10];
1139            let result = encoder.write_chunk(&data);
1140            assert!(result.is_ok());
1141        }
1142
1143        match encoder.state {
1144            StreamingState::Streaming {
1145                bytes_sent,
1146                chunks_sent,
1147            } => {
1148                assert_eq!(bytes_sent, 50);
1149                assert_eq!(chunks_sent, 5);
1150            }
1151            _ => panic!("Expected Streaming state"),
1152        }
1153    }
1154
1155    #[test]
1156    fn test_streaming_decoder_new() {
1157        let decoder = StreamingDecoder::new();
1158        assert_eq!(decoder.state, StreamingState::Idle);
1159        assert!(decoder.buffer.is_empty());
1160    }
1161
1162    #[test]
1163    fn test_streaming_decoder_feed_begin() {
1164        let mut encoder = StreamingEncoder::new();
1165        let begin_bytes = encoder.begin_stream().unwrap();
1166
1167        let mut decoder = StreamingDecoder::new();
1168        let event = decoder.feed_frame(&begin_bytes).unwrap();
1169
1170        assert_eq!(event, StreamingEvent::StreamStarted);
1171        match decoder.state {
1172            StreamingState::Streaming { .. } => {}
1173            _ => panic!("Expected Streaming state"),
1174        }
1175    }
1176
1177    #[test]
1178    fn test_streaming_decoder_feed_chunk() {
1179        let mut encoder = StreamingEncoder::new();
1180        encoder.begin_stream().unwrap();
1181
1182        let data = vec![1, 2, 3, 4, 5];
1183        let chunk_bytes = encoder.write_chunk(&data).unwrap();
1184
1185        let mut decoder = StreamingDecoder::new();
1186        decoder
1187            .feed_frame(&encoder.encode_frame(&StreamingFrame::begin()).unwrap())
1188            .unwrap();
1189
1190        let event = decoder.feed_frame(&chunk_bytes).unwrap();
1191        assert_eq!(event, StreamingEvent::ChunkReceived { bytes: 5 });
1192    }
1193
1194    #[test]
1195    fn test_streaming_decoder_feed_end() {
1196        let mut encoder = StreamingEncoder::new();
1197        encoder.begin_stream().unwrap();
1198        let end_bytes = encoder.end_stream().unwrap();
1199
1200        let mut decoder = StreamingDecoder::new();
1201        decoder
1202            .feed_frame(&encoder.encode_frame(&StreamingFrame::begin()).unwrap())
1203            .unwrap();
1204
1205        let event = decoder.feed_frame(&end_bytes).unwrap();
1206        match event {
1207            StreamingEvent::StreamComplete { total_bytes } => {
1208                assert_eq!(total_bytes, 0);
1209            }
1210            _ => panic!("Expected StreamComplete event"),
1211        }
1212    }
1213
1214    #[test]
1215    fn test_streaming_decoder_feed_error() {
1216        let mut encoder = StreamingEncoder::new();
1217        let error_bytes = encoder.error_frame("Test error").unwrap();
1218
1219        let mut decoder = StreamingDecoder::new();
1220        let event = decoder.feed_frame(&error_bytes).unwrap();
1221
1222        match event {
1223            StreamingEvent::StreamError { message } => {
1224                assert_eq!(message, "Test error");
1225            }
1226            _ => panic!("Expected StreamError event"),
1227        }
1228    }
1229
1230    #[test]
1231    fn test_streaming_decoder_chunk_without_begin() {
1232        let mut encoder = StreamingEncoder::new();
1233        encoder.begin_stream().unwrap();
1234        let chunk_bytes = encoder.write_chunk(&[1, 2, 3]).unwrap();
1235
1236        let mut decoder = StreamingDecoder::new();
1237        let result = decoder.feed_frame(&chunk_bytes);
1238        assert!(matches!(result, Err(StreamingError::StreamNotStarted)));
1239    }
1240
1241    #[test]
1242    fn test_streaming_decoder_get_complete_payload() {
1243        let mut encoder = StreamingEncoder::new();
1244        encoder.begin_stream().unwrap();
1245
1246        let data1 = vec![1, 2, 3];
1247        let data2 = vec![4, 5, 6];
1248
1249        let chunk1_bytes = encoder.write_chunk(&data1).unwrap();
1250        let chunk2_bytes = encoder.write_chunk(&data2).unwrap();
1251        let end_bytes = encoder.end_stream().unwrap();
1252
1253        let mut decoder = StreamingDecoder::new();
1254        decoder
1255            .feed_frame(&encoder.encode_frame(&StreamingFrame::begin()).unwrap())
1256            .unwrap();
1257        decoder.feed_frame(&chunk1_bytes).unwrap();
1258        decoder.feed_frame(&chunk2_bytes).unwrap();
1259        decoder.feed_frame(&end_bytes).unwrap();
1260
1261        let payload = decoder.get_complete_payload().unwrap();
1262        assert_eq!(payload, &[1, 2, 3, 4, 5, 6]);
1263    }
1264
1265    #[test]
1266    fn test_streaming_decoder_get_complete_payload_before_end() {
1267        let mut decoder = StreamingDecoder::new();
1268        assert!(decoder.get_complete_payload().is_none());
1269
1270        let mut encoder = StreamingEncoder::new();
1271        let begin_bytes = encoder.begin_stream().unwrap();
1272        decoder.feed_frame(&begin_bytes).unwrap();
1273
1274        assert!(decoder.get_complete_payload().is_none());
1275    }
1276
1277    #[test]
1278    fn test_streaming_round_trip() {
1279        let mut encoder = StreamingEncoder::new();
1280        let mut decoder = StreamingDecoder::new();
1281
1282        // Begin
1283        let begin_bytes = encoder.begin_stream().unwrap();
1284        let event = decoder.feed_frame(&begin_bytes).unwrap();
1285        assert_eq!(event, StreamingEvent::StreamStarted);
1286
1287        // Chunks
1288        let data1 = vec![1, 2, 3, 4, 5];
1289        let chunk1_bytes = encoder.write_chunk(&data1).unwrap();
1290        let event = decoder.feed_frame(&chunk1_bytes).unwrap();
1291        assert_eq!(event, StreamingEvent::ChunkReceived { bytes: 5 });
1292
1293        let data2 = vec![6, 7, 8];
1294        let chunk2_bytes = encoder.write_chunk(&data2).unwrap();
1295        let event = decoder.feed_frame(&chunk2_bytes).unwrap();
1296        assert_eq!(event, StreamingEvent::ChunkReceived { bytes: 3 });
1297
1298        // End
1299        let end_bytes = encoder.end_stream().unwrap();
1300        let event = decoder.feed_frame(&end_bytes).unwrap();
1301        match event {
1302            StreamingEvent::StreamComplete { total_bytes } => {
1303                assert_eq!(total_bytes, 8);
1304            }
1305            _ => panic!("Expected StreamComplete event"),
1306        }
1307
1308        // Verify payload
1309        let payload = decoder.get_complete_payload().unwrap();
1310        assert_eq!(payload, &[1, 2, 3, 4, 5, 6, 7, 8]);
1311    }
1312
1313    #[test]
1314    fn test_streaming_decoder_checksum_validation() {
1315        let config = StreamingConfig::new().with_checksums(true);
1316        let mut encoder = StreamingEncoder::with_config(config.clone());
1317        let mut decoder = StreamingDecoder::with_config(config);
1318
1319        encoder.begin_stream().unwrap();
1320        let data = vec![1, 2, 3, 4, 5];
1321        let mut chunk_bytes = encoder.write_chunk(&data).unwrap();
1322
1323        // Corrupt the checksum
1324        if chunk_bytes.len() > 10 {
1325            chunk_bytes[5] ^= 0xFF;
1326        }
1327
1328        decoder
1329            .feed_frame(&encoder.encode_frame(&StreamingFrame::begin()).unwrap())
1330            .unwrap();
1331        let result = decoder.feed_frame(&chunk_bytes);
1332        assert!(matches!(
1333            result,
1334            Err(StreamingError::ChecksumMismatch { .. })
1335        ));
1336    }
1337
1338    #[test]
1339    fn test_streaming_decoder_invalid_frame_bytes() {
1340        let mut decoder = StreamingDecoder::new();
1341        let invalid_bytes = vec![0xFF, 0x00]; // Too short
1342        let result = decoder.feed_frame(&invalid_bytes);
1343        assert!(result.is_err());
1344    }
1345
1346    #[test]
1347    fn test_backpressure_controller_new() {
1348        let controller = BackpressureController::new();
1349        assert_eq!(controller.window_size(), 65536);
1350        assert_eq!(controller.bytes_in_flight(), 0);
1351        assert!(controller.can_send());
1352    }
1353
1354    #[test]
1355    fn test_backpressure_controller_with_window_size() {
1356        let controller = BackpressureController::with_window_size(8192);
1357        assert_eq!(controller.window_size(), 8192);
1358        assert_eq!(controller.bytes_in_flight(), 0);
1359    }
1360
1361    #[test]
1362    fn test_backpressure_controller_can_send() {
1363        let mut controller = BackpressureController::with_window_size(1000);
1364        assert!(controller.can_send());
1365
1366        controller.on_chunk_sent(500);
1367        assert!(controller.can_send());
1368
1369        controller.on_chunk_sent(500);
1370        assert!(!controller.can_send());
1371    }
1372
1373    #[test]
1374    fn test_backpressure_controller_available_window() {
1375        let mut controller = BackpressureController::with_window_size(1000);
1376        assert_eq!(controller.available_window(), 1000);
1377
1378        controller.on_chunk_sent(300);
1379        assert_eq!(controller.available_window(), 700);
1380
1381        controller.on_chunk_sent(400);
1382        assert_eq!(controller.available_window(), 300);
1383    }
1384
1385    #[test]
1386    fn test_backpressure_controller_on_chunk_sent() {
1387        let mut controller = BackpressureController::new();
1388        assert_eq!(controller.bytes_in_flight(), 0);
1389
1390        controller.on_chunk_sent(100);
1391        assert_eq!(controller.bytes_in_flight(), 100);
1392
1393        controller.on_chunk_sent(200);
1394        assert_eq!(controller.bytes_in_flight(), 300);
1395    }
1396
1397    #[test]
1398    fn test_backpressure_controller_on_chunk_acked() {
1399        let mut controller = BackpressureController::new();
1400        controller.on_chunk_sent(500);
1401        assert_eq!(controller.bytes_in_flight(), 500);
1402
1403        controller.on_chunk_acked(200);
1404        assert_eq!(controller.bytes_in_flight(), 300);
1405
1406        controller.on_chunk_acked(300);
1407        assert_eq!(controller.bytes_in_flight(), 0);
1408    }
1409
1410    #[test]
1411    fn test_backpressure_controller_reset() {
1412        let mut controller = BackpressureController::new();
1413        controller.on_chunk_sent(1000);
1414        assert_eq!(controller.bytes_in_flight(), 1000);
1415
1416        controller.reset();
1417        assert_eq!(controller.bytes_in_flight(), 0);
1418        assert!(controller.can_send());
1419    }
1420
1421    #[test]
1422    fn test_backpressure_controller_saturating_add() {
1423        let mut controller = BackpressureController::with_window_size(100);
1424        controller.on_chunk_sent(usize::MAX);
1425        // Should saturate, not overflow
1426        assert!(controller.bytes_in_flight() > 0);
1427    }
1428
1429    #[test]
1430    fn test_backpressure_controller_saturating_sub() {
1431        let mut controller = BackpressureController::new();
1432        controller.on_chunk_sent(100);
1433        controller.on_chunk_acked(200); // Ack more than sent
1434                                        // Should saturate at 0, not underflow
1435        assert_eq!(controller.bytes_in_flight(), 0);
1436    }
1437
1438    #[test]
1439    fn test_backpressure_controller_flow_control_scenario() {
1440        let mut controller = BackpressureController::with_window_size(10000);
1441
1442        // Send chunks until window is full
1443        let chunk_size = 2000;
1444        let mut chunks_sent = 0;
1445
1446        while controller.can_send() && controller.available_window() >= chunk_size {
1447            controller.on_chunk_sent(chunk_size);
1448            chunks_sent += 1;
1449        }
1450
1451        assert_eq!(chunks_sent, 5); // 5 * 2000 = 10000
1452        assert!(!controller.can_send());
1453
1454        // Acknowledge some chunks
1455        controller.on_chunk_acked(chunk_size);
1456        controller.on_chunk_acked(chunk_size);
1457
1458        // Should be able to send again
1459        assert!(controller.can_send());
1460        assert_eq!(controller.available_window(), 4000);
1461    }
1462
1463    #[test]
1464    fn test_backpressure_controller_default() {
1465        let controller = BackpressureController::default();
1466        assert_eq!(controller.window_size(), 65536);
1467        assert_eq!(controller.bytes_in_flight(), 0);
1468    }
1469}