hyper_static/
streamer.rs

1use crate::ErrorBoxed;
2use async_stream::stream;
3use futures::{Stream, StreamExt};
4use hyper::body::{Body, Buf, Frame, SizeHint};
5use std::collections::VecDeque;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use tokio::io::{AsyncRead, AsyncReadExt};
10
/// A zero-sized body type parameterised over its data chunk type `D`.
///
/// No value of `D` is ever stored; the type parameter only selects the
/// `Data` type reported by the body implementation.
pub struct Empty<D> {
    // `fn() -> D` mentions `D` without owning one, so the marker (and thus
    // `Empty<D>`) stays `Send + Sync` no matter what `D` is.
    _marker: PhantomData<fn() -> D>,
}

// Implemented by hand instead of `#[derive(Default)]`: the derive would add a
// `D: Default` bound even though no `D` value is ever constructed.
impl<D> Default for Empty<D> {
    #[inline]
    fn default() -> Self {
        Self {
            _marker: PhantomData,
        }
    }
}
15
16impl<D> Empty<D>
17where
18    D: Default,
19{
20    pub fn new() -> Self {
21        Self::default()
22    }
23}
24
25impl<D: Buf> Body for Empty<D> {
26    type Data = D;
27    type Error = ErrorBoxed;
28
29    #[inline]
30    fn poll_frame(
31        self: Pin<&mut Self>,
32        _cx: &mut Context<'_>,
33    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
34        Poll::Ready(None)
35    }
36
37    fn is_end_stream(&self) -> bool {
38        true
39    }
40
41    fn size_hint(&self) -> SizeHint {
42        SizeHint::with_exact(0)
43    }
44}
45
/// Turns a reader into a stream of data frames, read in chunks.
///
/// The struct itself carries no trait bounds; the requirements on `R`
/// (`AsyncRead + Unpin + Send + 'static`) are stated on the `impl` block that
/// actually needs them.
pub struct File<R> {
    /// Source the chunks are read from.
    reader: R,
    /// Maximum number of bytes requested from `reader` per read.
    buf_size: usize,
}
53
// Boxes the given error expression and coerces it to the crate-wide
// `ErrorBoxed` type. The explicit type annotation on the binding is what
// drives the coercion from `Box<E>` to `ErrorBoxed`.
macro_rules! boxed_err {
    ($source:expr) => {{
        let boxed: ErrorBoxed = Box::new($source);
        boxed
    }};
}
60
61impl<R> File<R>
62where
63    R: AsyncRead + Unpin + Send + 'static,
64{
65    #[inline]
66    pub fn new(reader: R, buf_size: usize) -> Self {
67        Self { reader, buf_size }
68    }
69    pub fn into_stream(
70        mut self,
71    ) -> Pin<Box<impl ?Sized + Stream<Item = Result<Frame<VecDeque<u8>>, ErrorBoxed>> + 'static>>
72    {
73        let stream = stream! {
74            loop {
75                let mut buf = vec![0; self.buf_size];
76                let r = self.reader.read(&mut buf).await.map_err(|e| boxed_err!(e))?;
77                if r == 0 {
78                    break
79                }
80                buf.truncate(r);
81                yield Ok(Frame::data(buf.into()));
82            }
83        };
84        stream.boxed()
85    }
86    // allow truncation as truncated remaining is always less than buf_size: usize
87    #[allow(clippy::cast_possible_truncation)]
88    pub fn into_stream_sized(
89        mut self,
90        max_length: u64,
91    ) -> Pin<Box<impl ?Sized + Stream<Item = Result<Frame<VecDeque<u8>>, ErrorBoxed>> + 'static>>
92    {
93        let stream = stream! {
94            let mut remaining = max_length;
95            loop {
96                if remaining == 0 {
97                    break;
98                }
99                let bs = if remaining >= self.buf_size as u64 {
100                    self.buf_size
101                } else {
102                    remaining as usize
103                };
104                let mut buf = vec![0; bs];
105                let r = self.reader.read(&mut buf).await.map_err(|e| boxed_err!(e))?;
106                if r == 0 {
107                    break;
108                }
109                buf.truncate(r);
110                yield Ok(Frame::data(buf.into()));
111                remaining -= r as u64;
112            }
113        };
114        stream.boxed()
115    }
116}