rust_rcs_core/http/
decompress.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15extern crate async_compression;
16extern crate tokio;
17extern crate tokio_stream;
18
19use std::io;
20use std::marker::Unpin;
21
22use async_compression::futures::bufread::BrotliDecoder;
23use async_compression::futures::bufread::DeflateDecoder;
24use async_compression::futures::bufread::GzipDecoder;
25
26use futures::io::{AsyncBufRead, AsyncReadExt};
27use futures::stream::TryStreamExt;
28
29use tokio::sync::mpsc;
30
31use tokio_stream::wrappers::ReceiverStream;
32
33use super::response::Encoding;
34
35pub struct Decompressor {
36    reader: Box<dyn AsyncBufRead + Send + Unpin>,
37}
38
39impl Decompressor {
40    pub fn new(encodings: &[Encoding], rx: mpsc::Receiver<io::Result<Vec<u8>>>) -> Decompressor {
41        let stream = ReceiverStream::new(rx);
42
43        let reader = stream.into_async_read();
44        let mut reader: Box<dyn AsyncBufRead + Send + Unpin> = Box::new(reader);
45
46        for encoding in encodings {
47            match encoding {
48                Encoding::Brotli => {
49                    reader = Box::new(
50                        Box::pin(futures::stream::try_unfold(
51                            BrotliDecoder::new(reader),
52                            |mut encoder| async move {
53                                let mut chunk = vec![0; 8 * 1024];
54                                let len = encoder.read(&mut chunk).await?;
55                                if len == 0 {
56                                    Ok(None)
57                                } else {
58                                    chunk.truncate(len);
59                                    Ok(Some((chunk, encoder)))
60                                }
61                            },
62                        ))
63                        .into_async_read(),
64                    );
65                }
66
67                Encoding::Compress => {
68                    panic!("Not Implemented")
69                }
70
71                Encoding::Deflate => {
72                    reader = Box::new(
73                        Box::pin(futures::stream::try_unfold(
74                            DeflateDecoder::new(reader),
75                            |mut encoder| async move {
76                                let mut chunk = vec![0; 8 * 1024];
77                                let len = encoder.read(&mut chunk).await?;
78                                if len == 0 {
79                                    Ok(None)
80                                } else {
81                                    chunk.truncate(len);
82                                    Ok(Some((chunk, encoder)))
83                                }
84                            },
85                        ))
86                        .into_async_read(),
87                    );
88                }
89
90                Encoding::Gzip => {
91                    reader = Box::new(
92                        Box::pin(futures::stream::try_unfold(
93                            GzipDecoder::new(reader),
94                            |mut encoder| async move {
95                                let mut chunk = vec![0; 8 * 1024];
96                                let len = encoder.read(&mut chunk).await?;
97                                if len == 0 {
98                                    Ok(None)
99                                } else {
100                                    chunk.truncate(len);
101                                    Ok(Some((chunk, encoder)))
102                                }
103                            },
104                        ))
105                        .into_async_read(),
106                    );
107                }
108            }
109        }
110
111        Decompressor { reader }
112    }
113
114    pub fn reader(&mut self) -> &mut Box<dyn AsyncBufRead + Send + Unpin> {
115        &mut self.reader
116    }
117}