1use std::io::{self, Read, Write};
12use std::sync::Arc;
13
14use oxiarc_core::cancel::CancellationToken;
15use oxiarc_core::progress::ProgressHandle;
16
17use crate::compress;
18use crate::crc32c::{crc32c, masked_crc32c};
19use crate::decompress;
20use crate::error::SnappyError;
21use crate::pool::{PoolInner, SnappyPool};
22
/// Complete stream-identifier chunk: type byte `0xFF`, 24-bit little-endian
/// length 6, then the ASCII body `sNaPpY`.
const STREAM_IDENTIFIER: [u8; 10] = *b"\xFF\x06\x00\x00sNaPpY";

/// Chunk type used for the OxiArc dictionary-info chunk.
const CHUNK_TYPE_OXIARC_DICT: u8 = 0xFE;

/// Magic prefix at the start of a dictionary-info chunk body.
const OXIARC_DICT_MAGIC: &[u8] = b"OXIAD";

/// Body of a stream-identifier chunk (the last six bytes of
/// `STREAM_IDENTIFIER`).
const STREAM_BODY: [u8; 6] = *b"sNaPpY";

/// Chunk type of a chunk whose payload is Snappy-compressed.
const CHUNK_TYPE_COMPRESSED: u8 = 0x00;

/// Chunk type of a chunk whose payload is stored raw.
const CHUNK_TYPE_UNCOMPRESSED: u8 = 0x01;

/// Chunk type of a stream-identifier chunk.
const CHUNK_TYPE_STREAM_ID: u8 = 0xFF;

/// Largest number of uncompressed bytes placed into a single data chunk.
const MAX_UNCOMPRESSED_CHUNK_SIZE: usize = 1 << 16;
47
/// Streaming writer that stages incoming bytes and emits them to an inner
/// writer as framed chunks, each prefixed with a 4-byte chunk header and a
/// checksum.
pub struct FrameEncoder<W: Write> {
    /// Destination writer; becomes `None` once `finish` has taken it.
    inner: Option<W>,
    /// Uncompressed bytes staged for the next chunk.
    buffer: Vec<u8>,
    /// Whether the stream identifier has already been written.
    header_written: bool,
    /// Optional sink notified with the running byte total after each chunk.
    progress: Option<ProgressHandle>,
    /// Optional token checked before each chunk is written.
    cancel: Option<CancellationToken>,
    /// Total uncompressed bytes emitted as chunks so far.
    bytes_processed: u64,
    /// Optional pool used to obtain and recycle staging buffers.
    pool: Option<SnappyPool>,
}
79
80impl<W: Write> FrameEncoder<W> {
81 pub fn new(inner: W) -> Self {
85 Self {
86 inner: Some(inner),
87 buffer: Vec::with_capacity(MAX_UNCOMPRESSED_CHUNK_SIZE),
88 header_written: false,
89 progress: None,
90 cancel: None,
91 bytes_processed: 0,
92 pool: None,
93 }
94 }
95
96 pub fn with_pool(inner: W, pool: &SnappyPool) -> Self {
100 let mut enc = Self::new(inner);
101 enc.pool = Some(pool.clone());
102 enc
103 }
104
105 pub fn with_progress(mut self, handle: ProgressHandle) -> Self {
108 self.progress = Some(handle);
109 self
110 }
111
112 pub fn with_cancel(mut self, token: CancellationToken) -> Self {
116 self.cancel = Some(token);
117 self
118 }
119
120 pub fn finish(mut self) -> io::Result<W> {
127 self.flush_buffer()?;
128 self.inner
129 .take()
130 .ok_or_else(|| io::Error::other("encoder already finished"))
131 }
132
133 fn ensure_header(&mut self) -> io::Result<()> {
135 if !self.header_written {
136 if let Some(ref mut w) = self.inner {
137 w.write_all(&STREAM_IDENTIFIER)?;
138 }
139 self.header_written = true;
140 }
141 Ok(())
142 }
143
144 fn flush_buffer(&mut self) -> io::Result<()> {
146 if self.buffer.is_empty() {
147 return Ok(());
148 }
149
150 self.ensure_header()?;
151
152 if let Some(ref token) = self.cancel {
154 token
155 .check()
156 .map_err(|_| io::Error::other("operation cancelled"))?;
157 }
158
159 let chunk_len = self.buffer.len() as u64;
160
161 let data = if let Some(ref snappy_pool) = self.pool {
166 let pi = &snappy_pool.inner;
167 let mut replacement = {
168 let mut guard = pi.encoder_scratch.lock().unwrap_or_else(|e| e.into_inner());
169 if let Some(mut b) = guard.pop() {
170 pi.encoder_scratch_hits
171 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
172 b.clear();
173 b
174 } else {
175 pi.encoder_scratch_allocs
176 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
177 Vec::with_capacity(crate::pool::ENCODER_SCRATCH_CAP)
178 }
179 };
180 std::mem::swap(&mut self.buffer, &mut replacement);
183 replacement
184 } else {
185 std::mem::take(&mut self.buffer)
186 };
187
188 self.write_chunk(&data)?;
189
190 if let Some(ref snappy_pool) = self.pool {
192 let pi = &snappy_pool.inner;
193 let mut buf = data;
194 buf.clear();
195 let mut guard = pi.encoder_scratch.lock().unwrap_or_else(|e| e.into_inner());
196 if guard.len() < pi.cap {
197 guard.push(buf);
198 }
199 }
200
201 self.bytes_processed += chunk_len;
203 if let Some(ref handle) = self.progress {
204 handle.on_progress(self.bytes_processed, None);
205 }
206
207 Ok(())
208 }
209
210 fn write_chunk(&mut self, data: &[u8]) -> io::Result<()> {
212 let writer = self
213 .inner
214 .as_mut()
215 .ok_or_else(|| io::Error::other("encoder already finished"))?;
216
217 let checksum = masked_crc32c(data);
218 let compressed = compress::compress(data);
219
220 if compressed.len() < data.len() {
223 let chunk_len = 4 + compressed.len(); write_chunk_header(writer, CHUNK_TYPE_COMPRESSED, chunk_len)?;
226 writer.write_all(&checksum.to_le_bytes())?;
227 writer.write_all(&compressed)?;
228 } else {
229 let chunk_len = 4 + data.len(); write_chunk_header(writer, CHUNK_TYPE_UNCOMPRESSED, chunk_len)?;
232 writer.write_all(&checksum.to_le_bytes())?;
233 writer.write_all(data)?;
234 }
235
236 Ok(())
237 }
238}
239
240impl<W: Write> Write for FrameEncoder<W> {
241 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
242 if buf.is_empty() {
243 return Ok(0);
244 }
245
246 self.ensure_header()?;
247
248 let mut written = 0;
249
250 while written < buf.len() {
251 let remaining_capacity = MAX_UNCOMPRESSED_CHUNK_SIZE - self.buffer.len();
252 let to_copy = remaining_capacity.min(buf.len() - written);
253
254 self.buffer
255 .extend_from_slice(&buf[written..written + to_copy]);
256 written += to_copy;
257
258 if self.buffer.len() >= MAX_UNCOMPRESSED_CHUNK_SIZE {
259 self.flush_buffer()?;
260 }
261 }
262
263 Ok(written)
264 }
265
266 fn flush(&mut self) -> io::Result<()> {
267 self.flush_buffer()?;
268 if let Some(ref mut w) = self.inner {
269 w.flush()?;
270 }
271 Ok(())
272 }
273}
274
275impl<W: Write> Drop for FrameEncoder<W> {
276 fn drop(&mut self) {
277 if !self.buffer.is_empty() && self.inner.is_some() {
280 let _ = self.flush_buffer();
281 }
282 }
283}
284
/// Streaming reader that parses framed chunks from an inner reader and yields
/// their verified, decoded payload bytes.
pub struct FrameDecoder<R: Read> {
    /// Source of framed bytes.
    inner: R,
    /// Payload of the most recently decoded data chunk.
    output_buffer: Vec<u8>,
    /// Number of bytes of `output_buffer` already handed to the caller.
    output_pos: usize,
    /// Whether the leading stream identifier has been read and accepted.
    header_validated: bool,
    /// Set once the inner reader is exhausted while reading the stream
    /// identifier or a chunk header; `read` then returns `Ok(0)`.
    at_eof: bool,
    /// Optional sink notified with the running byte total after each data
    /// chunk.
    progress: Option<ProgressHandle>,
    /// Optional token checked before each chunk is read.
    cancel: Option<CancellationToken>,
    /// Running total of decoded payload bytes reported to `progress`.
    bytes_processed: u64,
    /// Optional shared pool state used to recycle chunk scratch buffers.
    pool: Option<Arc<PoolInner>>,
}
318
319impl<R: Read> FrameDecoder<R> {
320 pub fn new(inner: R) -> Self {
322 Self {
323 inner,
324 output_buffer: Vec::new(),
325 output_pos: 0,
326 header_validated: false,
327 at_eof: false,
328 progress: None,
329 cancel: None,
330 bytes_processed: 0,
331 pool: None,
332 }
333 }
334
335 pub fn with_pool(inner: R, pool: &SnappyPool) -> Self {
339 let mut dec = Self::new(inner);
340 dec.pool = Some(Arc::clone(&pool.inner));
341 dec
342 }
343
344 pub fn with_progress(mut self, handle: ProgressHandle) -> Self {
347 self.progress = Some(handle);
348 self
349 }
350
351 pub fn with_cancel(mut self, token: CancellationToken) -> Self {
355 self.cancel = Some(token);
356 self
357 }
358
359 fn validate_header(&mut self) -> io::Result<()> {
361 if self.header_validated {
362 return Ok(());
363 }
364
365 let mut header = [0u8; 10];
366 match self.inner.read_exact(&mut header) {
367 Ok(()) => {}
368 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
369 self.at_eof = true;
370 return Ok(());
371 }
372 Err(e) => return Err(e),
373 }
374
375 if header != STREAM_IDENTIFIER {
376 return Err(io::Error::new(
377 io::ErrorKind::InvalidData,
378 SnappyError::InvalidStreamIdentifier.to_string(),
379 ));
380 }
381
382 self.header_validated = true;
383 Ok(())
384 }
385
386 fn read_next_chunk(&mut self) -> io::Result<bool> {
388 if let Some(ref token) = self.cancel {
390 token
391 .check()
392 .map_err(|_| io::Error::other("operation cancelled"))?;
393 }
394
395 let mut chunk_header = [0u8; 4];
397 match self.inner.read_exact(&mut chunk_header) {
398 Ok(()) => {}
399 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
400 self.at_eof = true;
401 return Ok(false);
402 }
403 Err(e) => return Err(e),
404 }
405
406 let chunk_type = chunk_header[0];
407 let chunk_len = (chunk_header[1] as usize)
408 | ((chunk_header[2] as usize) << 8)
409 | ((chunk_header[3] as usize) << 16);
410
411 match chunk_type {
412 CHUNK_TYPE_COMPRESSED => {
413 self.read_compressed_chunk(chunk_len)?;
414 self.emit_chunk_progress();
415 Ok(true)
416 }
417 CHUNK_TYPE_UNCOMPRESSED => {
418 self.read_uncompressed_chunk(chunk_len)?;
419 self.emit_chunk_progress();
420 Ok(true)
421 }
422 CHUNK_TYPE_STREAM_ID => {
423 self.read_stream_identifier_chunk(chunk_len)?;
425 Ok(true)
426 }
427 0x02..=0x7F => {
428 Err(io::Error::new(
430 io::ErrorKind::InvalidData,
431 SnappyError::InvalidChunkType { chunk_type }.to_string(),
432 ))
433 }
434 _ => {
435 let mut skip_buf = vec![0u8; chunk_len];
437 self.inner.read_exact(&mut skip_buf)?;
438 Ok(true)
439 }
440 }
441 }
442
443 fn emit_chunk_progress(&mut self) {
447 let chunk_size = self.output_buffer.len() as u64;
448 self.bytes_processed += chunk_size;
449 if let Some(ref handle) = self.progress {
450 handle.on_progress(self.bytes_processed, None);
451 }
452 }
453
454 fn acquire_decoder_scratch(&mut self, chunk_len: usize) -> Vec<u8> {
459 if let Some(ref pool_inner) = self.pool {
460 let mut guard = pool_inner
461 .decoder_scratch
462 .lock()
463 .unwrap_or_else(|e| e.into_inner());
464 if let Some(mut b) = guard.pop() {
465 pool_inner
466 .decoder_scratch_hits
467 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
468 b.clear();
469 b.resize(chunk_len, 0);
470 return b;
471 }
472 pool_inner
473 .decoder_scratch_allocs
474 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
475 }
476 vec![0u8; chunk_len]
477 }
478
479 fn release_decoder_scratch(&self, mut buf: Vec<u8>) {
481 if let Some(ref pool_inner) = self.pool {
482 buf.clear();
483 let mut guard = pool_inner
484 .decoder_scratch
485 .lock()
486 .unwrap_or_else(|e| e.into_inner());
487 if guard.len() < pool_inner.cap {
488 guard.push(buf);
489 }
490 }
491 }
492
493 fn read_compressed_chunk(&mut self, chunk_len: usize) -> io::Result<()> {
495 if chunk_len < 4 {
496 return Err(io::Error::new(
497 io::ErrorKind::InvalidData,
498 "compressed chunk too short for checksum",
499 ));
500 }
501
502 let mut chunk_data = self.acquire_decoder_scratch(chunk_len);
503 self.inner.read_exact(&mut chunk_data)?;
504
505 let expected_checksum =
507 u32::from_le_bytes([chunk_data[0], chunk_data[1], chunk_data[2], chunk_data[3]]);
508
509 let compressed_data = &chunk_data[4..];
510 let decompressed = decompress::decompress(compressed_data)
511 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
512
513 self.release_decoder_scratch(chunk_data);
515
516 let computed_checksum = masked_crc32c(&decompressed);
518 if expected_checksum != computed_checksum {
519 return Err(io::Error::new(
520 io::ErrorKind::InvalidData,
521 SnappyError::ChecksumMismatch {
522 expected: expected_checksum,
523 computed: computed_checksum,
524 }
525 .to_string(),
526 ));
527 }
528
529 self.output_buffer = decompressed;
530 self.output_pos = 0;
531 Ok(())
532 }
533
534 fn read_uncompressed_chunk(&mut self, chunk_len: usize) -> io::Result<()> {
536 if chunk_len < 4 {
537 return Err(io::Error::new(
538 io::ErrorKind::InvalidData,
539 "uncompressed chunk too short for checksum",
540 ));
541 }
542
543 let mut chunk_data = self.acquire_decoder_scratch(chunk_len);
544 self.inner.read_exact(&mut chunk_data)?;
545
546 let expected_checksum =
547 u32::from_le_bytes([chunk_data[0], chunk_data[1], chunk_data[2], chunk_data[3]]);
548
549 let data_slice = chunk_data[4..].to_vec();
550
551 self.release_decoder_scratch(chunk_data);
552
553 let computed_checksum = masked_crc32c(&data_slice);
554 if expected_checksum != computed_checksum {
555 return Err(io::Error::new(
556 io::ErrorKind::InvalidData,
557 SnappyError::ChecksumMismatch {
558 expected: expected_checksum,
559 computed: computed_checksum,
560 }
561 .to_string(),
562 ));
563 }
564
565 self.output_buffer = data_slice;
566 self.output_pos = 0;
567 Ok(())
568 }
569
570 fn read_stream_identifier_chunk(&mut self, chunk_len: usize) -> io::Result<()> {
572 if chunk_len != 6 {
573 return Err(io::Error::new(
574 io::ErrorKind::InvalidData,
575 "invalid stream identifier length",
576 ));
577 }
578
579 let mut body = [0u8; 6];
580 self.inner.read_exact(&mut body)?;
581
582 if body != STREAM_BODY {
583 return Err(io::Error::new(
584 io::ErrorKind::InvalidData,
585 SnappyError::InvalidStreamIdentifier.to_string(),
586 ));
587 }
588
589 Ok(())
590 }
591}
592
593impl<R: Read> Read for FrameDecoder<R> {
594 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
595 if buf.is_empty() {
596 return Ok(0);
597 }
598
599 if !self.header_validated && !self.at_eof {
601 self.validate_header()?;
602 }
603
604 loop {
605 if self.at_eof {
606 return Ok(0);
607 }
608
609 let available = self.output_buffer.len() - self.output_pos;
611 if available > 0 {
612 let to_copy = available.min(buf.len());
613 buf[..to_copy].copy_from_slice(
614 &self.output_buffer[self.output_pos..self.output_pos + to_copy],
615 );
616 self.output_pos += to_copy;
617 return Ok(to_copy);
618 }
619
620 if !self.read_next_chunk()? {
622 return Ok(0);
623 }
624 }
625 }
626}
627
/// Writes a 4-byte chunk header to `writer`: the chunk type followed by the
/// low 24 bits of `data_len` in little-endian order.
///
/// Bits of `data_len` above the low 24 are not written.
fn write_chunk_header(writer: &mut impl Write, chunk_type: u8, data_len: usize) -> io::Result<()> {
    let len_le = data_len.to_le_bytes();
    writer.write_all(&[chunk_type, len_le[0], len_le[1], len_le[2]])
}
638
639pub fn compress_frame_pooled(input: &[u8], pool: &SnappyPool) -> io::Result<Vec<u8>> {
650 let mut output = Vec::new();
651 let mut encoder = FrameEncoder::with_pool(&mut output, pool);
652 encoder.write_all(input)?;
653 encoder.finish()?;
654 Ok(output)
655}
656
657pub fn compress_frame_with_dict(input: &[u8], dict: &[u8]) -> Vec<u8> {
682 let dict = if dict.len() > 65536 {
684 &dict[dict.len() - 65536..]
685 } else {
686 dict
687 };
688
689 let mut output = Vec::new();
690
691 output.extend_from_slice(&STREAM_IDENTIFIER);
693
694 let dict_crc = crc32c(dict);
697 let dict_len_u32 = dict.len() as u32;
698 let mut dict_body = Vec::with_capacity(13);
699 dict_body.extend_from_slice(OXIARC_DICT_MAGIC);
700 dict_body.extend_from_slice(&dict_crc.to_le_bytes());
701 dict_body.extend_from_slice(&dict_len_u32.to_le_bytes());
702
703 let body_len = dict_body.len();
705 output.push(CHUNK_TYPE_OXIARC_DICT);
706 output.push((body_len & 0xFF) as u8);
707 output.push(((body_len >> 8) & 0xFF) as u8);
708 output.push(((body_len >> 16) & 0xFF) as u8);
709 output.extend_from_slice(&dict_body);
710
711 let mut src_pos = 0usize;
713 while src_pos < input.len() {
714 let chunk_end = (src_pos + MAX_UNCOMPRESSED_CHUNK_SIZE).min(input.len());
715 let chunk_data = &input[src_pos..chunk_end];
716
717 let checksum = masked_crc32c(chunk_data);
718 let compressed = compress::compress_block_with_dict(chunk_data, dict);
719
720 if compressed.len() < chunk_data.len() {
721 let chunk_len = 4 + compressed.len();
723 write_chunk_header_vec(&mut output, CHUNK_TYPE_COMPRESSED, chunk_len);
724 output.extend_from_slice(&checksum.to_le_bytes());
725 output.extend_from_slice(&compressed);
726 } else {
727 let chunk_len = 4 + chunk_data.len();
729 write_chunk_header_vec(&mut output, CHUNK_TYPE_UNCOMPRESSED, chunk_len);
730 output.extend_from_slice(&checksum.to_le_bytes());
731 output.extend_from_slice(chunk_data);
732 }
733
734 src_pos = chunk_end;
735 }
736
737 output
738}
739
740pub fn decompress_frame_with_dict(input: &[u8], dict: &[u8]) -> Result<Vec<u8>, SnappyError> {
753 let dict = if dict.len() > 65536 {
755 &dict[dict.len() - 65536..]
756 } else {
757 dict
758 };
759
760 let mut pos = 0usize;
761
762 if pos + 10 > input.len() {
764 return Err(SnappyError::UnexpectedEof {
765 context: "stream identifier",
766 });
767 }
768 if input[pos..pos + 10] != STREAM_IDENTIFIER[..] {
769 return Err(SnappyError::InvalidStreamIdentifier);
770 }
771 pos += 10;
772
773 if pos + 4 > input.len() {
775 return Err(SnappyError::UnexpectedEof {
776 context: "dict-info chunk header",
777 });
778 }
779 let dict_chunk_type = input[pos];
780 let dict_chunk_body_len = (input[pos + 1] as usize)
781 | ((input[pos + 2] as usize) << 8)
782 | ((input[pos + 3] as usize) << 16);
783 pos += 4;
784
785 if dict_chunk_type != CHUNK_TYPE_OXIARC_DICT {
786 return Err(SnappyError::CorruptedData {
787 message: format!(
788 "expected OxiArc dict-info chunk (0xFE), found {dict_chunk_type:#04x}"
789 ),
790 });
791 }
792
793 if dict_chunk_body_len < 13 {
794 return Err(SnappyError::CorruptedData {
795 message: format!("OxiArc dict-info chunk body too short: {dict_chunk_body_len} bytes"),
796 });
797 }
798
799 if pos + dict_chunk_body_len > input.len() {
800 return Err(SnappyError::UnexpectedEof {
801 context: "dict-info chunk body",
802 });
803 }
804
805 let dict_body = &input[pos..pos + dict_chunk_body_len];
806 pos += dict_chunk_body_len;
807
808 if &dict_body[..5] != OXIARC_DICT_MAGIC {
810 return Err(SnappyError::CorruptedData {
811 message: "OxiArc dict-info magic mismatch".to_string(),
812 });
813 }
814
815 let stored_crc = u32::from_le_bytes([dict_body[5], dict_body[6], dict_body[7], dict_body[8]]);
816 let stored_len =
817 u32::from_le_bytes([dict_body[9], dict_body[10], dict_body[11], dict_body[12]]) as usize;
818
819 let computed_crc = crc32c(dict);
821 if computed_crc != stored_crc {
822 return Err(SnappyError::ChecksumMismatch {
823 expected: stored_crc,
824 computed: computed_crc,
825 });
826 }
827 if dict.len() != stored_len {
828 return Err(SnappyError::CorruptedData {
829 message: format!(
830 "dict length mismatch: frame has {stored_len} bytes, supplied dict is {} bytes",
831 dict.len()
832 ),
833 });
834 }
835
836 let mut output = Vec::new();
838
839 while pos < input.len() {
840 if pos + 4 > input.len() {
841 return Err(SnappyError::UnexpectedEof {
842 context: "chunk header",
843 });
844 }
845 let chunk_type = input[pos];
846 let chunk_body_len = (input[pos + 1] as usize)
847 | ((input[pos + 2] as usize) << 8)
848 | ((input[pos + 3] as usize) << 16);
849 pos += 4;
850
851 if pos + chunk_body_len > input.len() {
852 return Err(SnappyError::UnexpectedEof {
853 context: "chunk body",
854 });
855 }
856
857 let chunk_body = &input[pos..pos + chunk_body_len];
858 pos += chunk_body_len;
859
860 match chunk_type {
861 CHUNK_TYPE_COMPRESSED => {
862 if chunk_body.len() < 4 {
863 return Err(SnappyError::CorruptedData {
864 message: "compressed chunk too short for checksum".to_string(),
865 });
866 }
867 let expected_checksum = u32::from_le_bytes([
868 chunk_body[0],
869 chunk_body[1],
870 chunk_body[2],
871 chunk_body[3],
872 ]);
873 let compressed_payload = &chunk_body[4..];
874 let decompressed =
875 decompress::decompress_block_with_dict(compressed_payload, dict)?;
876
877 let computed_checksum = masked_crc32c(&decompressed);
878 if expected_checksum != computed_checksum {
879 return Err(SnappyError::ChecksumMismatch {
880 expected: expected_checksum,
881 computed: computed_checksum,
882 });
883 }
884 output.extend_from_slice(&decompressed);
885 }
886 CHUNK_TYPE_UNCOMPRESSED => {
887 if chunk_body.len() < 4 {
888 return Err(SnappyError::CorruptedData {
889 message: "uncompressed chunk too short for checksum".to_string(),
890 });
891 }
892 let expected_checksum = u32::from_le_bytes([
893 chunk_body[0],
894 chunk_body[1],
895 chunk_body[2],
896 chunk_body[3],
897 ]);
898 let raw_data = &chunk_body[4..];
899
900 let computed_checksum = masked_crc32c(raw_data);
901 if expected_checksum != computed_checksum {
902 return Err(SnappyError::ChecksumMismatch {
903 expected: expected_checksum,
904 computed: computed_checksum,
905 });
906 }
907 output.extend_from_slice(raw_data);
908 }
909 CHUNK_TYPE_STREAM_ID => {
910 }
912 0x02..=0x7F => {
913 return Err(SnappyError::InvalidChunkType { chunk_type });
914 }
915 _ => {
916 }
918 }
919 }
920
921 Ok(output)
922}
923
/// Appends a 4-byte chunk header to `output`: the chunk type followed by the
/// low 24 bits of `data_len` in little-endian order.
///
/// Bits of `data_len` above the low 24 are not written.
fn write_chunk_header_vec(output: &mut Vec<u8>, chunk_type: u8, data_len: usize) {
    let len_le = data_len.to_le_bytes();
    output.push(chunk_type);
    output.extend_from_slice(&len_le[..3]);
}
932
#[cfg(test)]
mod tests {
    use super::*;

    use oxiarc_core::cancel::CancellationToken;
    use oxiarc_core::progress::{ProgressHandle, ProgressSink};
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    /// Frames `data` with a default encoder, panicking on any failure.
    fn encode_frame(data: &[u8]) -> Vec<u8> {
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        compressed
    }

    /// Decodes `framed` to the end with a default decoder.
    fn try_decode_frame(framed: &[u8]) -> std::io::Result<Vec<u8>> {
        let mut output = Vec::new();
        FrameDecoder::new(framed).read_to_end(&mut output)?;
        Ok(output)
    }

    /// Decodes `framed` to the end, panicking on any failure.
    fn decode_frame(framed: &[u8]) -> Vec<u8> {
        try_decode_frame(framed).expect("read should succeed")
    }

    /// Progress sink that only counts how often it was called.
    struct CountingSink {
        calls: AtomicUsize,
    }

    impl CountingSink {
        fn new() -> Self {
            Self {
                calls: AtomicUsize::new(0),
            }
        }

        fn call_count(&self) -> usize {
            self.calls.load(Ordering::SeqCst)
        }
    }

    impl ProgressSink for CountingSink {
        fn on_progress(&self, _processed: u64, _total: Option<u64>) {
            self.calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Progress sink that records every reported value in call order.
    struct MonotonicSink {
        values: std::sync::Mutex<Vec<u64>>,
    }

    impl MonotonicSink {
        fn new() -> Self {
            Self {
                values: std::sync::Mutex::new(Vec::new()),
            }
        }

        fn values(&self) -> Vec<u64> {
            self.values
                .lock()
                .unwrap_or_else(|p| p.into_inner())
                .clone()
        }
    }

    impl ProgressSink for MonotonicSink {
        fn on_progress(&self, processed: u64, _total: Option<u64>) {
            self.values
                .lock()
                .unwrap_or_else(|p| p.into_inner())
                .push(processed);
        }
    }

    #[test]
    fn test_frame_roundtrip_small() {
        let data = b"Hello, World! This is a test of Snappy framing.";

        let compressed = encode_frame(data);

        // The output must begin with the stream identifier.
        assert_eq!(&compressed[..10], &STREAM_IDENTIFIER);

        assert_eq!(decode_frame(&compressed), data);
    }

    #[test]
    fn test_frame_roundtrip_empty() {
        let data = b"";

        let compressed = encode_frame(data);

        assert_eq!(decode_frame(&compressed), data);
    }

    #[test]
    fn test_frame_roundtrip_large() {
        // 100 000 bytes spans more than one chunk.
        let data: Vec<u8> = (0..100_000u32).map(|i| (i % 256) as u8).collect();

        let compressed = encode_frame(&data);

        assert_eq!(decode_frame(&compressed), data);
    }

    #[test]
    fn test_frame_roundtrip_repeated() {
        let data = vec![0xAB; 200_000];

        let compressed = encode_frame(&data);

        // Highly repetitive input must shrink.
        assert!(compressed.len() < data.len());

        assert_eq!(decode_frame(&compressed), data);
    }

    #[test]
    fn test_frame_incremental_write() {
        let data = b"Hello, this is a test of incremental writing to the encoder.";

        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            // Feed the encoder in small pieces.
            for piece in data.chunks(5) {
                encoder.write_all(piece).expect("write should succeed");
            }
            encoder.finish().expect("finish should succeed");
        }

        assert_eq!(decode_frame(&compressed), data);
    }

    #[test]
    fn test_frame_incremental_read() {
        let data = b"Test data for incremental reading from the decoder.";

        let compressed = encode_frame(data);

        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        // Drain the decoder through a deliberately tiny buffer.
        let mut buf = [0u8; 7];
        loop {
            match decoder.read(&mut buf).expect("read should succeed") {
                0 => break,
                n => output.extend_from_slice(&buf[..n]),
            }
        }

        assert_eq!(output, data);
    }

    #[test]
    fn test_frame_decoder_invalid_header() {
        let bad_data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09];
        let result = try_decode_frame(&bad_data[..]);
        assert!(result.is_err());
    }

    #[test]
    fn test_frame_decoder_empty_input() {
        let empty: &[u8] = &[];
        let mut decoder = FrameDecoder::new(empty);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("empty input should succeed");
        assert!(output.is_empty());
    }

    #[test]
    fn test_write_chunk_header() {
        let mut buf = Vec::new();
        write_chunk_header(&mut buf, 0x00, 0x123456).expect("should succeed");
        assert_eq!(buf, vec![0x00, 0x56, 0x34, 0x12]);
    }

    #[test]
    fn test_stream_identifier_constant() {
        assert_eq!(STREAM_IDENTIFIER[0], 0xFF);
        assert_eq!(STREAM_IDENTIFIER[1], 0x06);
        assert_eq!(STREAM_IDENTIFIER[2], 0x00);
        assert_eq!(STREAM_IDENTIFIER[3], 0x00);
        assert_eq!(&STREAM_IDENTIFIER[4..], b"sNaPpY");
    }

    #[test]
    fn test_progress_counting_sink() {
        // Two full chunks of pseudo-random bytes.
        let data: Vec<u8> = (0..131_072u64)
            .map(|i| (i.wrapping_mul(6_364_136_223_846_793_005_u64) >> 56) as u8)
            .collect();

        let enc_sink = Arc::new(CountingSink::new());
        let enc_handle: ProgressHandle = enc_sink.clone();

        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed).with_progress(enc_handle);
            encoder
                .write_all(&data)
                .expect("encode write should succeed");
            encoder.finish().expect("encode finish should succeed");
        }

        assert!(
            enc_sink.call_count() >= 2,
            "encoder on_progress called {} times, expected >= 2",
            enc_sink.call_count()
        );

        let dec_sink = Arc::new(MonotonicSink::new());
        let dec_handle: ProgressHandle = dec_sink.clone();

        let mut decoder = FrameDecoder::new(&compressed[..]).with_progress(dec_handle);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("decode should succeed");

        assert_eq!(output, data, "decoded data must match original");

        let values = dec_sink.values();
        assert!(
            values.len() >= 2,
            "decoder on_progress called {} times, expected >= 2",
            values.len()
        );
        for pair in values.windows(2) {
            assert!(
                pair[1] >= pair[0],
                "progress values not monotonic: {} followed by {}",
                pair[0],
                pair[1]
            );
        }
    }

    #[test]
    fn test_cancellation_encoder_pre_cancelled() {
        let data: Vec<u8> = vec![0xBEu8; 131_072];

        let token = CancellationToken::new();
        token.cancel();

        let mut compressed = Vec::new();
        let mut encoder = FrameEncoder::new(&mut compressed).with_cancel(token);
        // Either the write or the finish has to observe the cancellation.
        let write_result = encoder.write_all(&data);
        let finish_result = encoder.finish();

        let triggered = write_result.is_err() || finish_result.is_err();
        assert!(
            triggered,
            "expected a cancellation error but neither write nor finish returned Err"
        );
    }

    #[test]
    fn test_cancellation_decoder_pre_cancelled() {
        let data: Vec<u8> = vec![0xCAu8; 131_072];
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder
                .write_all(&data)
                .expect("encode write should succeed");
            encoder.finish().expect("encode finish should succeed");
        }

        let token = CancellationToken::new();
        token.cancel();

        let mut decoder = FrameDecoder::new(&compressed[..]).with_cancel(token);
        let mut output = Vec::new();
        let result = decoder.read_to_end(&mut output);
        assert!(result.is_err(), "expected cancellation error from decoder");
    }

    #[test]
    fn test_frame_max_size_chunk() {
        let data: Vec<u8> = (0..MAX_UNCOMPRESSED_CHUNK_SIZE)
            .map(|i| (i % 251) as u8)
            .collect();
        assert_eq!(data.len(), MAX_UNCOMPRESSED_CHUNK_SIZE);

        let compressed = encode_frame(&data);

        assert_eq!(
            decode_frame(&compressed),
            data,
            "max-size chunk roundtrip failed"
        );
    }

    #[test]
    fn test_frame_just_over_max_chunk() {
        let size = MAX_UNCOMPRESSED_CHUNK_SIZE + 1;
        let data: Vec<u8> = (0..size).map(|i| (i % 253) as u8).collect();

        let compressed = encode_frame(&data);

        // Walk the chunk headers after the stream identifier and count the
        // data chunks.
        let payload = &compressed[10..];
        let mut data_chunk_count = 0usize;
        let mut pos = 0usize;
        while pos + 4 <= payload.len() {
            let header = &payload[pos..pos + 4];
            let chunk_len = (header[1] as usize)
                | ((header[2] as usize) << 8)
                | ((header[3] as usize) << 16);
            if matches!(header[0], CHUNK_TYPE_COMPRESSED | CHUNK_TYPE_UNCOMPRESSED) {
                data_chunk_count += 1;
            }
            pos += 4 + chunk_len;
        }

        assert_eq!(
            data_chunk_count, 2,
            "expected exactly 2 data chunks for 65537-byte input, got {data_chunk_count}"
        );

        assert_eq!(
            decode_frame(&compressed),
            data,
            "just-over-max chunk roundtrip failed"
        );
    }

    #[test]
    fn test_block_max_compress_len() {
        use crate::compress::max_compress_len;

        let max_len_65536 = max_compress_len(MAX_UNCOMPRESSED_CHUNK_SIZE);
        assert!(
            max_len_65536 >= MAX_UNCOMPRESSED_CHUNK_SIZE,
            "max_compress_len(65536) = {max_len_65536}, expected >= 65536"
        );

        let max_len_0 = max_compress_len(0);
        assert!(
            max_len_0 >= 1,
            "max_compress_len(0) = {max_len_0}, expected >= 1"
        );
    }

    #[test]
    fn test_decompress_truncated_frame() {
        let data = vec![b'X'; 1000];
        let compressed = encode_frame(&data);

        // Cut the stream in half.
        let truncated_len = compressed.len() / 2;
        let truncated = &compressed[..truncated_len];

        let result = try_decode_frame(truncated);
        assert!(
            result.is_err(),
            "expected error on truncated input, but read_to_end succeeded"
        );
    }

    #[test]
    fn test_decompress_corrupt_crc() {
        let data = vec![b'A'; 500];
        let compressed = encode_frame(&data);

        // 10-byte stream identifier + 4-byte chunk header puts the first
        // checksum byte at offset 14.
        let crc_offset = 14;
        assert!(
            crc_offset < compressed.len(),
            "compressed output is too short to contain a CRC field"
        );
        let mut corrupt = compressed.clone();
        corrupt[crc_offset] ^= 0xFF;

        let result = try_decode_frame(&corrupt[..]);
        assert!(
            result.is_err(),
            "expected checksum error on corrupt CRC, but read_to_end succeeded"
        );
    }

    #[test]
    fn test_compress_all_zeros() {
        let data = vec![0u8; MAX_UNCOMPRESSED_CHUNK_SIZE];

        let compressed = encode_frame(&data);

        assert!(
            compressed.len() < data.len() / 4,
            "expected compressed output much smaller than {}, got {}",
            data.len(),
            compressed.len()
        );

        assert_eq!(decode_frame(&compressed), data, "all-zeros roundtrip failed");
    }

    #[test]
    fn test_compress_all_ones() {
        let data = vec![0xFFu8; MAX_UNCOMPRESSED_CHUNK_SIZE];

        let compressed = encode_frame(&data);

        assert_eq!(decode_frame(&compressed), data, "all-0xFF roundtrip failed");
    }

    #[test]
    fn test_frame_decoder_incremental_max_size() {
        use std::io::BufReader;

        let data: Vec<u8> = (0..MAX_UNCOMPRESSED_CHUNK_SIZE)
            .map(|i| (i % 199) as u8)
            .collect();

        let compressed = encode_frame(&data);

        // A one-byte BufReader makes the decoder see its input in very small
        // pieces.
        let buf_reader = BufReader::with_capacity(1, &compressed[..]);
        let mut decoder = FrameDecoder::new(buf_reader);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("incremental read should succeed");

        assert_eq!(
            output, data,
            "incremental max-size chunk decoder roundtrip failed"
        );
    }
}