multipart_write/write/
buffered.rs1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use std::collections::VecDeque;
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10 #[must_use = "futures do nothing unless polled"]
14 pub struct Buffered<Wr, Part> {
15 #[pin]
16 writer: Wr,
17 capacity: usize,
18 buf: VecDeque<Part>,
19 }
20}
21
22impl<Wr, Part> Buffered<Wr, Part> {
23 pub(super) fn new(writer: Wr, capacity: usize) -> Self {
24 Self {
25 writer,
26 capacity,
27 buf: VecDeque::with_capacity(capacity),
28 }
29 }
30
31 pub fn get_ref(&self) -> &Wr {
33 &self.writer
34 }
35
36 pub fn get_mut(&mut self) -> &mut Wr {
40 &mut self.writer
41 }
42
43 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Wr> {
47 self.project().writer
48 }
49
50 fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Wr::Error>>
51 where
52 Wr: MultipartWrite<Part>,
53 {
54 let mut this = self.project();
55
56 ready!(this.writer.as_mut().poll_ready(cx))?;
57 while let Some(part) = this.buf.pop_front() {
58 this.writer.as_mut().start_send(part)?;
59 if !this.buf.is_empty() {
60 ready!(this.writer.as_mut().poll_ready(cx))?;
61 }
62 }
63 Poll::Ready(Ok(()))
64 }
65}
66
67impl<Wr, Part> FusedMultipartWrite<Part> for Buffered<Wr, Part>
68where
69 Wr: FusedMultipartWrite<Part>,
70{
71 fn is_terminated(&self) -> bool {
72 self.writer.is_terminated()
73 }
74}
75
76impl<Wr, Part> MultipartWrite<Part> for Buffered<Wr, Part>
77where
78 Wr: MultipartWrite<Part>,
79{
80 type Ret = ();
81 type Output = Wr::Output;
82 type Error = Wr::Error;
83
84 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85 if self.capacity == 0 {
86 return self.project().writer.poll_ready(cx);
87 }
88 let _ = self.as_mut().try_empty_buffer(cx)?;
89 if self.buf.len() >= self.capacity {
90 Poll::Pending
91 } else {
92 Poll::Ready(Ok(()))
93 }
94 }
95
96 fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
97 if self.capacity == 0 {
98 let _ = self.project().writer.start_send(part)?;
99 } else {
100 self.project().buf.push_back(part);
101 }
102 Ok(())
103 }
104
105 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106 ready!(self.as_mut().try_empty_buffer(cx))?;
107 self.project().writer.poll_flush(cx)
108 }
109
110 fn poll_complete(
111 mut self: Pin<&mut Self>,
112 cx: &mut Context<'_>,
113 ) -> Poll<Result<Self::Output, Self::Error>> {
114 ready!(self.as_mut().try_empty_buffer(cx))?;
115 self.project().writer.poll_complete(cx)
116 }
117}
118
119impl<Wr: Debug, Part: Debug> Debug for Buffered<Wr, Part> {
120 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
121 f.debug_struct("Buffered")
122 .field("writer", &self.writer)
123 .field("capacity", &self.capacity)
124 .field("buf", &self.buf)
125 .finish()
126 }
127}