http_file/
chunk.rs

1use core::{
2    future::Future,
3    pin::Pin,
4    task::{ready, Context, Poll},
5};
6
7use std::io;
8
9use bytes::{Bytes, BytesMut};
10use futures_core::stream::Stream;
11use pin_project_lite::pin_project;
12
13use super::runtime::ChunkRead;
14
15pin_project! {
16    /// chunked file reader with async [Stream]
17    #[project = ChunkReaderProj]
18    pub enum ChunkReader<F>
19    where
20        F: ChunkRead,
21    {
22        Empty,
23        Reader {
24            #[pin]
25            reader:  _ChunkReader<F>
26        }
27    }
28}
29
30impl<F> ChunkReader<F>
31where
32    F: ChunkRead,
33{
34    pub(super) fn empty() -> Self {
35        Self::Empty
36    }
37
38    pub(super) fn reader(file: F, size: u64, chunk_size: usize) -> Self {
39        Self::Reader {
40            reader: _ChunkReader {
41                chunk_size,
42                size,
43                on_flight: file.next(BytesMut::with_capacity(chunk_size)),
44            },
45        }
46    }
47}
48
49impl<F> Stream for ChunkReader<F>
50where
51    F: ChunkRead,
52{
53    type Item = io::Result<Bytes>;
54
55    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56        match self.project() {
57            ChunkReaderProj::Empty => Poll::Ready(None),
58            ChunkReaderProj::Reader { reader } => reader.poll_next(cx),
59        }
60    }
61
62    fn size_hint(&self) -> (usize, Option<usize>) {
63        match self {
64            // see xitca_http::body::none_body_hint for reason. this is a library hack.
65            Self::Empty => (usize::MAX, Some(0)),
66            Self::Reader { ref reader } => reader.size_hint(),
67        }
68    }
69}
70
71pin_project! {
72    #[doc(hidden)]
73    pub struct _ChunkReader<F>
74    where
75        F: ChunkRead,
76    {
77        chunk_size: usize,
78        size: u64,
79        #[pin]
80        on_flight: F::Future
81    }
82}
83
84impl<F> Stream for _ChunkReader<F>
85where
86    F: ChunkRead,
87{
88    type Item = io::Result<Bytes>;
89
90    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
91        let mut this = self.project();
92
93        if *this.size == 0 {
94            return Poll::Ready(None);
95        }
96
97        Poll::Ready(ready!(this.on_flight.as_mut().poll(cx))?.map(|(file, mut bytes, n)| {
98            let mut chunk = bytes.split_to(n);
99
100            let n = n as u64;
101
102            if *this.size <= n {
103                if *this.size < n {
104                    // an unlikely case happen when someone append data to file while it's being
105                    // read.
106                    // drop the extra part. only self.size bytes of data were promised to client.
107                    chunk.truncate(*this.size as usize);
108                }
109                *this.size = 0;
110                return Ok(chunk.freeze());
111            }
112
113            *this.size -= n;
114
115            // TODO: better handling additional memory alloc?
116            // the goal should be linear growth targeting page size.
117            bytes.reserve(*this.chunk_size);
118            this.on_flight.set(file.next(bytes));
119
120            Ok(chunk.freeze())
121        }))
122    }
123
124    #[inline]
125    fn size_hint(&self) -> (usize, Option<usize>) {
126        let size = self.size as usize;
127        (size, Some(size))
128    }
129}