rust_rcs_core/http/
decompress.rs1extern crate async_compression;
16extern crate tokio;
17extern crate tokio_stream;
18
19use std::io;
20use std::marker::Unpin;
21
22use async_compression::futures::bufread::BrotliDecoder;
23use async_compression::futures::bufread::DeflateDecoder;
24use async_compression::futures::bufread::GzipDecoder;
25
26use futures::io::{AsyncBufRead, AsyncReadExt};
27use futures::stream::TryStreamExt;
28
29use tokio::sync::mpsc;
30
31use tokio_stream::wrappers::ReceiverStream;
32
33use super::response::Encoding;
34
/// Async reader that undoes the content encodings applied to a streamed body.
///
/// Built by [`Decompressor::new`], which stacks one decoder per requested
/// encoding on top of a channel-backed byte source. The finished chain is stored
/// type-erased, so callers only ever see an `AsyncBufRead`.
pub struct Decompressor {
    /// Outermost reader of the decoder chain; reading from it yields the bytes
    /// produced after every decoder layer has been applied.
    reader: Box<dyn AsyncBufRead + Send + Unpin>,
}
38
39impl Decompressor {
40 pub fn new(encodings: &[Encoding], rx: mpsc::Receiver<io::Result<Vec<u8>>>) -> Decompressor {
41 let stream = ReceiverStream::new(rx);
42
43 let reader = stream.into_async_read();
44 let mut reader: Box<dyn AsyncBufRead + Send + Unpin> = Box::new(reader);
45
46 for encoding in encodings {
47 match encoding {
48 Encoding::Brotli => {
49 reader = Box::new(
50 Box::pin(futures::stream::try_unfold(
51 BrotliDecoder::new(reader),
52 |mut encoder| async move {
53 let mut chunk = vec![0; 8 * 1024];
54 let len = encoder.read(&mut chunk).await?;
55 if len == 0 {
56 Ok(None)
57 } else {
58 chunk.truncate(len);
59 Ok(Some((chunk, encoder)))
60 }
61 },
62 ))
63 .into_async_read(),
64 );
65 }
66
67 Encoding::Compress => {
68 panic!("Not Implemented")
69 }
70
71 Encoding::Deflate => {
72 reader = Box::new(
73 Box::pin(futures::stream::try_unfold(
74 DeflateDecoder::new(reader),
75 |mut encoder| async move {
76 let mut chunk = vec![0; 8 * 1024];
77 let len = encoder.read(&mut chunk).await?;
78 if len == 0 {
79 Ok(None)
80 } else {
81 chunk.truncate(len);
82 Ok(Some((chunk, encoder)))
83 }
84 },
85 ))
86 .into_async_read(),
87 );
88 }
89
90 Encoding::Gzip => {
91 reader = Box::new(
92 Box::pin(futures::stream::try_unfold(
93 GzipDecoder::new(reader),
94 |mut encoder| async move {
95 let mut chunk = vec![0; 8 * 1024];
96 let len = encoder.read(&mut chunk).await?;
97 if len == 0 {
98 Ok(None)
99 } else {
100 chunk.truncate(len);
101 Ok(Some((chunk, encoder)))
102 }
103 },
104 ))
105 .into_async_read(),
106 );
107 }
108 }
109 }
110
111 Decompressor { reader }
112 }
113
114 pub fn reader(&mut self) -> &mut Box<dyn AsyncBufRead + Send + Unpin> {
115 &mut self.reader
116 }
117}