multipart_write/write/
buffered.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use std::collections::VecDeque;
4use std::pin::Pin;
5use std::task::{self, Context, Poll};
6
7/// `MultipartWrite` for the [`buffered`] method.
8///
9/// [`buffered`]: super::MultipartWriteExt::buffered
10#[must_use = "futures do nothing unless polled"]
11#[derive(Debug)]
12#[pin_project::pin_project]
13pub struct Buffered<Wr, Part> {
14    #[pin]
15    writer: Wr,
16    capacity: usize,
17    buf: VecDeque<Part>,
18}
19
20impl<Wr: MultipartWrite<Part>, Part> Buffered<Wr, Part> {
21    pub(super) fn new(writer: Wr, capacity: usize) -> Self {
22        Self {
23            writer,
24            capacity,
25            buf: VecDeque::with_capacity(capacity),
26        }
27    }
28
29    /// Acquires a reference to the underlying writer.
30    pub fn get_ref(&self) -> &Wr {
31        &self.writer
32    }
33
34    /// Acquires a mutable reference to the underlying writer.
35    ///
36    /// It is inadvisable to directly write to the underlying writer.
37    pub fn get_mut(&mut self) -> &mut Wr {
38        &mut self.writer
39    }
40
41    /// Acquires a pinned mutable reference to the underlying writer.
42    ///
43    /// It is inadvisable to directly write to the underlying writer.
44    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
45        self.project().writer
46    }
47
48    fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Wr::Error>> {
49        let mut this = self.project();
50
51        task::ready!(this.writer.as_mut().poll_ready(cx))?;
52        while let Some(part) = this.buf.pop_front() {
53            this.writer.as_mut().start_send(part)?;
54            if !this.buf.is_empty() {
55                task::ready!(this.writer.as_mut().poll_ready(cx))?;
56            }
57        }
58        Poll::Ready(Ok(()))
59    }
60}
61
62impl<Wr, Part> FusedMultipartWrite<Part> for Buffered<Wr, Part>
63where
64    Wr: FusedMultipartWrite<Part>,
65{
66    fn is_terminated(&self) -> bool {
67        self.writer.is_terminated()
68    }
69}
70
71impl<Wr, Part> MultipartWrite<Part> for Buffered<Wr, Part>
72where
73    Wr: MultipartWrite<Part>,
74{
75    type Ret = ();
76    type Output = Wr::Output;
77    type Error = Wr::Error;
78
79    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80        if self.capacity == 0 {
81            return self.project().writer.poll_ready(cx);
82        }
83        let _ = self.as_mut().try_empty_buffer(cx)?;
84        if self.buf.len() >= self.capacity {
85            Poll::Pending
86        } else {
87            Poll::Ready(Ok(()))
88        }
89    }
90
91    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
92        if self.capacity == 0 {
93            let _ = self.project().writer.start_send(part)?;
94        } else {
95            self.project().buf.push_back(part);
96        }
97        Ok(())
98    }
99
100    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101        task::ready!(self.as_mut().try_empty_buffer(cx))?;
102        self.project().writer.poll_flush(cx)
103    }
104
105    fn poll_complete(
106        mut self: Pin<&mut Self>,
107        cx: &mut Context<'_>,
108    ) -> Poll<Result<Self::Output, Self::Error>> {
109        task::ready!(self.as_mut().try_empty_buffer(cx))?;
110        self.project().writer.poll_complete(cx)
111    }
112}