1use crate::ErrorBoxed;
2use async_stream::stream;
3use futures::{Stream, StreamExt};
4use hyper::body::{Body, Buf, Frame, SizeHint};
5use std::collections::VecDeque;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use tokio::io::{AsyncRead, AsyncReadExt};
10
/// A body that contains no data: it never yields a frame.
///
/// `D` only names the data type the body reports; no value of `D` is ever
/// stored or created, so the type places no requirements on `D`.
pub struct Empty<D> {
    // `fn() -> D` records the type parameter without owning a `D`, so the
    // marker stays `Send`/`Sync` whatever `D` is.
    _marker: PhantomData<fn() -> D>,
}

impl<D> Empty<D> {
    /// Creates a new empty body.
    #[must_use]
    pub fn new() -> Self {
        Self {
            _marker: PhantomData,
        }
    }
}

// Written by hand on purpose: `#[derive(Default)]` would add a `D: Default`
// bound even though no `D` is ever constructed.
impl<D> Default for Empty<D> {
    fn default() -> Self {
        Self::new()
    }
}
24
25impl<D: Buf> Body for Empty<D> {
26 type Data = D;
27 type Error = ErrorBoxed;
28
29 #[inline]
30 fn poll_frame(
31 self: Pin<&mut Self>,
32 _cx: &mut Context<'_>,
33 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
34 Poll::Ready(None)
35 }
36
37 fn is_end_stream(&self) -> bool {
38 true
39 }
40
41 fn size_hint(&self) -> SizeHint {
42 SizeHint::with_exact(0)
43 }
44}
45
/// Pairs a reader with the buffer size used for each read when its contents
/// are converted into a stream of data frames.
///
/// The struct itself is unbounded; the requirements on `R` are stated on the
/// `impl` block that actually reads from it.
pub struct File<R> {
    /// Source the bytes are read from.
    reader: R,
    /// Size, in bytes, of the buffer allocated for each read.
    buf_size: usize,
}
53
/// Boxes an error value and coerces it to [`ErrorBoxed`].
///
/// The typed `let` forces the coercion, so the expansion always has type
/// `ErrorBoxed` rather than `Box<ConcreteError>`.
macro_rules! boxed_err {
    ($source:expr) => {{
        let boxed: ErrorBoxed = Box::new($source);
        boxed
    }};
}
60
61impl<R> File<R>
62where
63 R: AsyncRead + Unpin + Send + 'static,
64{
65 #[inline]
66 pub fn new(reader: R, buf_size: usize) -> Self {
67 Self { reader, buf_size }
68 }
69 pub fn into_stream(
70 mut self,
71 ) -> Pin<Box<impl ?Sized + Stream<Item = Result<Frame<VecDeque<u8>>, ErrorBoxed>> + 'static>>
72 {
73 let stream = stream! {
74 loop {
75 let mut buf = vec![0; self.buf_size];
76 let r = self.reader.read(&mut buf).await.map_err(|e| boxed_err!(e))?;
77 if r == 0 {
78 break
79 }
80 buf.truncate(r);
81 yield Ok(Frame::data(buf.into()));
82 }
83 };
84 stream.boxed()
85 }
86 #[allow(clippy::cast_possible_truncation)]
88 pub fn into_stream_sized(
89 mut self,
90 max_length: u64,
91 ) -> Pin<Box<impl ?Sized + Stream<Item = Result<Frame<VecDeque<u8>>, ErrorBoxed>> + 'static>>
92 {
93 let stream = stream! {
94 let mut remaining = max_length;
95 loop {
96 if remaining == 0 {
97 break;
98 }
99 let bs = if remaining >= self.buf_size as u64 {
100 self.buf_size
101 } else {
102 remaining as usize
103 };
104 let mut buf = vec![0; bs];
105 let r = self.reader.read(&mut buf).await.map_err(|e| boxed_err!(e))?;
106 if r == 0 {
107 break;
108 }
109 buf.truncate(r);
110 yield Ok(Frame::data(buf.into()));
111 remaining -= r as u64;
112 }
113 };
114 stream.boxed()
115 }
116}