1use core::{
2 future::Future,
3 pin::Pin,
4 task::{ready, Context, Poll},
5};
6
7use std::io;
8
9use bytes::{Bytes, BytesMut};
10use futures_core::stream::Stream;
11use pin_project_lite::pin_project;
12
13use super::runtime::ChunkRead;
14
15pin_project! {
16 #[project = ChunkReaderProj]
18 pub enum ChunkReader<F>
19 where
20 F: ChunkRead,
21 {
22 Empty,
23 Reader {
24 #[pin]
25 reader: _ChunkReader<F>
26 }
27 }
28}
29
30impl<F> ChunkReader<F>
31where
32 F: ChunkRead,
33{
34 pub(super) fn empty() -> Self {
35 Self::Empty
36 }
37
38 pub(super) fn reader(file: F, size: u64, chunk_size: usize) -> Self {
39 Self::Reader {
40 reader: _ChunkReader {
41 chunk_size,
42 size,
43 on_flight: file.next(BytesMut::with_capacity(chunk_size)),
44 },
45 }
46 }
47}
48
49impl<F> Stream for ChunkReader<F>
50where
51 F: ChunkRead,
52{
53 type Item = io::Result<Bytes>;
54
55 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56 match self.project() {
57 ChunkReaderProj::Empty => Poll::Ready(None),
58 ChunkReaderProj::Reader { reader } => reader.poll_next(cx),
59 }
60 }
61
62 fn size_hint(&self) -> (usize, Option<usize>) {
63 match self {
64 Self::Empty => (usize::MAX, Some(0)),
66 Self::Reader { ref reader } => reader.size_hint(),
67 }
68 }
69}
70
71pin_project! {
72 #[doc(hidden)]
73 pub struct _ChunkReader<F>
74 where
75 F: ChunkRead,
76 {
77 chunk_size: usize,
78 size: u64,
79 #[pin]
80 on_flight: F::Future
81 }
82}
83
84impl<F> Stream for _ChunkReader<F>
85where
86 F: ChunkRead,
87{
88 type Item = io::Result<Bytes>;
89
90 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
91 let mut this = self.project();
92
93 if *this.size == 0 {
94 return Poll::Ready(None);
95 }
96
97 Poll::Ready(ready!(this.on_flight.as_mut().poll(cx))?.map(|(file, mut bytes, n)| {
98 let mut chunk = bytes.split_to(n);
99
100 let n = n as u64;
101
102 if *this.size <= n {
103 if *this.size < n {
104 chunk.truncate(*this.size as usize);
108 }
109 *this.size = 0;
110 return Ok(chunk.freeze());
111 }
112
113 *this.size -= n;
114
115 bytes.reserve(*this.chunk_size);
118 this.on_flight.set(file.next(bytes));
119
120 Ok(chunk.freeze())
121 }))
122 }
123
124 #[inline]
125 fn size_hint(&self) -> (usize, Option<usize>) {
126 let size = self.size as usize;
127 (size, Some(size))
128 }
129}