Skip to main content

multipart_write/write/
buffered.rs

1use std::collections::VecDeque;
2use std::fmt::{self, Debug, Formatter};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures_core::ready;
7
8use crate::{FusedMultipartWrite, MultipartWrite};
9
10pin_project_lite::pin_project! {
11    /// `MultipartWrite` for the [`buffered`] method.
12    ///
13    /// [`buffered`]: super::MultipartWriteExt::buffered
14    #[must_use = "futures do nothing unless polled"]
15    pub struct Buffered<Wr: MultipartWrite<Part>, Part> {
16        #[pin]
17        writer: Wr,
18        capacity: usize,
19        buf: VecDeque<Part>,
20        recv: Vec<Wr::Recv>,
21    }
22}
23
24impl<Part, Wr: MultipartWrite<Part>> Buffered<Wr, Part> {
25    pub(super) fn new(writer: Wr, capacity: usize) -> Self {
26        Self {
27            writer,
28            capacity,
29            buf: VecDeque::with_capacity(capacity),
30            recv: Vec::with_capacity(capacity),
31        }
32    }
33
34    /// Consumes `Buffered`, returning the underlying writer.
35    pub fn into_inner(self) -> Wr {
36        self.writer
37    }
38
39    /// Acquires a reference to the underlying writer.
40    pub fn get_ref(&self) -> &Wr {
41        &self.writer
42    }
43
44    /// Acquires a mutable reference to the underlying writer.
45    ///
46    /// It is inadvisable to directly write to the underlying writer.
47    pub fn get_mut(&mut self) -> &mut Wr {
48        &mut self.writer
49    }
50
51    /// Acquires a pinned mutable reference to the underlying writer.
52    ///
53    /// It is inadvisable to directly write to the underlying writer.
54    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
55        self.project().writer
56    }
57
58    fn try_empty_buffer(
59        self: Pin<&mut Self>,
60        cx: &mut Context<'_>,
61    ) -> Poll<Result<(), Wr::Error>> {
62        let mut this = self.project();
63        // Must check readiness of `Wr`.
64        ready!(this.writer.as_mut().poll_ready(cx))?;
65        while let Some(part) = this.buf.pop_front() {
66            let recv = this.writer.as_mut().start_send(part)?;
67            this.recv.push(recv);
68            if !this.buf.is_empty() {
69                // Check readiness again; if ready we'll stay in the loop.  If
70                // not we'll re-enter at the top and that readiness check will
71                // cover it.
72                ready!(this.writer.as_mut().poll_ready(cx))?;
73            }
74        }
75        Poll::Ready(Ok(()))
76    }
77}
78
79impl<Part, Wr: FusedMultipartWrite<Part>> FusedMultipartWrite<Part>
80    for Buffered<Wr, Part>
81{
82    fn is_terminated(&self) -> bool {
83        self.writer.is_terminated()
84    }
85}
86
87impl<Part, Wr: MultipartWrite<Part>> MultipartWrite<Part>
88    for Buffered<Wr, Part>
89{
90    type Error = Wr::Error;
91    type Output = Wr::Output;
92    type Recv = Option<Vec<Wr::Recv>>;
93
94    fn poll_ready(
95        mut self: Pin<&mut Self>,
96        cx: &mut Context<'_>,
97    ) -> Poll<Result<(), Self::Error>> {
98        if self.capacity == 0 {
99            return self.project().writer.poll_ready(cx);
100        }
101        ready!(self.as_mut().try_empty_buffer(cx))?;
102        if self.buf.len() >= self.capacity {
103            Poll::Pending
104        } else {
105            Poll::Ready(Ok(()))
106        }
107    }
108
109    fn start_send(
110        self: Pin<&mut Self>,
111        part: Part,
112    ) -> Result<Self::Recv, Self::Error> {
113        if self.capacity == 0 {
114            let recv = self.project().writer.start_send(part)?;
115            return Ok(Some(vec![recv]));
116        }
117        let this = self.project();
118        this.buf.push_back(part);
119        // If we have accumulated enough of the values returned by the inner
120        // writer's `start_send` return the vector of them.
121        if this.recv.len() >= *this.capacity {
122            let new_recv = Vec::with_capacity(*this.capacity);
123            let recv = std::mem::replace(this.recv, new_recv);
124            Ok(Some(recv))
125        } else {
126            Ok(None)
127        }
128    }
129
130    fn poll_flush(
131        mut self: Pin<&mut Self>,
132        cx: &mut Context<'_>,
133    ) -> Poll<Result<(), Self::Error>> {
134        ready!(self.as_mut().try_empty_buffer(cx))?;
135        self.project().writer.poll_flush(cx)
136    }
137
138    fn poll_complete(
139        mut self: Pin<&mut Self>,
140        cx: &mut Context<'_>,
141    ) -> Poll<Result<Self::Output, Self::Error>> {
142        ready!(self.as_mut().try_empty_buffer(cx))?;
143        self.project().writer.poll_complete(cx)
144    }
145}
146
147impl<Wr, Part> Debug for Buffered<Wr, Part>
148where
149    Part: Debug,
150    Wr: Debug + MultipartWrite<Part>,
151    Wr::Recv: Debug,
152{
153    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
154        f.debug_struct("Buffered")
155            .field("writer", &self.writer)
156            .field("capacity", &self.capacity)
157            .field("buf", &self.buf)
158            .field("recv", &self.recv)
159            .finish()
160    }
161}