// multipart_write/write/buffered.rs
use std::collections::VecDeque;
use std::fmt::{self, Debug, Formatter};
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::ready;

use crate::{FusedMultipartWrite, MultipartWrite};
9
10pin_project_lite::pin_project! {
11 #[must_use = "futures do nothing unless polled"]
13 pub struct Buffered<Wr: MultipartWrite<Part>, Part> {
14 #[pin]
15 writer: Wr,
16 capacity: usize,
17 buf: VecDeque<Part>,
18 recv: Vec<Wr::Recv>,
19 }
20}
21
22impl<Part, Wr: MultipartWrite<Part>> Buffered<Wr, Part> {
23 pub(super) fn new(writer: Wr, capacity: usize) -> Self {
24 Self {
25 writer,
26 capacity,
27 buf: VecDeque::with_capacity(capacity),
28 recv: Vec::with_capacity(capacity),
29 }
30 }
31
32 pub fn into_inner(self) -> Wr {
34 self.writer
35 }
36
37 pub fn get_ref(&self) -> &Wr {
39 &self.writer
40 }
41
42 pub fn get_mut(&mut self) -> &mut Wr {
46 &mut self.writer
47 }
48
49 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
53 self.project().writer
54 }
55
56 fn try_empty_buffer(
57 self: Pin<&mut Self>,
58 cx: &mut Context<'_>,
59 ) -> Poll<Result<(), Wr::Error>> {
60 let mut this = self.project();
61 ready!(this.writer.as_mut().poll_ready(cx))?;
63 while let Some(part) = this.buf.pop_front() {
64 let recv = this.writer.as_mut().start_send(part)?;
65 this.recv.push(recv);
66 if !this.buf.is_empty() {
67 ready!(this.writer.as_mut().poll_ready(cx))?;
71 }
72 }
73 Poll::Ready(Ok(()))
74 }
75}
76
77impl<Part, Wr: FusedMultipartWrite<Part>> FusedMultipartWrite<Part>
78 for Buffered<Wr, Part>
79{
80 fn is_terminated(&self) -> bool {
81 self.writer.is_terminated()
82 }
83}
84
85impl<Part, Wr: MultipartWrite<Part>> MultipartWrite<Part>
86 for Buffered<Wr, Part>
87{
88 type Error = Wr::Error;
89 type Output = Wr::Output;
90 type Recv = Option<Vec<Wr::Recv>>;
91
92 fn poll_ready(
93 mut self: Pin<&mut Self>,
94 cx: &mut Context<'_>,
95 ) -> Poll<Result<(), Self::Error>> {
96 if self.capacity == 0 {
97 return self.project().writer.poll_ready(cx);
98 }
99 ready!(self.as_mut().try_empty_buffer(cx))?;
100 if self.buf.len() >= self.capacity {
101 Poll::Pending
102 } else {
103 Poll::Ready(Ok(()))
104 }
105 }
106
107 fn start_send(
108 self: Pin<&mut Self>,
109 part: Part,
110 ) -> Result<Self::Recv, Self::Error> {
111 if self.capacity == 0 {
112 let recv = self.project().writer.start_send(part)?;
113 return Ok(Some(vec![recv]));
114 }
115 let this = self.project();
116 this.buf.push_back(part);
117 if this.recv.len() >= *this.capacity {
120 let new_recv = Vec::with_capacity(*this.capacity);
121 let recv = std::mem::replace(this.recv, new_recv);
122 Ok(Some(recv))
123 } else {
124 Ok(None)
125 }
126 }
127
128 fn poll_flush(
129 mut self: Pin<&mut Self>,
130 cx: &mut Context<'_>,
131 ) -> Poll<Result<(), Self::Error>> {
132 ready!(self.as_mut().try_empty_buffer(cx))?;
133 self.project().writer.poll_flush(cx)
134 }
135
136 fn poll_complete(
137 mut self: Pin<&mut Self>,
138 cx: &mut Context<'_>,
139 ) -> Poll<Result<Self::Output, Self::Error>> {
140 ready!(self.as_mut().try_empty_buffer(cx))?;
141 self.project().writer.poll_complete(cx)
142 }
143}
144
145impl<Wr, Part> Debug for Buffered<Wr, Part>
146where
147 Part: Debug,
148 Wr: Debug + MultipartWrite<Part>,
149 Wr::Recv: Debug,
150{
151 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
152 f.debug_struct("Buffered")
153 .field("writer", &self.writer)
154 .field("capacity", &self.capacity)
155 .field("buf", &self.buf)
156 .field("recv", &self.recv)
157 .finish()
158 }
159}