multipart_write/write/
buffered.rs1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use std::collections::VecDeque;
4use std::pin::Pin;
5use std::task::{self, Context, Poll};
6
7#[must_use = "futures do nothing unless polled"]
11#[derive(Debug)]
12#[pin_project::pin_project]
13pub struct Buffered<Wr, Part> {
14 #[pin]
15 writer: Wr,
16 capacity: usize,
17 buf: VecDeque<Part>,
18}
19
20impl<Wr: MultipartWrite<Part>, Part> Buffered<Wr, Part> {
21 pub(super) fn new(writer: Wr, capacity: usize) -> Self {
22 Self {
23 writer,
24 capacity,
25 buf: VecDeque::with_capacity(capacity),
26 }
27 }
28
29 pub fn get_ref(&self) -> &Wr {
31 &self.writer
32 }
33
34 pub fn get_mut(&mut self) -> &mut Wr {
38 &mut self.writer
39 }
40
41 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
45 self.project().writer
46 }
47
48 fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Wr::Error>> {
49 let mut this = self.project();
50
51 task::ready!(this.writer.as_mut().poll_ready(cx))?;
52 while let Some(part) = this.buf.pop_front() {
53 this.writer.as_mut().start_send(part)?;
54 if !this.buf.is_empty() {
55 task::ready!(this.writer.as_mut().poll_ready(cx))?;
56 }
57 }
58 Poll::Ready(Ok(()))
59 }
60}
61
62impl<Wr, Part> FusedMultipartWrite<Part> for Buffered<Wr, Part>
63where
64 Wr: FusedMultipartWrite<Part>,
65{
66 fn is_terminated(&self) -> bool {
67 self.writer.is_terminated()
68 }
69}
70
71impl<Wr, Part> MultipartWrite<Part> for Buffered<Wr, Part>
72where
73 Wr: MultipartWrite<Part>,
74{
75 type Ret = ();
76 type Output = Wr::Output;
77 type Error = Wr::Error;
78
79 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80 if self.capacity == 0 {
81 return self.project().writer.poll_ready(cx);
82 }
83 let _ = self.as_mut().try_empty_buffer(cx)?;
84 if self.buf.len() >= self.capacity {
85 Poll::Pending
86 } else {
87 Poll::Ready(Ok(()))
88 }
89 }
90
91 fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
92 if self.capacity == 0 {
93 let _ = self.project().writer.start_send(part)?;
94 } else {
95 self.project().buf.push_back(part);
96 }
97 Ok(())
98 }
99
100 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101 task::ready!(self.as_mut().try_empty_buffer(cx))?;
102 self.project().writer.poll_flush(cx)
103 }
104
105 fn poll_complete(
106 mut self: Pin<&mut Self>,
107 cx: &mut Context<'_>,
108 ) -> Poll<Result<Self::Output, Self::Error>> {
109 task::ready!(self.as_mut().try_empty_buffer(cx))?;
110 self.project().writer.poll_complete(cx)
111 }
112}