1use crate::fast_buf::ConsumeBuf;
2use crate::fast_buf::FastBuf;
3use crate::limit::LimitWrite;
4use crate::mpsc::{Receiver, Sender};
5use crate::server::{DriveExternal, SyncDriveExternal};
6use crate::AsyncRead;
7use crate::Error;
8use futures_util::future::poll_fn;
9use futures_util::ready;
10use std::fmt;
11use std::io;
12use std::io::Read;
13use std::mem;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17pub struct SendStream {
24 tx_body: Sender<(Vec<u8>, bool)>,
25 limit: LimitWrite,
26 ended: bool,
27 drive_external: Option<SyncDriveExternal>,
28}
29
30impl SendStream {
31 pub(crate) fn new(
32 tx_body: Sender<(Vec<u8>, bool)>,
33 limit: LimitWrite,
34 ended: bool,
35 drive_external: Option<SyncDriveExternal>,
36 ) -> Self {
37 SendStream {
38 tx_body,
39 limit,
40 ended,
41 drive_external,
42 }
43 }
44
45 pub async fn send_data(&mut self, data: &[u8], end_of_body: bool) -> Result<(), Error> {
53 let data = Data::Shared(data);
54
55 self.do_send(data, end_of_body).await?;
56
57 Ok(())
58 }
59
60 pub async fn send_data_owned(&mut self, data: Vec<u8>, end_of_body: bool) -> Result<(), Error> {
71 let data = Data::Owned(data);
72
73 self.do_send(data, end_of_body).await?;
74
75 Ok(())
76 }
77
78 async fn do_send(&mut self, mut data: Data<'_>, end_of_body: bool) -> Result<(), Error> {
79 trace!("Send len={} end_of_body={}", data.len(), end_of_body);
80
81 poll_fn(|cx| self.poll_drive_server(cx)).await?;
82 poll_fn(|cx| Pin::new(&mut *self).poll_send_data(cx, &mut data, end_of_body)).await?;
83 poll_fn(|cx| self.poll_drive_server(cx)).await?;
84
85 if self.ended && self.drive_external.is_some() {
90 while !self.tx_body.is_empty() {
91 poll_fn(|cx| self.poll_drive_server(cx)).await?;
92 }
93 }
94
95 Ok(())
96 }
97
98 fn poll_drive_server(&mut self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
99 if let Some(drive_external) = &self.drive_external {
100 drive_external.poll_drive_external(cx)
101 } else {
102 Ok(()).into()
103 }
104 }
105
106 fn poll_send_data(
111 self: Pin<&mut Self>,
112 cx: &mut Context,
113 data: &mut Data,
114 end: bool,
115 ) -> Poll<Result<(), Error>> {
116 let this = self.get_mut();
117
118 if this.ended && end && data.is_empty() {
119 return Ok(()).into();
121 }
122
123 if this.ended {
124 warn!("Body data is not expected");
125 return Err(Error::User("Body data is not expected".into())).into();
126 }
127
128 if !ready!(Pin::new(&this.tx_body).poll_ready(cx, true)) {
129 return Err(
130 io::Error::new(io::ErrorKind::ConnectionAborted, "Connection closed").into(),
131 )
132 .into();
133 }
134
135 let to_send = if data.is_owned() && this.limit.can_write_entire_vec() {
136 let data = data.take_owned();
141
142 this.limit.accept_entire_vec(&data);
144
145 data
146 } else {
147 let capacity = data.len() + this.limit.overhead();
150 let mut chunk = FastBuf::with_capacity(capacity);
151 this.limit.write(&data[..], &mut chunk)?;
152
153 if end {
154 this.ended = true;
155 this.limit.finish(&mut chunk)?;
156 }
157
158 chunk.into_vec()
159 };
160
161 let sent = this.tx_body.send((to_send, end));
162
163 if !sent {
164 return Err(
165 io::Error::new(io::ErrorKind::ConnectionAborted, "Connection closed").into(),
166 )
167 .into();
168 }
169
170 Ok(()).into()
171 }
172}
173
174pub struct RecvStream {
181 rx_body: Receiver<io::Result<Vec<u8>>>,
182 ready: Option<ConsumeBuf>,
183 ended: bool,
184 drive_external: Option<SyncDriveExternal>,
185}
186
187impl RecvStream {
188 pub(crate) fn new(
189 rx_body: Receiver<io::Result<Vec<u8>>>,
190 ended: bool,
191 drive_external: Option<SyncDriveExternal>,
192 ) -> Self {
193 RecvStream {
194 rx_body,
195 ready: None,
196 ended,
197 drive_external,
198 }
199 }
200
201 pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
205 Ok(poll_fn(move |cx| Pin::new(&mut *self).poll_read(cx, buf)).await?)
206 }
207
208 pub fn is_end_stream(&self) -> bool {
212 self.ended
213 }
214
215 fn poll_drive_server(&mut self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
216 if let Some(drive_external) = &self.drive_external {
217 drive_external.poll_drive_external(cx)
218 } else {
219 Ok(()).into()
220 }
221 }
222
223 #[doc(hidden)]
224 fn poll_body_data(
226 self: Pin<&mut Self>,
227 cx: &mut Context,
228 buf: &mut [u8],
229 ) -> Poll<io::Result<usize>> {
230 let this = self.get_mut();
231
232 if this.ended {
233 return Ok(0).into();
234 }
235
236 loop {
237 if let Some(ready) = &mut this.ready {
239 let amt = (&ready[..]).read(buf)?;
240
241 ready.consume(amt);
242
243 if ready.is_empty() {
244 this.ready = None;
245 }
246
247 return Ok(amt).into();
248 }
249
250 assert!(this.ready.is_none());
252
253 match ready!(Pin::new(&this.rx_body).poll_recv(cx, true)) {
254 None => {
255 this.ended = true;
257 return Ok(0).into();
258 }
259 Some(v) => {
260 let v = v?;
262 this.ready = Some(ConsumeBuf::new(v));
263 }
264 }
265 }
266 }
267}
268
269impl AsyncRead for RecvStream {
270 fn poll_read(
271 self: Pin<&mut Self>,
272 cx: &mut Context,
273 buf: &mut [u8],
274 ) -> Poll<io::Result<usize>> {
275 let this = self.get_mut();
276
277 assert!(!buf.is_empty(), "poll_read with empty buf");
279
280 ready!(this.poll_drive_server(cx))?;
281
282 Pin::new(this).poll_body_data(cx, buf)
283 }
284}
285
286impl fmt::Debug for SendStream {
287 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
288 write!(f, "SendStream")
289 }
290}
291
292impl fmt::Debug for RecvStream {
293 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
294 write!(f, "RecvStream")
295 }
296}
297
/// A chunk of bytes that is either borrowed, owned, or has already been
/// moved out.
enum Data<'a> {
    /// Bytes borrowed from the caller.
    Shared(&'a [u8]),
    /// Bytes owned by this value.
    Owned(Vec<u8>),
    /// Placeholder left behind by `take_owned`.
    Empty,
}

impl<'a> Data<'a> {
    /// `true` only for the `Owned` variant.
    fn is_owned(&self) -> bool {
        match self {
            Data::Owned(_) => true,
            Data::Shared(_) | Data::Empty => false,
        }
    }

    /// `true` when there are no bytes; `Empty` counts as empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of bytes held; `Empty` reports `0`.
    pub fn len(&self) -> usize {
        match self {
            Data::Shared(bytes) => bytes.len(),
            Data::Owned(bytes) => bytes.len(),
            Data::Empty => 0,
        }
    }

    /// Moves the owned vector out, leaving `Data::Empty` in its place.
    ///
    /// # Panics
    ///
    /// Panics if this is not the `Owned` variant; the value is left
    /// untouched in that case.
    pub fn take_owned(&mut self) -> Vec<u8> {
        // Check before replacing so a non-owned value is not clobbered.
        if !self.is_owned() {
            panic!("Can't take_owned");
        }

        match mem::replace(self, Data::Empty) {
            Data::Owned(bytes) => bytes,
            _ => panic!("Can't take_owned"),
        }
    }
}

impl<'a> std::ops::Deref for Data<'a> {
    type Target = [u8];

    /// Views the bytes as a slice.
    ///
    /// # Panics
    ///
    /// Panics on `Data::Empty`.
    fn deref(&self) -> &Self::Target {
        match self {
            Data::Shared(bytes) => *bytes,
            Data::Owned(bytes) => bytes.as_slice(),
            Data::Empty => panic!("Can't deref a Data::Empty"),
        }
    }
}
348
/// Returns `true` for `UnexpectedEof`, `ConnectionReset` and
/// `ConnectionAborted`, and `false` for every other kind.
pub(crate) fn is_closed_kind(kind: io::ErrorKind) -> bool {
    matches!(
        kind,
        io::ErrorKind::UnexpectedEof
            | io::ErrorKind::ConnectionReset
            | io::ErrorKind::ConnectionAborted
    )
}