1use core::{
2 pin::Pin,
3 task::{ready, Context, Poll},
4};
5
6use std::io;
7
8use bytes::Bytes;
9use futures_core::stream::Stream;
10use pin_project_lite::pin_project;
11
12use super::error::CoderError;
13
14pin_project! {
15 #[derive(Default)]
17 pub struct Coder<S, C = FeaturedCode>{
18 #[pin]
19 body: S,
20 coder: C,
21 }
22}
23
24impl<S, C, T, E> Coder<S, C>
25where
26 S: Stream<Item = Result<T, E>>,
27 C: Code<T>,
28 T: AsRef<[u8]>,
29{
30 #[inline]
32 pub const fn new(body: S, coder: C) -> Self {
33 Self { body, coder }
34 }
35
36 #[inline]
37 pub fn into_inner(self) -> S {
38 self.body
39 }
40}
41
42impl<S, C, T, E> Stream for Coder<S, C>
43where
44 S: Stream<Item = Result<T, E>>,
45 CoderError: From<E>,
46 C: Code<T>,
47{
48 type Item = Result<C::Item, CoderError>;
49
50 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 let mut this = self.project();
52
53 while let Some(res) = ready!(this.body.as_mut().poll_next(cx)) {
54 let item = res?;
55 if let Some(item) = this.coder.code(item)? {
56 return Poll::Ready(Some(Ok(item)));
57 }
58 }
59
60 match this.coder.code_eof()? {
61 Some(res) => Poll::Ready(Some(Ok(res))),
62 None => Poll::Ready(None),
63 }
64 }
65
66 #[inline]
67 fn size_hint(&self) -> (usize, Option<usize>) {
68 self.coder.size_hint(&self.body)
70 }
71}
72
73pub trait Code<T>: Sized {
74 type Item;
75
76 fn code(&mut self, item: T) -> io::Result<Option<Self::Item>>;
77
78 fn code_eof(&mut self) -> io::Result<Option<Self::Item>>;
79
80 #[allow(unused_variables)]
84 #[inline]
85 fn size_hint(&self, stream: &impl Stream) -> (usize, Option<usize>) {
86 (0, None)
87 }
88}
89
/// Coder that performs no encoding or decoding and hands the input bytes through.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoOpCode;
92
93impl<T> Code<T> for NoOpCode
94where
95 T: AsRef<[u8]> + 'static,
96{
97 type Item = Bytes;
98
99 fn code(&mut self, item: T) -> io::Result<Option<Self::Item>> {
100 Ok(Some(
101 try_downcast_to_bytes(item).unwrap_or_else(|item| Bytes::copy_from_slice(item.as_ref())),
102 ))
103 }
104
105 #[inline]
106 fn code_eof(&mut self) -> io::Result<Option<Self::Item>> {
107 Ok(None)
108 }
109
110 #[inline]
114 fn size_hint(&self, stream: &impl Stream) -> (usize, Option<usize>) {
115 stream.size_hint()
116 }
117}
118
/// Coder chosen at runtime among the implementations compiled into the crate.
///
/// Apart from [`FeaturedCode::NoOp`], every variant only exists when its cargo feature
/// is enabled.
pub enum FeaturedCode {
    /// Pass-through coder; always available.
    NoOp(NoOpCode),
    /// Decoder from the `brotli` module (feature `br`).
    #[cfg(feature = "br")]
    DecodeBr(super::brotli::Decoder),
    /// Encoder from the `brotli` module (feature `br`).
    #[cfg(feature = "br")]
    EncodeBr(super::brotli::Encoder),
    /// Decoder from the `gzip` module (feature `gz`).
    #[cfg(feature = "gz")]
    DecodeGz(super::gzip::Decoder),
    /// Encoder from the `gzip` module (feature `gz`).
    #[cfg(feature = "gz")]
    EncodeGz(super::gzip::Encoder),
    /// Decoder from the `deflate` module (feature `de`).
    #[cfg(feature = "de")]
    DecodeDe(super::deflate::Decoder),
    /// Encoder from the `deflate` module (feature `de`).
    #[cfg(feature = "de")]
    EncodeDe(super::deflate::Encoder),
}
134
135impl Default for FeaturedCode {
136 fn default() -> Self {
137 Self::NoOp(NoOpCode)
138 }
139}
140
141impl<T> Code<T> for FeaturedCode
142where
143 T: AsRef<[u8]> + 'static,
144{
145 type Item = Bytes;
146
147 fn code(&mut self, item: T) -> io::Result<Option<Self::Item>> {
148 match self {
149 Self::NoOp(ref mut coder) => coder.code(item),
150 #[cfg(feature = "br")]
151 Self::DecodeBr(ref mut coder) => coder.code(item),
152 #[cfg(feature = "br")]
153 Self::EncodeBr(ref mut coder) => coder.code(item),
154 #[cfg(feature = "gz")]
155 Self::DecodeGz(ref mut coder) => coder.code(item),
156 #[cfg(feature = "gz")]
157 Self::EncodeGz(ref mut coder) => coder.code(item),
158 #[cfg(feature = "de")]
159 Self::DecodeDe(ref mut coder) => coder.code(item),
160 #[cfg(feature = "de")]
161 Self::EncodeDe(ref mut coder) => coder.code(item),
162 }
163 }
164
165 fn code_eof(&mut self) -> io::Result<Option<Self::Item>> {
166 match self {
167 Self::NoOp(ref mut coder) => <NoOpCode as Code<T>>::code_eof(coder),
168 #[cfg(feature = "br")]
169 Self::DecodeBr(ref mut coder) => <super::brotli::Decoder as Code<T>>::code_eof(coder),
170 #[cfg(feature = "br")]
171 Self::EncodeBr(ref mut coder) => <super::brotli::Encoder as Code<T>>::code_eof(coder),
172 #[cfg(feature = "gz")]
173 Self::DecodeGz(ref mut coder) => <super::gzip::Decoder as Code<T>>::code_eof(coder),
174 #[cfg(feature = "gz")]
175 Self::EncodeGz(ref mut coder) => <super::gzip::Encoder as Code<T>>::code_eof(coder),
176 #[cfg(feature = "de")]
177 Self::DecodeDe(ref mut coder) => <super::deflate::Decoder as Code<T>>::code_eof(coder),
178 #[cfg(feature = "de")]
179 Self::EncodeDe(ref mut coder) => <super::deflate::Encoder as Code<T>>::code_eof(coder),
180 }
181 }
182
183 fn size_hint(&self, stream: &impl Stream) -> (usize, Option<usize>) {
184 match self {
185 Self::NoOp(ref coder) => <NoOpCode as Code<T>>::size_hint(coder, stream),
186 #[cfg(feature = "br")]
187 Self::DecodeBr(ref coder) => <super::brotli::Decoder as Code<T>>::size_hint(coder, stream),
188 #[cfg(feature = "br")]
189 Self::EncodeBr(ref coder) => <super::brotli::Encoder as Code<T>>::size_hint(coder, stream),
190 #[cfg(feature = "gz")]
191 Self::DecodeGz(ref coder) => <super::gzip::Decoder as Code<T>>::size_hint(coder, stream),
192 #[cfg(feature = "gz")]
193 Self::EncodeGz(ref coder) => <super::gzip::Encoder as Code<T>>::size_hint(coder, stream),
194 #[cfg(feature = "de")]
195 Self::DecodeDe(ref coder) => <super::deflate::Decoder as Code<T>>::size_hint(coder, stream),
196 #[cfg(feature = "de")]
197 Self::EncodeDe(ref coder) => <super::deflate::Encoder as Code<T>>::size_hint(coder, stream),
198 }
199 }
200}
201
// Implements `Code<T>` for a writer-style coder `$coder<BytesMutWriter>`: input is written
// into the coder and whatever has accumulated in the underlying `BytesMutWriter` is taken
// out as the output chunk.
#[cfg(any(feature = "gz", feature = "de"))]
macro_rules! code_impl {
    ($coder: ident) => {
        impl<T> crate::Code<T> for $coder<crate::writer::BytesMutWriter>
        where
            T: AsRef<[u8]>,
        {
            type Item = ::bytes::Bytes;

            /// Write the input into the coder and return the output accumulated so far,
            /// or `None` when the writer is still empty.
            fn code(&mut self, item: T) -> ::std::io::Result<Option<Self::Item>> {
                use ::std::io::Write;

                self.write_all(item.as_ref())?;
                let chunk = self.get_mut().take();
                Ok((!chunk.is_empty()).then_some(chunk))
            }

            /// Finish the coder and return the remaining output, or `None` when the
            /// writer is empty.
            fn code_eof(&mut self) -> ::std::io::Result<Option<Self::Item>> {
                self.try_finish()?;
                let chunk = self.get_mut().take();
                Ok((!chunk.is_empty()).then_some(chunk))
            }
        }
    };
}
235
236fn try_downcast_to_bytes<T: 'static>(item: T) -> Result<Bytes, T> {
237 use core::any::Any;
238
239 let item = &mut Some(item);
240 match (item as &mut dyn Any).downcast_mut::<Option<Bytes>>() {
241 Some(bytes) => Ok(bytes.take().unwrap()),
242 None => Err(item.take().unwrap()),
243 }
244}
245
#[cfg(test)]
mod test {
    use super::*;

    /// `Bytes` input is recognised by the downcast; any other type is handed back as `Err`.
    #[test]
    fn downcast_bytes() {
        assert!(try_downcast_to_bytes(Bytes::new()).is_ok());
        assert!(try_downcast_to_bytes(Vec::<u8>::new()).is_err());
    }
}