Skip to main content

multipart_write/write/
buffered.rs

1use std::collections::VecDeque;
2use std::fmt::{self, Debug, Formatter};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures_core::ready;
7
8use crate::{FusedMultipartWrite, MultipartWrite};
9
10pin_project_lite::pin_project! {
11    /// `MultipartWrite` for [`buffered`](super::MultipartWriteExt::buffered).
12    #[must_use = "futures do nothing unless polled"]
13    pub struct Buffered<Wr: MultipartWrite<Part>, Part> {
14        #[pin]
15        writer: Wr,
16        capacity: usize,
17        buf: VecDeque<Part>,
18        recv: Vec<Wr::Recv>,
19    }
20}
21
22impl<Part, Wr: MultipartWrite<Part>> Buffered<Wr, Part> {
23    pub(super) fn new(writer: Wr, capacity: usize) -> Self {
24        Self {
25            writer,
26            capacity,
27            buf: VecDeque::with_capacity(capacity),
28            recv: Vec::with_capacity(capacity),
29        }
30    }
31
32    /// Consumes `Buffered`, returning the underlying writer.
33    pub fn into_inner(self) -> Wr {
34        self.writer
35    }
36
37    /// Acquires a reference to the underlying writer.
38    pub fn get_ref(&self) -> &Wr {
39        &self.writer
40    }
41
42    /// Acquires a mutable reference to the underlying writer.
43    ///
44    /// It is inadvisable to directly write to the underlying writer.
45    pub fn get_mut(&mut self) -> &mut Wr {
46        &mut self.writer
47    }
48
49    /// Acquires a pinned mutable reference to the underlying writer.
50    ///
51    /// It is inadvisable to directly write to the underlying writer.
52    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
53        self.project().writer
54    }
55
56    fn try_empty_buffer(
57        self: Pin<&mut Self>,
58        cx: &mut Context<'_>,
59    ) -> Poll<Result<(), Wr::Error>> {
60        let mut this = self.project();
61        // Must check readiness of `Wr`.
62        ready!(this.writer.as_mut().poll_ready(cx))?;
63        while let Some(part) = this.buf.pop_front() {
64            let recv = this.writer.as_mut().start_send(part)?;
65            this.recv.push(recv);
66            if !this.buf.is_empty() {
67                // Check readiness again; if ready we'll stay in the loop.  If
68                // not we'll re-enter at the top and that readiness check will
69                // cover it.
70                ready!(this.writer.as_mut().poll_ready(cx))?;
71            }
72        }
73        Poll::Ready(Ok(()))
74    }
75}
76
77impl<Part, Wr: FusedMultipartWrite<Part>> FusedMultipartWrite<Part>
78    for Buffered<Wr, Part>
79{
80    fn is_terminated(&self) -> bool {
81        self.writer.is_terminated()
82    }
83}
84
85impl<Part, Wr: MultipartWrite<Part>> MultipartWrite<Part>
86    for Buffered<Wr, Part>
87{
88    type Error = Wr::Error;
89    type Output = Wr::Output;
90    type Recv = Option<Vec<Wr::Recv>>;
91
92    fn poll_ready(
93        mut self: Pin<&mut Self>,
94        cx: &mut Context<'_>,
95    ) -> Poll<Result<(), Self::Error>> {
96        if self.capacity == 0 {
97            return self.project().writer.poll_ready(cx);
98        }
99        ready!(self.as_mut().try_empty_buffer(cx))?;
100        if self.buf.len() >= self.capacity {
101            Poll::Pending
102        } else {
103            Poll::Ready(Ok(()))
104        }
105    }
106
107    fn start_send(
108        self: Pin<&mut Self>,
109        part: Part,
110    ) -> Result<Self::Recv, Self::Error> {
111        if self.capacity == 0 {
112            let recv = self.project().writer.start_send(part)?;
113            return Ok(Some(vec![recv]));
114        }
115        let this = self.project();
116        this.buf.push_back(part);
117        // If we have accumulated enough of the values returned by the inner
118        // writer's `start_send` return the vector of them.
119        if this.recv.len() >= *this.capacity {
120            let new_recv = Vec::with_capacity(*this.capacity);
121            let recv = std::mem::replace(this.recv, new_recv);
122            Ok(Some(recv))
123        } else {
124            Ok(None)
125        }
126    }
127
128    fn poll_flush(
129        mut self: Pin<&mut Self>,
130        cx: &mut Context<'_>,
131    ) -> Poll<Result<(), Self::Error>> {
132        ready!(self.as_mut().try_empty_buffer(cx))?;
133        self.project().writer.poll_flush(cx)
134    }
135
136    fn poll_complete(
137        mut self: Pin<&mut Self>,
138        cx: &mut Context<'_>,
139    ) -> Poll<Result<Self::Output, Self::Error>> {
140        ready!(self.as_mut().try_empty_buffer(cx))?;
141        self.project().writer.poll_complete(cx)
142    }
143}
144
145impl<Wr, Part> Debug for Buffered<Wr, Part>
146where
147    Part: Debug,
148    Wr: Debug + MultipartWrite<Part>,
149    Wr::Recv: Debug,
150{
151    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
152        f.debug_struct("Buffered")
153            .field("writer", &self.writer)
154            .field("capacity", &self.capacity)
155            .field("buf", &self.buf)
156            .field("recv", &self.recv)
157            .finish()
158    }
159}