1use super::error::BinaryError;
7
8#[derive(Debug, Clone, PartialEq)]
10pub struct StreamingConfig {
11 pub chunk_size: usize,
13 pub enable_compression: bool,
15 pub enable_checksums: bool,
17}
18
19impl StreamingConfig {
20 pub fn new() -> Self {
22 Self {
23 chunk_size: 4096, enable_compression: false,
25 enable_checksums: true,
26 }
27 }
28
29 pub fn with_chunk_size(mut self, size: usize) -> Self {
31 self.chunk_size = size;
32 self
33 }
34
35 pub fn with_compression(mut self, enabled: bool) -> Self {
37 self.enable_compression = enabled;
38 self
39 }
40
41 pub fn with_checksums(mut self, enabled: bool) -> Self {
43 self.enable_checksums = enabled;
44 self
45 }
46}
47
48impl Default for StreamingConfig {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum StreamingState {
57 Idle,
59 Streaming {
61 bytes_sent: usize,
63 chunks_sent: usize,
65 },
66 Complete,
68 Error(String),
70}
71
72pub struct StreamingEncoder {
74 config: StreamingConfig,
75 state: StreamingState,
76}
77
78impl StreamingEncoder {
79 pub fn new() -> Self {
81 Self {
82 config: StreamingConfig::new(),
83 state: StreamingState::Idle,
84 }
85 }
86
87 pub fn with_config(config: StreamingConfig) -> Self {
89 Self {
90 config,
91 state: StreamingState::Idle,
92 }
93 }
94
95 pub fn begin_stream(&mut self) -> Result<Vec<u8>, StreamingError> {
97 match &self.state {
98 StreamingState::Idle | StreamingState::Complete | StreamingState::Error(_) => {
99 self.state = StreamingState::Streaming {
100 bytes_sent: 0,
101 chunks_sent: 0,
102 };
103 let frame = StreamingFrame::begin();
104 self.encode_frame(&frame)
105 }
106 StreamingState::Streaming { .. } => Err(StreamingError::UnexpectedFrame {
107 expected: FrameType::Chunk,
108 found: FrameType::Begin,
109 }),
110 }
111 }
112
113 pub fn write_chunk(&mut self, data: &[u8]) -> Result<Vec<u8>, StreamingError> {
115 match &self.state {
116 StreamingState::Streaming {
117 bytes_sent,
118 chunks_sent,
119 } => {
120 if data.len() > self.config.chunk_size {
121 return Err(StreamingError::ChunkSizeExceeded {
122 size: data.len(),
123 max: self.config.chunk_size,
124 });
125 }
126
127 let has_more = false; let frame = StreamingFrame::chunk(data.to_vec(), has_more);
129
130 self.state = StreamingState::Streaming {
132 bytes_sent: bytes_sent + data.len(),
133 chunks_sent: chunks_sent + 1,
134 };
135
136 self.encode_frame(&frame)
137 }
138 StreamingState::Idle => Err(StreamingError::StreamNotStarted),
139 StreamingState::Complete => Err(StreamingError::StreamAlreadyComplete),
140 StreamingState::Error(_msg) => Err(StreamingError::UnexpectedFrame {
141 expected: FrameType::Chunk,
142 found: FrameType::Error,
143 }),
144 }
145 }
146
147 pub fn end_stream(&mut self) -> Result<Vec<u8>, StreamingError> {
149 match &self.state {
150 StreamingState::Streaming { .. } => {
151 self.state = StreamingState::Complete;
152 let frame = StreamingFrame::end();
153 self.encode_frame(&frame)
154 }
155 StreamingState::Idle => Err(StreamingError::StreamNotStarted),
156 StreamingState::Complete => Err(StreamingError::StreamAlreadyComplete),
157 StreamingState::Error(_) => Err(StreamingError::UnexpectedFrame {
158 expected: FrameType::End,
159 found: FrameType::Error,
160 }),
161 }
162 }
163
164 pub fn error_frame(&mut self, error: &str) -> Result<Vec<u8>, StreamingError> {
166 self.state = StreamingState::Error(error.to_string());
167 let frame = StreamingFrame::error(error.to_string());
168 self.encode_frame(&frame)
169 }
170
171 pub fn state(&self) -> &StreamingState {
173 &self.state
174 }
175
176 fn encode_frame(&self, frame: &StreamingFrame) -> Result<Vec<u8>, StreamingError> {
178 let mut bytes = Vec::new();
179
180 bytes.push(frame.frame_type.to_u8());
182
183 bytes.push(frame.flags.to_u8());
185
186 bytes.extend(super::varint::encode(frame.chunk_size as i64));
188
189 if frame.frame_type == FrameType::Chunk && self.config.enable_checksums {
191 bytes.extend(&frame.checksum.to_le_bytes());
192 } else {
193 bytes.extend(&[0u8; 4]);
194 }
195
196 bytes.extend(&frame.payload);
198
199 Ok(bytes)
200 }
201}
202
203impl Default for StreamingEncoder {
204 fn default() -> Self {
205 Self::new()
206 }
207}
208
209#[derive(Debug, Clone, PartialEq)]
211pub enum StreamingEvent {
212 StreamStarted,
214 ChunkReceived {
216 bytes: usize,
218 },
219 StreamComplete {
221 total_bytes: usize,
223 },
224 StreamError {
226 message: String,
228 },
229}
230
231pub struct StreamingDecoder {
233 config: StreamingConfig,
234 state: StreamingState,
235 buffer: Vec<u8>,
236}
237
238impl StreamingDecoder {
239 pub fn new() -> Self {
241 Self {
242 config: StreamingConfig::new(),
243 state: StreamingState::Idle,
244 buffer: Vec::new(),
245 }
246 }
247
248 pub fn with_config(config: StreamingConfig) -> Self {
250 Self {
251 config,
252 state: StreamingState::Idle,
253 buffer: Vec::new(),
254 }
255 }
256
257 pub fn feed_frame(&mut self, frame_bytes: &[u8]) -> Result<StreamingEvent, StreamingError> {
259 let frame = self.decode_frame(frame_bytes)?;
260
261 match frame.frame_type {
262 FrameType::Begin => match &self.state {
263 StreamingState::Idle | StreamingState::Complete | StreamingState::Error(_) => {
264 self.state = StreamingState::Streaming {
265 bytes_sent: 0,
266 chunks_sent: 0,
267 };
268 self.buffer.clear();
269 Ok(StreamingEvent::StreamStarted)
270 }
271 StreamingState::Streaming { .. } => Err(StreamingError::UnexpectedFrame {
272 expected: FrameType::Chunk,
273 found: FrameType::Begin,
274 }),
275 },
276 FrameType::Chunk => {
277 match &self.state {
278 StreamingState::Streaming {
279 bytes_sent,
280 chunks_sent,
281 } => {
282 if self.config.enable_checksums {
284 frame.validate_checksum()?;
285 }
286
287 let chunk_size = frame.payload.len();
289 self.buffer.extend(&frame.payload);
290
291 self.state = StreamingState::Streaming {
293 bytes_sent: bytes_sent + chunk_size,
294 chunks_sent: chunks_sent + 1,
295 };
296
297 Ok(StreamingEvent::ChunkReceived { bytes: chunk_size })
298 }
299 StreamingState::Idle => Err(StreamingError::StreamNotStarted),
300 StreamingState::Complete => Err(StreamingError::StreamAlreadyComplete),
301 StreamingState::Error(_) => Err(StreamingError::UnexpectedFrame {
302 expected: FrameType::Chunk,
303 found: FrameType::Error,
304 }),
305 }
306 }
307 FrameType::End => match &self.state {
308 StreamingState::Streaming { bytes_sent, .. } => {
309 let total_bytes = *bytes_sent;
310 self.state = StreamingState::Complete;
311 Ok(StreamingEvent::StreamComplete { total_bytes })
312 }
313 StreamingState::Idle => Err(StreamingError::StreamNotStarted),
314 StreamingState::Complete => Err(StreamingError::StreamAlreadyComplete),
315 StreamingState::Error(_) => Err(StreamingError::UnexpectedFrame {
316 expected: FrameType::End,
317 found: FrameType::Error,
318 }),
319 },
320 FrameType::Error => {
321 let message = String::from_utf8_lossy(&frame.payload).to_string();
322 self.state = StreamingState::Error(message.clone());
323 Ok(StreamingEvent::StreamError { message })
324 }
325 }
326 }
327
328 pub fn get_complete_payload(&self) -> Option<&[u8]> {
330 match &self.state {
331 StreamingState::Complete => Some(&self.buffer),
332 _ => None,
333 }
334 }
335
336 pub fn state(&self) -> &StreamingState {
338 &self.state
339 }
340
341 fn decode_frame(&self, bytes: &[u8]) -> Result<StreamingFrame, StreamingError> {
343 if bytes.len() < 7 {
344 return Err(StreamingError::BinaryError(BinaryError::UnexpectedEof {
345 expected: 7,
346 found: bytes.len(),
347 }));
348 }
349
350 let mut pos = 0;
351
352 let frame_type = FrameType::from_u8(bytes[pos])?;
354 pos += 1;
355
356 let flags = FrameFlags::from_u8(bytes[pos]);
358 pos += 1;
359
360 let (chunk_size_i64, varint_len) = super::varint::decode(&bytes[pos..])?;
362 let chunk_size = chunk_size_i64 as usize;
363 pos += varint_len;
364
365 if pos + 4 > bytes.len() {
367 return Err(StreamingError::BinaryError(BinaryError::UnexpectedEof {
368 expected: pos + 4,
369 found: bytes.len(),
370 }));
371 }
372 let checksum =
373 u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]);
374 pos += 4;
375
376 if pos + chunk_size > bytes.len() {
378 return Err(StreamingError::BinaryError(BinaryError::UnexpectedEof {
379 expected: pos + chunk_size,
380 found: bytes.len(),
381 }));
382 }
383 let payload = bytes[pos..pos + chunk_size].to_vec();
384
385 Ok(StreamingFrame {
386 frame_type,
387 flags,
388 chunk_size,
389 checksum,
390 payload,
391 })
392 }
393}
394
395impl Default for StreamingDecoder {
396 fn default() -> Self {
397 Self::new()
398 }
399}
400
401#[derive(Debug, Clone, PartialEq)]
403pub struct BackpressureController {
404 window_size: usize,
406 bytes_in_flight: usize,
408}
409
410impl BackpressureController {
411 pub fn new() -> Self {
413 Self {
414 window_size: 65536, bytes_in_flight: 0,
416 }
417 }
418
419 pub fn with_window_size(window_size: usize) -> Self {
421 Self {
422 window_size,
423 bytes_in_flight: 0,
424 }
425 }
426
427 pub fn can_send(&self) -> bool {
429 self.bytes_in_flight < self.window_size
430 }
431
432 pub fn available_window(&self) -> usize {
434 self.window_size.saturating_sub(self.bytes_in_flight)
435 }
436
437 pub fn on_chunk_sent(&mut self, size: usize) {
439 self.bytes_in_flight = self.bytes_in_flight.saturating_add(size);
440 }
441
442 pub fn on_chunk_acked(&mut self, size: usize) {
444 self.bytes_in_flight = self.bytes_in_flight.saturating_sub(size);
445 }
446
447 pub fn window_size(&self) -> usize {
449 self.window_size
450 }
451
452 pub fn bytes_in_flight(&self) -> usize {
454 self.bytes_in_flight
455 }
456
457 pub fn reset(&mut self) {
459 self.bytes_in_flight = 0;
460 }
461}
462
463impl Default for BackpressureController {
464 fn default() -> Self {
465 Self::new()
466 }
467}
468
469#[repr(u8)]
471#[derive(Debug, Clone, Copy, PartialEq, Eq)]
472pub enum FrameType {
473 Begin = 0xA0,
475 Chunk = 0xA1,
477 End = 0xA2,
479 Error = 0xA3,
481}
482
483impl FrameType {
484 pub fn from_u8(byte: u8) -> Result<Self, StreamingError> {
486 match byte {
487 0xA0 => Ok(FrameType::Begin),
488 0xA1 => Ok(FrameType::Chunk),
489 0xA2 => Ok(FrameType::End),
490 0xA3 => Ok(FrameType::Error),
491 _ => Err(StreamingError::InvalidFrameType { found: byte }),
492 }
493 }
494
495 pub fn to_u8(self) -> u8 {
497 self as u8
498 }
499}
500
501#[derive(Debug, Clone, Copy, PartialEq, Eq)]
503pub struct FrameFlags {
504 pub has_more: bool,
506 pub compressed: bool,
508 }
510
511impl FrameFlags {
512 pub fn new() -> Self {
514 Self {
515 has_more: false,
516 compressed: false,
517 }
518 }
519
520 pub fn from_u8(byte: u8) -> Self {
522 Self {
523 has_more: (byte & 0x01) != 0,
524 compressed: (byte & 0x02) != 0,
525 }
526 }
527
528 pub fn to_u8(self) -> u8 {
530 let mut byte = 0u8;
531 if self.has_more {
532 byte |= 0x01;
533 }
534 if self.compressed {
535 byte |= 0x02;
536 }
537 byte
538 }
539}
540
541impl Default for FrameFlags {
542 fn default() -> Self {
543 Self::new()
544 }
545}
546
547#[derive(Debug, Clone, PartialEq)]
557pub struct StreamingFrame {
558 pub frame_type: FrameType,
560 pub flags: FrameFlags,
562 pub chunk_size: usize,
564 pub checksum: u32,
566 pub payload: Vec<u8>,
568}
569
570impl StreamingFrame {
571 pub fn begin() -> Self {
573 Self {
574 frame_type: FrameType::Begin,
575 flags: FrameFlags::new(),
576 chunk_size: 0,
577 checksum: 0,
578 payload: Vec::new(),
579 }
580 }
581
582 pub fn chunk(payload: Vec<u8>, has_more: bool) -> Self {
584 let chunk_size = payload.len();
585 let checksum = Self::compute_xor_checksum(&payload);
586 let mut flags = FrameFlags::new();
587 flags.has_more = has_more;
588
589 Self {
590 frame_type: FrameType::Chunk,
591 flags,
592 chunk_size,
593 checksum,
594 payload,
595 }
596 }
597
598 pub fn end() -> Self {
600 Self {
601 frame_type: FrameType::End,
602 flags: FrameFlags::new(),
603 chunk_size: 0,
604 checksum: 0,
605 payload: Vec::new(),
606 }
607 }
608
609 pub fn error(message: String) -> Self {
611 let payload = message.into_bytes();
612 let chunk_size = payload.len();
613
614 Self {
615 frame_type: FrameType::Error,
616 flags: FrameFlags::new(),
617 chunk_size,
618 checksum: 0,
619 payload,
620 }
621 }
622
623 pub fn compute_xor_checksum(data: &[u8]) -> u32 {
625 let mut checksum = 0u32;
626 for chunk in data.chunks(4) {
627 let mut word = 0u32;
628 for (i, &byte) in chunk.iter().enumerate() {
629 word |= (byte as u32) << (i * 8);
630 }
631 checksum ^= word;
632 }
633 checksum
634 }
635
636 pub fn validate_checksum(&self) -> Result<(), StreamingError> {
638 if self.frame_type == FrameType::Chunk {
639 let computed = Self::compute_xor_checksum(&self.payload);
640 if computed != self.checksum {
641 return Err(StreamingError::ChecksumMismatch {
642 expected: self.checksum,
643 found: computed,
644 });
645 }
646 }
647 Ok(())
648 }
649}
650
651#[derive(Debug, Clone, PartialEq)]
653pub enum StreamingError {
654 InvalidFrameType {
656 found: u8,
658 },
659
660 ChecksumMismatch {
662 expected: u32,
664 found: u32,
666 },
667
668 UnexpectedFrame {
670 expected: FrameType,
672 found: FrameType,
674 },
675
676 StreamNotStarted,
678
679 StreamAlreadyComplete,
681
682 ChunkSizeExceeded {
684 size: usize,
686 max: usize,
688 },
689
690 BinaryError(BinaryError),
692}
693
694impl std::fmt::Display for StreamingError {
695 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
696 match self {
697 StreamingError::InvalidFrameType { found } => {
698 write!(f, "Invalid frame type: 0x{:02X}", found)
699 }
700 StreamingError::ChecksumMismatch { expected, found } => {
701 write!(
702 f,
703 "Checksum mismatch: expected 0x{:08X}, found 0x{:08X}",
704 expected, found
705 )
706 }
707 StreamingError::UnexpectedFrame { expected, found } => {
708 write!(
709 f,
710 "Unexpected frame: expected {:?}, found {:?}",
711 expected, found
712 )
713 }
714 StreamingError::StreamNotStarted => {
715 write!(f, "Stream not started: no BEGIN frame received")
716 }
717 StreamingError::StreamAlreadyComplete => {
718 write!(f, "Stream already complete: END frame already received")
719 }
720 StreamingError::ChunkSizeExceeded { size, max } => {
721 write!(
722 f,
723 "Chunk size exceeded: size {} bytes exceeds maximum {} bytes",
724 size, max
725 )
726 }
727 StreamingError::BinaryError(err) => {
728 write!(f, "Binary error: {}", err)
729 }
730 }
731 }
732}
733
734impl std::error::Error for StreamingError {}
735
736impl From<BinaryError> for StreamingError {
737 fn from(err: BinaryError) -> Self {
738 StreamingError::BinaryError(err)
739 }
740}
741
742#[cfg(test)]
743mod tests {
744 #![allow(clippy::approx_constant)]
745
746 use super::*;
747
748 #[test]
749 fn test_frame_type_from_u8() {
750 assert_eq!(FrameType::from_u8(0xA0).unwrap(), FrameType::Begin);
751 assert_eq!(FrameType::from_u8(0xA1).unwrap(), FrameType::Chunk);
752 assert_eq!(FrameType::from_u8(0xA2).unwrap(), FrameType::End);
753 assert_eq!(FrameType::from_u8(0xA3).unwrap(), FrameType::Error);
754 }
755
756 #[test]
757 fn test_frame_type_from_u8_invalid() {
758 assert!(FrameType::from_u8(0x00).is_err());
759 assert!(FrameType::from_u8(0xFF).is_err());
760 assert!(FrameType::from_u8(0xA4).is_err());
761 }
762
763 #[test]
764 fn test_frame_type_to_u8() {
765 assert_eq!(FrameType::Begin.to_u8(), 0xA0);
766 assert_eq!(FrameType::Chunk.to_u8(), 0xA1);
767 assert_eq!(FrameType::End.to_u8(), 0xA2);
768 assert_eq!(FrameType::Error.to_u8(), 0xA3);
769 }
770
771 #[test]
772 fn test_frame_type_round_trip() {
773 let types = vec![
774 FrameType::Begin,
775 FrameType::Chunk,
776 FrameType::End,
777 FrameType::Error,
778 ];
779
780 for frame_type in types {
781 let byte = frame_type.to_u8();
782 let parsed = FrameType::from_u8(byte).unwrap();
783 assert_eq!(parsed, frame_type);
784 }
785 }
786
787 #[test]
788 fn test_frame_flags_default() {
789 let flags = FrameFlags::new();
790 assert!(!flags.has_more);
791 assert!(!flags.compressed);
792 }
793
794 #[test]
795 fn test_frame_flags_from_u8() {
796 let flags = FrameFlags::from_u8(0x00);
797 assert!(!flags.has_more);
798 assert!(!flags.compressed);
799
800 let flags = FrameFlags::from_u8(0x01);
801 assert!(flags.has_more);
802 assert!(!flags.compressed);
803
804 let flags = FrameFlags::from_u8(0x02);
805 assert!(!flags.has_more);
806 assert!(flags.compressed);
807
808 let flags = FrameFlags::from_u8(0x03);
809 assert!(flags.has_more);
810 assert!(flags.compressed);
811 }
812
813 #[test]
814 fn test_frame_flags_to_u8() {
815 let mut flags = FrameFlags::new();
816 assert_eq!(flags.to_u8(), 0x00);
817
818 flags.has_more = true;
819 assert_eq!(flags.to_u8(), 0x01);
820
821 flags.has_more = false;
822 flags.compressed = true;
823 assert_eq!(flags.to_u8(), 0x02);
824
825 flags.has_more = true;
826 assert_eq!(flags.to_u8(), 0x03);
827 }
828
829 #[test]
830 fn test_frame_flags_round_trip() {
831 for byte in 0..=0xFF {
832 let flags = FrameFlags::from_u8(byte);
833 let back = flags.to_u8();
834 assert_eq!(back, byte & 0x03);
836 }
837 }
838
839 #[test]
840 fn test_streaming_frame_begin() {
841 let frame = StreamingFrame::begin();
842 assert_eq!(frame.frame_type, FrameType::Begin);
843 assert_eq!(frame.chunk_size, 0);
844 assert_eq!(frame.checksum, 0);
845 assert!(frame.payload.is_empty());
846 }
847
848 #[test]
849 fn test_streaming_frame_chunk() {
850 let payload = vec![1, 2, 3, 4, 5];
851 let frame = StreamingFrame::chunk(payload.clone(), true);
852 assert_eq!(frame.frame_type, FrameType::Chunk);
853 assert_eq!(frame.chunk_size, 5);
854 assert!(frame.flags.has_more);
855 assert_eq!(frame.payload, payload);
856 assert_ne!(frame.checksum, 0);
857 }
858
859 #[test]
860 fn test_streaming_frame_end() {
861 let frame = StreamingFrame::end();
862 assert_eq!(frame.frame_type, FrameType::End);
863 assert_eq!(frame.chunk_size, 0);
864 assert_eq!(frame.checksum, 0);
865 assert!(frame.payload.is_empty());
866 }
867
868 #[test]
869 fn test_streaming_frame_error() {
870 let message = "Test error".to_string();
871 let frame = StreamingFrame::error(message.clone());
872 assert_eq!(frame.frame_type, FrameType::Error);
873 assert_eq!(frame.chunk_size, message.len());
874 assert_eq!(frame.payload, message.as_bytes());
875 }
876
877 #[test]
878 fn test_compute_xor_checksum_empty() {
879 let checksum = StreamingFrame::compute_xor_checksum(&[]);
880 assert_eq!(checksum, 0);
881 }
882
883 #[test]
884 fn test_compute_xor_checksum_single_byte() {
885 let checksum = StreamingFrame::compute_xor_checksum(&[0x42]);
886 assert_eq!(checksum, 0x42);
887 }
888
889 #[test]
890 fn test_compute_xor_checksum_four_bytes() {
891 let data = vec![0x01, 0x02, 0x03, 0x04];
892 let checksum = StreamingFrame::compute_xor_checksum(&data);
893 assert_eq!(checksum, 0x04030201);
895 }
896
897 #[test]
898 fn test_compute_xor_checksum_multiple_words() {
899 let data = vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
900 let checksum = StreamingFrame::compute_xor_checksum(&data);
901 assert_eq!(checksum, 0x04030201 ^ 0x08070605);
903 }
904
905 #[test]
906 fn test_compute_xor_checksum_partial_word() {
907 let data = vec![0x01, 0x02, 0x03, 0x04, 0x05];
908 let checksum = StreamingFrame::compute_xor_checksum(&data);
909 assert_eq!(checksum, 0x04030201 ^ 0x00000005);
911 }
912
913 #[test]
914 fn test_validate_checksum_chunk_valid() {
915 let payload = vec![1, 2, 3, 4, 5];
916 let frame = StreamingFrame::chunk(payload, false);
917 assert!(frame.validate_checksum().is_ok());
918 }
919
920 #[test]
921 fn test_validate_checksum_chunk_invalid() {
922 let payload = vec![1, 2, 3, 4, 5];
923 let mut frame = StreamingFrame::chunk(payload, false);
924 frame.checksum = 0xDEADBEEF; assert!(frame.validate_checksum().is_err());
926 }
927
928 #[test]
929 fn test_validate_checksum_non_chunk() {
930 let frame = StreamingFrame::begin();
931 assert!(frame.validate_checksum().is_ok());
932
933 let frame = StreamingFrame::end();
934 assert!(frame.validate_checksum().is_ok());
935
936 let frame = StreamingFrame::error("test".to_string());
937 assert!(frame.validate_checksum().is_ok());
938 }
939
940 #[test]
941 fn test_streaming_error_display() {
942 let err = StreamingError::InvalidFrameType { found: 0xFF };
943 assert!(format!("{}", err).contains("0xFF"));
944
945 let err = StreamingError::ChecksumMismatch {
946 expected: 0x1234,
947 found: 0x5678,
948 };
949 assert!(format!("{}", err).contains("0x00001234"));
950 assert!(format!("{}", err).contains("0x00005678"));
951
952 let err = StreamingError::StreamNotStarted;
953 assert!(format!("{}", err).contains("not started"));
954
955 let err = StreamingError::StreamAlreadyComplete;
956 assert!(format!("{}", err).contains("already complete"));
957 }
958
959 #[test]
960 fn test_streaming_config_default() {
961 let config = StreamingConfig::new();
962 assert_eq!(config.chunk_size, 4096);
963 assert!(!config.enable_compression);
964 assert!(config.enable_checksums);
965 }
966
967 #[test]
968 fn test_streaming_config_builder() {
969 let config = StreamingConfig::new()
970 .with_chunk_size(8192)
971 .with_compression(true)
972 .with_checksums(false);
973
974 assert_eq!(config.chunk_size, 8192);
975 assert!(config.enable_compression);
976 assert!(!config.enable_checksums);
977 }
978
979 #[test]
980 fn test_streaming_config_default_trait() {
981 let config = StreamingConfig::default();
982 assert_eq!(config.chunk_size, 4096);
983 assert!(!config.enable_compression);
984 assert!(config.enable_checksums);
985 }
986
987 #[test]
988 fn test_streaming_encoder_new() {
989 let encoder = StreamingEncoder::new();
990 assert_eq!(encoder.state, StreamingState::Idle);
991 }
992
993 #[test]
994 fn test_streaming_encoder_begin_stream() {
995 let mut encoder = StreamingEncoder::new();
996 let result = encoder.begin_stream();
997 assert!(result.is_ok());
998
999 match encoder.state {
1000 StreamingState::Streaming {
1001 bytes_sent,
1002 chunks_sent,
1003 } => {
1004 assert_eq!(bytes_sent, 0);
1005 assert_eq!(chunks_sent, 0);
1006 }
1007 _ => panic!("Expected Streaming state"),
1008 }
1009 }
1010
1011 #[test]
1012 fn test_streaming_encoder_write_chunk() {
1013 let mut encoder = StreamingEncoder::new();
1014 encoder.begin_stream().unwrap();
1015
1016 let data = vec![1, 2, 3, 4, 5];
1017 let result = encoder.write_chunk(&data);
1018 assert!(result.is_ok());
1019
1020 match encoder.state {
1021 StreamingState::Streaming {
1022 bytes_sent,
1023 chunks_sent,
1024 } => {
1025 assert_eq!(bytes_sent, 5);
1026 assert_eq!(chunks_sent, 1);
1027 }
1028 _ => panic!("Expected Streaming state"),
1029 }
1030 }
1031
1032 #[test]
1033 fn test_streaming_encoder_write_chunk_without_begin() {
1034 let mut encoder = StreamingEncoder::new();
1035 let data = vec![1, 2, 3];
1036 let result = encoder.write_chunk(&data);
1037 assert!(matches!(result, Err(StreamingError::StreamNotStarted)));
1038 }
1039
1040 #[test]
1041 fn test_streaming_encoder_write_chunk_exceeds_size() {
1042 let config = StreamingConfig::new().with_chunk_size(10);
1043 let mut encoder = StreamingEncoder::with_config(config);
1044 encoder.begin_stream().unwrap();
1045
1046 let data = vec![0u8; 20]; let result = encoder.write_chunk(&data);
1048 assert!(matches!(
1049 result,
1050 Err(StreamingError::ChunkSizeExceeded { .. })
1051 ));
1052 }
1053
1054 #[test]
1055 fn test_streaming_encoder_end_stream() {
1056 let mut encoder = StreamingEncoder::new();
1057 encoder.begin_stream().unwrap();
1058
1059 let result = encoder.end_stream();
1060 assert!(result.is_ok());
1061 assert_eq!(encoder.state, StreamingState::Complete);
1062 }
1063
1064 #[test]
1065 fn test_streaming_encoder_end_stream_without_begin() {
1066 let mut encoder = StreamingEncoder::new();
1067 let result = encoder.end_stream();
1068 assert!(matches!(result, Err(StreamingError::StreamNotStarted)));
1069 }
1070
1071 #[test]
1072 fn test_streaming_encoder_error_frame() {
1073 let mut encoder = StreamingEncoder::new();
1074 let result = encoder.error_frame("Test error");
1075 assert!(result.is_ok());
1076
1077 match encoder.state {
1078 StreamingState::Error(msg) => {
1079 assert_eq!(msg, "Test error");
1080 }
1081 _ => panic!("Expected Error state"),
1082 }
1083 }
1084
1085 #[test]
1086 fn test_streaming_encoder_full_flow() {
1087 let mut encoder = StreamingEncoder::new();
1088
1089 let begin_bytes = encoder.begin_stream().unwrap();
1091 assert!(!begin_bytes.is_empty());
1092
1093 let chunk1 = vec![1, 2, 3];
1095 let chunk1_bytes = encoder.write_chunk(&chunk1).unwrap();
1096 assert!(!chunk1_bytes.is_empty());
1097
1098 let chunk2 = vec![4, 5, 6];
1099 let chunk2_bytes = encoder.write_chunk(&chunk2).unwrap();
1100 assert!(!chunk2_bytes.is_empty());
1101
1102 let end_bytes = encoder.end_stream().unwrap();
1104 assert!(!end_bytes.is_empty());
1105
1106 assert_eq!(encoder.state, StreamingState::Complete);
1107 }
1108
1109 #[test]
1110 fn test_streaming_encoder_encode_frame_begin() {
1111 let encoder = StreamingEncoder::new();
1112 let frame = StreamingFrame::begin();
1113 let bytes = encoder.encode_frame(&frame).unwrap();
1114
1115 assert!(bytes.len() >= 7);
1117 assert_eq!(bytes[0], FrameType::Begin.to_u8());
1118 }
1119
1120 #[test]
1121 fn test_streaming_encoder_encode_frame_chunk() {
1122 let encoder = StreamingEncoder::new();
1123 let payload = vec![1, 2, 3, 4, 5];
1124 let frame = StreamingFrame::chunk(payload.clone(), false);
1125 let bytes = encoder.encode_frame(&frame).unwrap();
1126
1127 assert!(bytes.len() >= 7 + payload.len());
1129 assert_eq!(bytes[0], FrameType::Chunk.to_u8());
1130 }
1131
1132 #[test]
1133 fn test_streaming_encoder_multiple_chunks() {
1134 let mut encoder = StreamingEncoder::new();
1135 encoder.begin_stream().unwrap();
1136
1137 for i in 0..5 {
1138 let data = vec![i; 10];
1139 let result = encoder.write_chunk(&data);
1140 assert!(result.is_ok());
1141 }
1142
1143 match encoder.state {
1144 StreamingState::Streaming {
1145 bytes_sent,
1146 chunks_sent,
1147 } => {
1148 assert_eq!(bytes_sent, 50);
1149 assert_eq!(chunks_sent, 5);
1150 }
1151 _ => panic!("Expected Streaming state"),
1152 }
1153 }
1154
1155 #[test]
1156 fn test_streaming_decoder_new() {
1157 let decoder = StreamingDecoder::new();
1158 assert_eq!(decoder.state, StreamingState::Idle);
1159 assert!(decoder.buffer.is_empty());
1160 }
1161
1162 #[test]
1163 fn test_streaming_decoder_feed_begin() {
1164 let mut encoder = StreamingEncoder::new();
1165 let begin_bytes = encoder.begin_stream().unwrap();
1166
1167 let mut decoder = StreamingDecoder::new();
1168 let event = decoder.feed_frame(&begin_bytes).unwrap();
1169
1170 assert_eq!(event, StreamingEvent::StreamStarted);
1171 match decoder.state {
1172 StreamingState::Streaming { .. } => {}
1173 _ => panic!("Expected Streaming state"),
1174 }
1175 }
1176
1177 #[test]
1178 fn test_streaming_decoder_feed_chunk() {
1179 let mut encoder = StreamingEncoder::new();
1180 encoder.begin_stream().unwrap();
1181
1182 let data = vec![1, 2, 3, 4, 5];
1183 let chunk_bytes = encoder.write_chunk(&data).unwrap();
1184
1185 let mut decoder = StreamingDecoder::new();
1186 decoder
1187 .feed_frame(&encoder.encode_frame(&StreamingFrame::begin()).unwrap())
1188 .unwrap();
1189
1190 let event = decoder.feed_frame(&chunk_bytes).unwrap();
1191 assert_eq!(event, StreamingEvent::ChunkReceived { bytes: 5 });
1192 }
1193
1194 #[test]
1195 fn test_streaming_decoder_feed_end() {
1196 let mut encoder = StreamingEncoder::new();
1197 encoder.begin_stream().unwrap();
1198 let end_bytes = encoder.end_stream().unwrap();
1199
1200 let mut decoder = StreamingDecoder::new();
1201 decoder
1202 .feed_frame(&encoder.encode_frame(&StreamingFrame::begin()).unwrap())
1203 .unwrap();
1204
1205 let event = decoder.feed_frame(&end_bytes).unwrap();
1206 match event {
1207 StreamingEvent::StreamComplete { total_bytes } => {
1208 assert_eq!(total_bytes, 0);
1209 }
1210 _ => panic!("Expected StreamComplete event"),
1211 }
1212 }
1213
1214 #[test]
1215 fn test_streaming_decoder_feed_error() {
1216 let mut encoder = StreamingEncoder::new();
1217 let error_bytes = encoder.error_frame("Test error").unwrap();
1218
1219 let mut decoder = StreamingDecoder::new();
1220 let event = decoder.feed_frame(&error_bytes).unwrap();
1221
1222 match event {
1223 StreamingEvent::StreamError { message } => {
1224 assert_eq!(message, "Test error");
1225 }
1226 _ => panic!("Expected StreamError event"),
1227 }
1228 }
1229
1230 #[test]
1231 fn test_streaming_decoder_chunk_without_begin() {
1232 let mut encoder = StreamingEncoder::new();
1233 encoder.begin_stream().unwrap();
1234 let chunk_bytes = encoder.write_chunk(&[1, 2, 3]).unwrap();
1235
1236 let mut decoder = StreamingDecoder::new();
1237 let result = decoder.feed_frame(&chunk_bytes);
1238 assert!(matches!(result, Err(StreamingError::StreamNotStarted)));
1239 }
1240
1241 #[test]
1242 fn test_streaming_decoder_get_complete_payload() {
1243 let mut encoder = StreamingEncoder::new();
1244 encoder.begin_stream().unwrap();
1245
1246 let data1 = vec![1, 2, 3];
1247 let data2 = vec![4, 5, 6];
1248
1249 let chunk1_bytes = encoder.write_chunk(&data1).unwrap();
1250 let chunk2_bytes = encoder.write_chunk(&data2).unwrap();
1251 let end_bytes = encoder.end_stream().unwrap();
1252
1253 let mut decoder = StreamingDecoder::new();
1254 decoder
1255 .feed_frame(&encoder.encode_frame(&StreamingFrame::begin()).unwrap())
1256 .unwrap();
1257 decoder.feed_frame(&chunk1_bytes).unwrap();
1258 decoder.feed_frame(&chunk2_bytes).unwrap();
1259 decoder.feed_frame(&end_bytes).unwrap();
1260
1261 let payload = decoder.get_complete_payload().unwrap();
1262 assert_eq!(payload, &[1, 2, 3, 4, 5, 6]);
1263 }
1264
1265 #[test]
1266 fn test_streaming_decoder_get_complete_payload_before_end() {
1267 let mut decoder = StreamingDecoder::new();
1268 assert!(decoder.get_complete_payload().is_none());
1269
1270 let mut encoder = StreamingEncoder::new();
1271 let begin_bytes = encoder.begin_stream().unwrap();
1272 decoder.feed_frame(&begin_bytes).unwrap();
1273
1274 assert!(decoder.get_complete_payload().is_none());
1275 }
1276
1277 #[test]
1278 fn test_streaming_round_trip() {
1279 let mut encoder = StreamingEncoder::new();
1280 let mut decoder = StreamingDecoder::new();
1281
1282 let begin_bytes = encoder.begin_stream().unwrap();
1284 let event = decoder.feed_frame(&begin_bytes).unwrap();
1285 assert_eq!(event, StreamingEvent::StreamStarted);
1286
1287 let data1 = vec![1, 2, 3, 4, 5];
1289 let chunk1_bytes = encoder.write_chunk(&data1).unwrap();
1290 let event = decoder.feed_frame(&chunk1_bytes).unwrap();
1291 assert_eq!(event, StreamingEvent::ChunkReceived { bytes: 5 });
1292
1293 let data2 = vec![6, 7, 8];
1294 let chunk2_bytes = encoder.write_chunk(&data2).unwrap();
1295 let event = decoder.feed_frame(&chunk2_bytes).unwrap();
1296 assert_eq!(event, StreamingEvent::ChunkReceived { bytes: 3 });
1297
1298 let end_bytes = encoder.end_stream().unwrap();
1300 let event = decoder.feed_frame(&end_bytes).unwrap();
1301 match event {
1302 StreamingEvent::StreamComplete { total_bytes } => {
1303 assert_eq!(total_bytes, 8);
1304 }
1305 _ => panic!("Expected StreamComplete event"),
1306 }
1307
1308 let payload = decoder.get_complete_payload().unwrap();
1310 assert_eq!(payload, &[1, 2, 3, 4, 5, 6, 7, 8]);
1311 }
1312
1313 #[test]
1314 fn test_streaming_decoder_checksum_validation() {
1315 let config = StreamingConfig::new().with_checksums(true);
1316 let mut encoder = StreamingEncoder::with_config(config.clone());
1317 let mut decoder = StreamingDecoder::with_config(config);
1318
1319 encoder.begin_stream().unwrap();
1320 let data = vec![1, 2, 3, 4, 5];
1321 let mut chunk_bytes = encoder.write_chunk(&data).unwrap();
1322
1323 if chunk_bytes.len() > 10 {
1325 chunk_bytes[5] ^= 0xFF;
1326 }
1327
1328 decoder
1329 .feed_frame(&encoder.encode_frame(&StreamingFrame::begin()).unwrap())
1330 .unwrap();
1331 let result = decoder.feed_frame(&chunk_bytes);
1332 assert!(matches!(
1333 result,
1334 Err(StreamingError::ChecksumMismatch { .. })
1335 ));
1336 }
1337
1338 #[test]
1339 fn test_streaming_decoder_invalid_frame_bytes() {
1340 let mut decoder = StreamingDecoder::new();
1341 let invalid_bytes = vec![0xFF, 0x00]; let result = decoder.feed_frame(&invalid_bytes);
1343 assert!(result.is_err());
1344 }
1345
1346 #[test]
1347 fn test_backpressure_controller_new() {
1348 let controller = BackpressureController::new();
1349 assert_eq!(controller.window_size(), 65536);
1350 assert_eq!(controller.bytes_in_flight(), 0);
1351 assert!(controller.can_send());
1352 }
1353
1354 #[test]
1355 fn test_backpressure_controller_with_window_size() {
1356 let controller = BackpressureController::with_window_size(8192);
1357 assert_eq!(controller.window_size(), 8192);
1358 assert_eq!(controller.bytes_in_flight(), 0);
1359 }
1360
1361 #[test]
1362 fn test_backpressure_controller_can_send() {
1363 let mut controller = BackpressureController::with_window_size(1000);
1364 assert!(controller.can_send());
1365
1366 controller.on_chunk_sent(500);
1367 assert!(controller.can_send());
1368
1369 controller.on_chunk_sent(500);
1370 assert!(!controller.can_send());
1371 }
1372
1373 #[test]
1374 fn test_backpressure_controller_available_window() {
1375 let mut controller = BackpressureController::with_window_size(1000);
1376 assert_eq!(controller.available_window(), 1000);
1377
1378 controller.on_chunk_sent(300);
1379 assert_eq!(controller.available_window(), 700);
1380
1381 controller.on_chunk_sent(400);
1382 assert_eq!(controller.available_window(), 300);
1383 }
1384
1385 #[test]
1386 fn test_backpressure_controller_on_chunk_sent() {
1387 let mut controller = BackpressureController::new();
1388 assert_eq!(controller.bytes_in_flight(), 0);
1389
1390 controller.on_chunk_sent(100);
1391 assert_eq!(controller.bytes_in_flight(), 100);
1392
1393 controller.on_chunk_sent(200);
1394 assert_eq!(controller.bytes_in_flight(), 300);
1395 }
1396
1397 #[test]
1398 fn test_backpressure_controller_on_chunk_acked() {
1399 let mut controller = BackpressureController::new();
1400 controller.on_chunk_sent(500);
1401 assert_eq!(controller.bytes_in_flight(), 500);
1402
1403 controller.on_chunk_acked(200);
1404 assert_eq!(controller.bytes_in_flight(), 300);
1405
1406 controller.on_chunk_acked(300);
1407 assert_eq!(controller.bytes_in_flight(), 0);
1408 }
1409
1410 #[test]
1411 fn test_backpressure_controller_reset() {
1412 let mut controller = BackpressureController::new();
1413 controller.on_chunk_sent(1000);
1414 assert_eq!(controller.bytes_in_flight(), 1000);
1415
1416 controller.reset();
1417 assert_eq!(controller.bytes_in_flight(), 0);
1418 assert!(controller.can_send());
1419 }
1420
1421 #[test]
1422 fn test_backpressure_controller_saturating_add() {
1423 let mut controller = BackpressureController::with_window_size(100);
1424 controller.on_chunk_sent(usize::MAX);
1425 assert!(controller.bytes_in_flight() > 0);
1427 }
1428
1429 #[test]
1430 fn test_backpressure_controller_saturating_sub() {
1431 let mut controller = BackpressureController::new();
1432 controller.on_chunk_sent(100);
1433 controller.on_chunk_acked(200); assert_eq!(controller.bytes_in_flight(), 0);
1436 }
1437
1438 #[test]
1439 fn test_backpressure_controller_flow_control_scenario() {
1440 let mut controller = BackpressureController::with_window_size(10000);
1441
1442 let chunk_size = 2000;
1444 let mut chunks_sent = 0;
1445
1446 while controller.can_send() && controller.available_window() >= chunk_size {
1447 controller.on_chunk_sent(chunk_size);
1448 chunks_sent += 1;
1449 }
1450
1451 assert_eq!(chunks_sent, 5); assert!(!controller.can_send());
1453
1454 controller.on_chunk_acked(chunk_size);
1456 controller.on_chunk_acked(chunk_size);
1457
1458 assert!(controller.can_send());
1460 assert_eq!(controller.available_window(), 4000);
1461 }
1462
1463 #[test]
1464 fn test_backpressure_controller_default() {
1465 let controller = BackpressureController::default();
1466 assert_eq!(controller.window_size(), 65536);
1467 assert_eq!(controller.bytes_in_flight(), 0);
1468 }
1469}