hreq_h1/
share.rs

1use crate::fast_buf::ConsumeBuf;
2use crate::fast_buf::FastBuf;
3use crate::limit::LimitWrite;
4use crate::mpsc::{Receiver, Sender};
5use crate::server::{DriveExternal, SyncDriveExternal};
6use crate::AsyncRead;
7use crate::Error;
8use futures_util::future::poll_fn;
9use futures_util::ready;
10use std::fmt;
11use std::io;
12use std::io::Read;
13use std::mem;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17/// Send some body data to a remote peer.
18///
19/// Obtained either via a [`client::SendRequest`] or a [`server::SendResponse`].
20///
21/// [`client::SendRequest`]: client/struct.SendRequest.html
22/// [`server::SendResponse`]: server/struct.SendResponse.html
23pub struct SendStream {
24    tx_body: Sender<(Vec<u8>, bool)>,
25    limit: LimitWrite,
26    ended: bool,
27    drive_external: Option<SyncDriveExternal>,
28}
29
30impl SendStream {
31    pub(crate) fn new(
32        tx_body: Sender<(Vec<u8>, bool)>,
33        limit: LimitWrite,
34        ended: bool,
35        drive_external: Option<SyncDriveExternal>,
36    ) -> Self {
37        SendStream {
38            tx_body,
39            limit,
40            ended,
41            drive_external,
42        }
43    }
44
45    /// Send one chunk of data. Use `end_of_body` to signal end of data.
46    ///
47    /// When the body is constrained by a `content-length` header, this will only accept
48    /// the amount of bytes specified in the header. If there is too much data, the
49    /// function will error with a `Error::User`.
50    ///
51    /// For `transfer-encoding: chunked`, call to this function corresponds to one "chunk".
52    pub async fn send_data(&mut self, data: &[u8], end_of_body: bool) -> Result<(), Error> {
53        let data = Data::Shared(data);
54
55        self.do_send(data, end_of_body).await?;
56
57        Ok(())
58    }
59
60    /// Send one chunk of data. Use `end_of_body` to signal end of data.
61    ///
62    /// This is an optimization which together with a `content-length` shortcuts
63    /// some unnecessary copying of data.
64    ///
65    /// When the body is constrained by a `content-length` header, this will only accept
66    /// the amount of bytes specified in the header. If there is too much data, the
67    /// function will error with a `Error::User`.
68    ///
69    /// For `transfer-encoding: chunked`, call to this function corresponds to one "chunk".
70    pub async fn send_data_owned(&mut self, data: Vec<u8>, end_of_body: bool) -> Result<(), Error> {
71        let data = Data::Owned(data);
72
73        self.do_send(data, end_of_body).await?;
74
75        Ok(())
76    }
77
78    async fn do_send(&mut self, mut data: Data<'_>, end_of_body: bool) -> Result<(), Error> {
79        trace!("Send len={} end_of_body={}", data.len(), end_of_body);
80
81        poll_fn(|cx| self.poll_drive_server(cx)).await?;
82        poll_fn(|cx| Pin::new(&mut *self).poll_send_data(cx, &mut data, end_of_body)).await?;
83        poll_fn(|cx| self.poll_drive_server(cx)).await?;
84
85        // If content is ended, we effectively "flush", by keep doing poll_drive_external
86        // until we have driven all content through. This is only needed when we have
87        // drive_external (server side), since it means we are "driving" the connection
88        // from this very send action.
89        if self.ended && self.drive_external.is_some() {
90            while !self.tx_body.is_empty() {
91                poll_fn(|cx| self.poll_drive_server(cx)).await?;
92            }
93        }
94
95        Ok(())
96    }
97
98    fn poll_drive_server(&mut self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
99        if let Some(drive_external) = &self.drive_external {
100            drive_external.poll_drive_external(cx)
101        } else {
102            Ok(()).into()
103        }
104    }
105
106    /// Send some body data.
107    ///
108    /// `end` controls whether this is the last body chunk to send. It's an error
109    /// to send more data after `end` is `true`.
110    fn poll_send_data(
111        self: Pin<&mut Self>,
112        cx: &mut Context,
113        data: &mut Data,
114        end: bool,
115    ) -> Poll<Result<(), Error>> {
116        let this = self.get_mut();
117
118        if this.ended && end && data.is_empty() {
119            // this is a noop
120            return Ok(()).into();
121        }
122
123        if this.ended {
124            warn!("Body data is not expected");
125            return Err(Error::User("Body data is not expected".into())).into();
126        }
127
128        if !ready!(Pin::new(&this.tx_body).poll_ready(cx, true)) {
129            return Err(
130                io::Error::new(io::ErrorKind::ConnectionAborted, "Connection closed").into(),
131            )
132            .into();
133        }
134
135        let to_send = if data.is_owned() && this.limit.can_write_entire_vec() {
136            // This is an optmization when sending owned data. We can pass the
137            // Vec<u8> straight into the this.tx_body.send without copying the
138            // data into a FastBuf first.
139
140            let data = data.take_owned();
141
142            // so limit counters are correct
143            this.limit.accept_entire_vec(&data);
144
145            data
146        } else {
147            // This branch handles shared data as well as chunked body transfer.
148
149            let capacity = data.len() + this.limit.overhead();
150            let mut chunk = FastBuf::with_capacity(capacity);
151            this.limit.write(&data[..], &mut chunk)?;
152
153            if end {
154                this.ended = true;
155                this.limit.finish(&mut chunk)?;
156            }
157
158            chunk.into_vec()
159        };
160
161        let sent = this.tx_body.send((to_send, end));
162
163        if !sent {
164            return Err(
165                io::Error::new(io::ErrorKind::ConnectionAborted, "Connection closed").into(),
166            )
167            .into();
168        }
169
170        Ok(()).into()
171    }
172}
173
174/// Receives a body from the remote peer.
175///
176/// Obtained from either a [`client::ResponseFuture`] or [`server::Connection`].
177///
178/// [`client::ResponseFuture`]: client/struct.ResponseFuture.html
179/// [`server::Connection`]: server/struct.Connection.html
180pub struct RecvStream {
181    rx_body: Receiver<io::Result<Vec<u8>>>,
182    ready: Option<ConsumeBuf>,
183    ended: bool,
184    drive_external: Option<SyncDriveExternal>,
185}
186
187impl RecvStream {
188    pub(crate) fn new(
189        rx_body: Receiver<io::Result<Vec<u8>>>,
190        ended: bool,
191        drive_external: Option<SyncDriveExternal>,
192    ) -> Self {
193        RecvStream {
194            rx_body,
195            ready: None,
196            ended,
197            drive_external,
198        }
199    }
200
201    /// Read some body data into a given buffer.
202    ///
203    /// Ends when returned size is `0`.
204    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
205        Ok(poll_fn(move |cx| Pin::new(&mut *self).poll_read(cx, buf)).await?)
206    }
207
208    /// Returns `true` if there is no more data to receive.
209    ///
210    /// Specifically any further call to `read` will result in `0` bytes read.
211    pub fn is_end_stream(&self) -> bool {
212        self.ended
213    }
214
215    fn poll_drive_server(&mut self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
216        if let Some(drive_external) = &self.drive_external {
217            drive_external.poll_drive_external(cx)
218        } else {
219            Ok(()).into()
220        }
221    }
222
223    #[doc(hidden)]
224    /// Poll for some body data.
225    fn poll_body_data(
226        self: Pin<&mut Self>,
227        cx: &mut Context,
228        buf: &mut [u8],
229    ) -> Poll<io::Result<usize>> {
230        let this = self.get_mut();
231
232        if this.ended {
233            return Ok(0).into();
234        }
235
236        loop {
237            // First ship out ready data already received.
238            if let Some(ready) = &mut this.ready {
239                let amt = (&ready[..]).read(buf)?;
240
241                ready.consume(amt);
242
243                if ready.is_empty() {
244                    this.ready = None;
245                }
246
247                return Ok(amt).into();
248            }
249
250            // invariant: Should be no ready bytes if we're here.
251            assert!(this.ready.is_none());
252
253            match ready!(Pin::new(&this.rx_body).poll_recv(cx, true)) {
254                None => {
255                    // Channel is closed which indicates end of body.
256                    this.ended = true;
257                    return Ok(0).into();
258                }
259                Some(v) => {
260                    // nested io::Error
261                    let v = v?;
262                    this.ready = Some(ConsumeBuf::new(v));
263                }
264            }
265        }
266    }
267}
268
269impl AsyncRead for RecvStream {
270    fn poll_read(
271        self: Pin<&mut Self>,
272        cx: &mut Context,
273        buf: &mut [u8],
274    ) -> Poll<io::Result<usize>> {
275        let this = self.get_mut();
276
277        // can't poll data with an empty buffer
278        assert!(!buf.is_empty(), "poll_read with empty buf");
279
280        ready!(this.poll_drive_server(cx))?;
281
282        Pin::new(this).poll_body_data(cx, buf)
283    }
284}
285
286impl fmt::Debug for SendStream {
287    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
288        write!(f, "SendStream")
289    }
290}
291
292impl fmt::Debug for RecvStream {
293    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
294        write!(f, "RecvStream")
295    }
296}
297
298enum Data<'a> {
299    Shared(&'a [u8]),
300    Owned(Vec<u8>),
301    Empty,
302}
303
304impl<'a> Data<'a> {
305    fn is_owned(&self) -> bool {
306        if let Data::Owned(_) = self {
307            return true;
308        }
309        false
310    }
311
312    pub fn is_empty(&self) -> bool {
313        match self {
314            Data::Shared(v) => v.is_empty(),
315            Data::Owned(v) => v.is_empty(),
316            Data::Empty => true,
317        }
318    }
319
320    pub fn len(&self) -> usize {
321        match self {
322            Data::Shared(v) => v.len(),
323            Data::Owned(v) => v.len(),
324            Data::Empty => 0,
325        }
326    }
327
328    pub fn take_owned(&mut self) -> Vec<u8> {
329        if self.is_owned() {
330            if let Data::Owned(v) = mem::replace(self, Data::Empty) {
331                return v;
332            }
333        }
334        panic!("Can't take_owned");
335    }
336}
337
338impl<'a> std::ops::Deref for Data<'a> {
339    type Target = [u8];
340    fn deref(&self) -> &Self::Target {
341        match self {
342            Data::Shared(v) => &v[..],
343            Data::Owned(v) => &v[..],
344            Data::Empty => panic!("Can't deref a Data::Empty"),
345        }
346    }
347}
348
349/// Check if kind indicates the other side closed the connection.
350pub(crate) fn is_closed_kind(kind: io::ErrorKind) -> bool {
351    kind == io::ErrorKind::UnexpectedEof
352        || kind == io::ErrorKind::ConnectionReset
353        || kind == io::ErrorKind::ConnectionAborted
354}