hreq_h1/
server.rs

1//! Server implementation of the HTTP/1.1 protocol.
2//!
//! # Example
//!
7//! ```rust, no_run
8//! use hreq_h1::server;
9//! use std::error::Error;
10//! use async_std::net::TcpListener;
11//! use http::{Response, StatusCode};
12//!
13//! #[async_std::main]
14//! async fn main() -> Result<(), Box<dyn Error>> {
15//!     let mut listener = TcpListener::bind("127.0.0.1:3000").await?;
16//!
17//!     // Accept all incoming TCP connections.
18//!     loop {
19//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
20//!
21//!             // Spawn a new task to process each connection individually
22//!             async_std::task::spawn(async move {
23//!                 let mut h1 = server::handshake(socket);
24//!
25//!                 // Handle incoming requests from this socket, one by one.
26//!                 while let Some(request) = h1.accept().await {
27//!                     let (req, mut respond) = request.unwrap();
28//!
29//!                     println!("Receive request: {:?}", req);
30//!
31//!                     // Build a response with no body, since
32//!                     // that is sent later.
33//!                     let response = Response::builder()
34//!                         .status(StatusCode::OK)
35//!                         .body(())
36//!                         .unwrap();
37//!
38//!                     // Send the response back to the client
39//!                     let mut send_body = respond
40//!                         .send_response(response, false).await.unwrap();
41//!
42//!                     send_body.send_data(b"Hello world!", true)
43//!                         .await.unwrap();
44//!                 }
45//!             });
46//!         }
47//!     }
48//!
49//!    Ok(())
50//! }
//! ```
53
54use crate::buf_reader::BufIo;
55use crate::fast_buf::FastBuf;
56use crate::http11::{poll_for_crlfcrlf, try_parse_req, write_http1x_res, READ_BUF_INIT_SIZE};
57use crate::limit::allow_reuse;
58use crate::limit::{LimitRead, LimitWrite};
59use crate::mpsc::{Receiver, Sender};
60use crate::share::is_closed_kind;
61use crate::Error;
62use crate::RecvStream;
63use crate::SendStream;
64use crate::{AsyncRead, AsyncWrite};
65use futures_util::future::poll_fn;
66use futures_util::ready;
67use std::fmt;
68use std::io;
69use std::pin::Pin;
70use std::sync::{Arc, Mutex};
71use std::task::{Context, Poll};
72
/// Capacity of the buffer a response is serialized into before it is written to the socket.
const MAX_RESPONSE_SIZE: usize = 8192;

/// Upper bound on the read buffer size used when receiving a request body.
const MAX_BODY_READ_SIZE: u64 = 8 * 1024 * 1024;
78
79// The state and I/O of the connection is driven by the async calls from the various entities
80// involved in accepting and responding to requests.
81//
82// 1. Connection::accept() drives while there is no current request.
83// 2. RecvStream::poll_read() and SendResponse::send_response() while reading a request body and
84//    responding to a request.
85// 3. SendStream::send_data() while a response body is being sent.
86
87/// "handshake" to create a connection.
88///
89/// See [module level doc](index.html) for an example.
90pub fn handshake<S>(io: S) -> Connection<S>
91where
92    S: AsyncRead + AsyncWrite + Unpin + 'static,
93{
94    let inner = Arc::new(Mutex::new(Codec::new(io)));
95    let (send, recv) = Receiver::new(1);
96    let drive = SyncDriveExternal(Arc::new(Box::new(inner.clone())), send);
97
98    Connection(inner, drive, recv)
99}
100
101/// Server connection for accepting incoming requests.
102///
103/// See [module level doc](index.html) for an example.
104pub struct Connection<S>(Arc<Mutex<Codec<S>>>, SyncDriveExternal, Receiver<()>);
105
106impl<S> Connection<S>
107where
108    S: AsyncRead + AsyncWrite + Unpin,
109{
110    /// Accept a new incoming request to handle.
111    pub async fn accept(
112        &mut self,
113    ) -> Option<Result<(http::Request<RecvStream>, SendResponse), Error>> {
114        poll_fn(|cx| Pin::new(&mut *self).poll_accept(cx))
115            .await
116            .map(|v| v.map_err(|x| x.into()))
117    }
118
119    /// Wait until the connection has sent/flushed all data and is ok to drop.
120    pub async fn close(mut self) {
121        poll_fn(|cx| Pin::new(&mut self).poll_close(cx)).await;
122    }
123
124    fn poll_accept(
125        self: Pin<&mut Self>,
126        cx: &mut Context,
127    ) -> Poll<Option<Result<(http::Request<RecvStream>, SendResponse), io::Error>>> {
128        let this = self.get_mut();
129
130        // This will register on previous SyncDriveExternal being dropped.
131        ready!(this.1.poll_pending_external(cx, &this.2));
132
133        let drive_external = this.1.clone();
134
135        let mut lock = this.0.lock().unwrap();
136
137        lock.poll_server(cx, Some(drive_external), true)
138    }
139
140    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
141        let mut lock = self.0.lock().unwrap();
142
143        // It doesn't matter what the return value is, we just need it to not be pending.
144        ready!(lock.poll_server(cx, None, true));
145
146        ().into()
147    }
148}
149
150/// Handle to send a response and body back for a single request.
151///
152/// See [module level doc](index.html) for an example.
153pub struct SendResponse {
154    drive_external: SyncDriveExternal,
155    tx_res: Sender<(http::Response<()>, bool, Receiver<(Vec<u8>, bool)>)>,
156    req_expects_no_body: bool,
157}
158
159impl SendResponse {
160    /// Send a response to a request. Notice that the body is sent separately afterwards.
161    ///
162    /// The lib will infer that there will be no response body if there is a `content-length: 0`
163    /// header or a status code that should not have a body (1xx, 204, 304).
164    ///
165    /// `no_body` is an alternative way, in addition to headers and status, to inform the library
166    /// there will be no body to send.
167    ///
168    /// It's an error to send a body when the status or headers indicate there should not be one.
169    pub async fn send_response(
170        self,
171        response: http::Response<()>,
172        no_body: bool,
173    ) -> Result<SendStream, Error> {
174        trace!("Send response: {:?}", response);
175
176        // bounded to get back pressure
177        let (tx_body, rx_body) = Receiver::new(1);
178
179        let limit = LimitWrite::from_headers(response.headers());
180
181        let status = response.status();
182
183        // https://tools.ietf.org/html/rfc7230#page-31
184        // any response with a 1xx (Informational), 204 (No Content), or
185        // 304 (Not Modified) status code is always terminated by the first
186        // empty line after the header fields, regardless of the header fields
187        // present in the message, and thus cannot contain a message body.
188        let ended = no_body
189            || self.req_expects_no_body
190            || limit.is_no_body()
191            || status.is_informational()
192            || status == http::StatusCode::NO_CONTENT
193            || status == http::StatusCode::NOT_MODIFIED;
194
195        let drive_external = Some(self.drive_external.clone());
196
197        let send = SendStream::new(tx_body, limit, ended, drive_external);
198
199        if !self.tx_res.send((response, ended, rx_body)) {
200            return Err(io::Error::new(io::ErrorKind::Other, "Connection closed").into());
201        }
202
203        poll_fn(|cx| self.drive_external.poll_drive_external(cx)).await?;
204
205        Ok(send)
206    }
207}
208pub(crate) struct Codec<S> {
209    io: BufIo<S>,
210    state: State,
211}
212
213impl<S> Codec<S>
214where
215    S: AsyncRead + AsyncWrite + Unpin,
216{
217    fn new(io: S) -> Self {
218        Codec {
219            io: BufIo::with_capacity(READ_BUF_INIT_SIZE, io),
220            state: State::RecvReq(RecvReq),
221        }
222    }
223}
224
225impl<S> Codec<S>
226where
227    S: AsyncRead + AsyncWrite + Unpin,
228{
229    fn poll_server(
230        &mut self,
231        cx: &mut Context,
232        want_next_req: Option<SyncDriveExternal>,
233        register_on_user_input: bool,
234    ) -> Poll<Option<Result<(http::Request<RecvStream>, SendResponse), io::Error>>> {
235        // Any error bubbling up closes the connection.
236        match self.drive(cx, want_next_req, register_on_user_input) {
237            Poll::Ready(Some(Err(e))) => {
238                debug!("Close on error: {:?}", e);
239
240                trace!("{:?} => Closed", self.state);
241                self.state = State::Closed;
242
243                Some(Err(e)).into()
244            }
245            r => r,
246        }
247    }
248
249    fn drive(
250        &mut self,
251        cx: &mut Context,
252        want_next_req: Option<SyncDriveExternal>,
253        register_on_user_input: bool,
254    ) -> Poll<Option<Result<(http::Request<RecvStream>, SendResponse), io::Error>>> {
255        loop {
256            ready!(Pin::new(&mut self.io).poll_finish_pending_write(cx))?;
257
258            match &mut self.state {
259                State::RecvReq(h) => {
260                    if let Some(want_next_req) = want_next_req {
261                        let (next_req, next_state) =
262                            ready!(h.poll_next_req(cx, &mut self.io, want_next_req))?;
263
264                        trace!("RecvReq => {:?}", next_state);
265                        self.state = next_state;
266
267                        if let Some(next_req) = next_req {
268                            return Some(Ok(next_req)).into();
269                        } else {
270                            return None.into();
271                        }
272                    } else {
273                        // poll_drive() called with the intention of just driving server state
274                        // and not to handle the next read request.
275                        return None.into();
276                    }
277                }
278                State::SendRes(h) => {
279                    let next_state =
280                        ready!(h.poll_bidirect(cx, &mut self.io, register_on_user_input))?;
281
282                    trace!("SendRes => {:?}", next_state);
283                    self.state = next_state;
284                }
285                State::SendBody(h) => {
286                    let next_state =
287                        ready!(h.poll_send_body(cx, &mut self.io, register_on_user_input))?;
288
289                    trace!("SendBody => {:?}", next_state);
290                    self.state = next_state;
291                }
292                State::Closed => {
293                    // Nothing to do
294                    return None.into();
295                }
296            }
297        }
298    }
299}
300
301enum State {
302    /// Receive next request.
303    RecvReq(RecvReq),
304    /// Send response, and (if appropriate) receive request body.
305    SendRes(Bidirect),
306    /// Send response body.
307    SendBody(BodySender),
308    /// Closed, error or cleanly.
309    Closed,
310}
311
312impl fmt::Debug for State {
313    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
314        match self {
315            State::RecvReq(_) => write!(f, "RecvReq"),
316            State::SendRes(_) => write!(f, "SendRes"),
317            State::SendBody(_) => write!(f, "SendBody"),
318            State::Closed => write!(f, "Closed"),
319        }
320    }
321}
322
323/// Waiting for the next request to arrive.
324///
325/// Reads a buffer for 2 x crlf to know we got an entire request header.
326struct RecvReq;
327
328impl RecvReq {
329    fn poll_next_req<S>(
330        &mut self,
331        cx: &mut Context,
332        io: &mut BufIo<S>,
333        drive_external: SyncDriveExternal,
334    ) -> Poll<Result<(Option<(http::Request<RecvStream>, SendResponse)>, State), io::Error>>
335    where
336        S: AsyncRead + AsyncWrite + Unpin,
337    {
338        let req = match ready!(poll_for_crlfcrlf(cx, io, try_parse_req)).and_then(|x| x) {
339            Ok(v) => v,
340            Err(e) => {
341                if is_closed_kind(e.kind()) {
342                    // remote just hung up before sending request, that's ok.
343                    return Ok((None, State::Closed)).into();
344                } else {
345                    return Err(e).into();
346                }
347            }
348        };
349
350        if req.is_none() {
351            return Err(io::Error::new(
352                io::ErrorKind::InvalidData,
353                "Failed to parse request",
354            ))
355            .into();
356        }
357        let req = req.expect("Didn't read full request");
358
359        // Limiter to read the correct body amount from the socket.
360        let limit = LimitRead::from_headers(req.headers(), false);
361
362        let request_allows_reuse = allow_reuse(req.headers(), req.version());
363
364        let no_req_body = limit.is_no_body();
365
366        // https://tools.ietf.org/html/rfc7230#page-31
367        // Any response to a HEAD request ... is always terminated by the first
368        // empty line after the header fields, regardless of the header fields
369        // present in the message, and thus cannot contain a message body.
370        let req_expects_no_body = req.method() == http::Method::HEAD;
371
372        // bound channel to get backpressure
373        let (tx_body, rx_body) = Receiver::new(1);
374
375        let (tx_res, rx_res) = Receiver::new(1);
376
377        // Prepare the new "package" to be delivered out of the poll loop.
378        let package = {
379            //
380            let recv = RecvStream::new(rx_body, no_req_body, Some(drive_external.clone()));
381
382            let (parts, _) = req.into_parts();
383            let req = http::Request::from_parts(parts, recv);
384
385            let send = SendResponse {
386                drive_external,
387                tx_res,
388                req_expects_no_body,
389            };
390
391            (req, send)
392        };
393
394        // Drop tx_body straight away if headers indicate we are not expecting any request body.
395        let tx_body = if limit.is_no_body() {
396            None
397        } else {
398            Some(tx_body)
399        };
400
401        let cur_read_size = limit.body_size().unwrap_or(8192).min(MAX_BODY_READ_SIZE) as usize;
402
403        let bidirect = Bidirect {
404            limit,
405            request_allows_reuse,
406            tx_body,
407            rx_res: Some(rx_res),
408            holder: None,
409            cur_read_size,
410        };
411
412        Ok((Some(package), State::SendRes(bidirect))).into()
413    }
414}
415
416/// Both receive a request body (if headers indicate it), and
417/// send a response which is obtained from the library user.
418struct Bidirect {
419    // limiter/dechunker for reading incoming request body.
420    limit: LimitRead,
421    // remember this for when we are to go back into state RecvReq
422    request_allows_reuse: bool,
423    // send body chunks from socket to this sender.
424    tx_body: Option<Sender<io::Result<Vec<u8>>>>,
425    // receive a response (once), from this to pass to socket.
426    rx_res: Option<Receiver<(http::Response<()>, bool, Receiver<(Vec<u8>, bool)>)>>,
427    // Holder of data from rx_res used to receive/write a response body.
428    holder: Option<(bool, LimitWrite, Receiver<(Vec<u8>, bool)>)>,
429    // The current read buffer size for receving the request body.
430    cur_read_size: usize,
431}
432
433impl Bidirect {
434    fn poll_bidirect<S>(
435        &mut self,
436        cx: &mut Context,
437        io: &mut BufIo<S>,
438        register_on_user_input: bool,
439    ) -> Poll<Result<State, io::Error>>
440    where
441        S: AsyncRead + AsyncWrite + Unpin,
442    {
443        // Alternate between attempting to send a user response and receving more body chunks.
444        loop {
445            // We keep on looping until both these are None which signals
446            // the bidirect state is done.
447            if self.rx_res.is_none() && self.tx_body.is_none() {
448                break;
449            }
450
451            let mut send_resp_pending = false;
452
453            // Handle user sending a response.
454            if self.rx_res.is_some() {
455                // register_on_user_input means we should register a Waker when polling for a response
456                // from the user. We should not register two wakers for the same Context, which means
457                // if we get Pending while register_on_user_input is false, we can proceed to also drive IO.
458                match self.poll_send_resp(cx, io, register_on_user_input) {
459                    Poll::Pending => {
460                        send_resp_pending = true;
461                    }
462                    Poll::Ready(v) => v?,
463                }
464            }
465
466            if send_resp_pending && (register_on_user_input || self.tx_body.is_none()) {
467                // If register_on_user_input:
468                // A Waker is registered in mpsc::Receiver::poll_recv.
469                // We cannot continue with IO since that would risk
470                // registering wakers in multiple places.
471                //
472                // If self.tx_body.is_none() we can't make progress on
473                // IO, and send_resp will not make progress by anything less
474                // than user input.
475
476                return Poll::Pending;
477            }
478
479            // Read request body from socket and propagate to user.
480            if self.tx_body.is_some() {
481                ready!(self.poll_read_body(cx, io))?;
482            }
483        }
484
485        // invariant: we must have the details required in holder.
486        let (no_body, limit, rx_body) = self.holder.take().expect("Holder of rx_body");
487
488        let next_state = if no_body || limit.is_no_body() {
489            if self.request_allows_reuse {
490                trace!("No body to send");
491                State::RecvReq(RecvReq)
492            } else {
493                trace!("Request does not allow reuse");
494                State::Closed
495            }
496        } else {
497            State::SendBody(BodySender {
498                request_allows_reuse: self.request_allows_reuse,
499                rx_body,
500            })
501        };
502
503        Ok(next_state).into()
504    }
505
506    fn poll_send_resp<S>(
507        &mut self,
508        cx: &mut Context,
509        io: &mut BufIo<S>,
510        register_on_user_input: bool,
511    ) -> Poll<Result<(), io::Error>>
512    where
513        S: AsyncRead + AsyncWrite + Unpin,
514    {
515        // We shouldn't be here unless we have rx_res.
516        let rx_res = self.rx_res.as_ref().unwrap();
517
518        if let Some((res, end, rx_body)) =
519            ready!(Pin::new(rx_res).poll_recv(cx, register_on_user_input))
520        {
521            // We got a response from the user.
522
523            // Remember things for the next state, SendBody
524            let limit = LimitWrite::from_headers(res.headers());
525            self.holder = Some((end, limit, rx_body));
526
527            let mut buf = FastBuf::with_capacity(MAX_RESPONSE_SIZE);
528
529            let mut write_to = buf.borrow();
530
531            let amount = write_http1x_res(&res, &mut write_to[..])?;
532
533            // If write_http1x_res reports the correct number of bytes written to
534            // the buffer, then this extend is safe.
535            unsafe {
536                write_to.extend(amount);
537            }
538
539            let mut to_send = Some(&buf[..]);
540
541            // invariant: poll_drive deals with pending outgoing io before anything
542            //            else. at this point we should not have any pending write io.
543            assert!(io.can_poll_write());
544
545            match Pin::new(io).poll_write_all(cx, &mut to_send, true) {
546                Poll::Pending => {
547                    // invariant: Pending without "taking" all to_send bytes is a fault in BufIo
548                    assert!(to_send.is_none());
549                }
550                Poll::Ready(v) => v?,
551            }
552
553            // Remove rx_res since we don't need anything more from it. This makes
554            // poll_bidirect() not go into poll_send_resp anymore.
555            self.rx_res.take();
556        } else {
557            // The user dropped the SendResponse instance before sending a response.
558            // This is a user fault.
559            return Err(Error::User(
560                "SendResponse dropped before sending any response".to_string(),
561            )
562            .into_io())
563            .into();
564        }
565
566        Ok(()).into()
567    }
568
569    fn poll_read_body<S>(
570        &mut self,
571        cx: &mut Context,
572        io: &mut BufIo<S>,
573    ) -> Poll<Result<(), io::Error>>
574    where
575        S: AsyncRead + AsyncWrite + Unpin,
576    {
577        // We shouldn't be here unless we have tx_body.
578        let tx_body = self.tx_body.as_mut().unwrap();
579
580        // Ensure we can send off any incoming read chunk to the user. This makes for flow control.
581        if !ready!(Pin::new(&*tx_body).poll_ready(cx, true)) {
582            // User has dropped the RecvStream. That's ok, we will just discard
583            // the entire incoming body.
584        }
585
586        io.ensure_read_capacity(self.cur_read_size);
587
588        let buf = ready!(Pin::new(&mut *io).poll_fill_buf(cx, false))?;
589
590        if buf.is_empty() {
591            // End of incoming data before we have fulfilled the LimitRead.
592            // configured by the headers.
593            return Err(io::Error::new(
594                io::ErrorKind::UnexpectedEof,
595                "EOF before complete body received",
596            ))
597            .into();
598        }
599
600        let available_bytes = buf.len();
601
602        let chunk = if self.limit.can_read_entire_vec() && io.can_take_read_buf() {
603            // This is an optimization. If we're using a content-length and not
604            // chunked, we can sometimes take the entire input buffer and therefore
605            // avoiding some data copying.
606
607            let chunk = io.take_read_buf();
608
609            // To keep counting the size of the chunks
610            self.limit.accept_entire_vec(&chunk);
611
612            chunk
613        } else {
614            let mut chunk = FastBuf::with_capacity(available_bytes);
615
616            let mut read_into = chunk.borrow();
617
618            let amount = ready!(self.limit.poll_read(cx, io, &mut read_into[..]))?;
619
620            // If poll_read is reporting the correct amount of bytes read into buf,
621            // then this extend is safe.
622            unsafe {
623                read_into.extend(amount);
624            }
625
626            chunk.into_vec()
627        };
628
629        trace!("Received body chunk len={}", chunk.len());
630
631        if !chunk.is_empty() {
632            tx_body.send(Ok(chunk));
633        } else if !self.limit.is_complete() {
634            // https://tools.ietf.org/html/rfc7230#page-32
635            // If the sender closes the connection or
636            // the recipient times out before the indicated number of octets are
637            // received, the recipient MUST consider the message to be
638            // incomplete and close the connection.
639
640            trace!("Close because read body is not complete");
641            const EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;
642            return Err(io::Error::new(EOF, "Partial body")).into();
643        }
644
645        if self.limit.is_complete() {
646            // Remove tx_body Sender which indicates to the RecvStream that there is
647            // no more body chunks to come.
648            self.tx_body.take();
649        }
650
651        Ok(()).into()
652    }
653}
654
655/// Sender of a response body.
656struct BodySender {
657    request_allows_reuse: bool,
658    rx_body: Receiver<(Vec<u8>, bool)>,
659}
660
661impl BodySender {
662    fn poll_send_body<S>(
663        &mut self,
664        cx: &mut Context,
665        io: &mut BufIo<S>,
666        register_on_user_input: bool,
667    ) -> Poll<Result<State, io::Error>>
668    where
669        S: AsyncRead + AsyncWrite + Unpin,
670    {
671        // Keep try to send body chunks until we got no more to send or Pending.
672        loop {
673            // Always abort on Pending, but register_on_user_input controls whether this resulted in
674            // any Waker being registered. This makes for flow control.
675            let next = ready!(Pin::new(&self.rx_body).poll_recv(cx, register_on_user_input));
676
677            // Pending writes must have been dealt with already at the beginning of poll_drive().
678            assert!(io.can_poll_write());
679
680            if let Some((chunk, end)) = next {
681                let mut buf = Some(&chunk[..]);
682
683                match Pin::new(&mut *io).poll_write_all(cx, &mut buf, end) {
684                    Poll::Pending => {
685                        // invariant: The buffer must still been taken by poll_write.
686                        assert!(buf.is_none());
687                        return Poll::Pending;
688                    }
689                    Poll::Ready(v) => v?,
690                }
691
692                if end {
693                    let next_state = if self.request_allows_reuse {
694                        trace!("Finished sending body");
695                        State::RecvReq(RecvReq)
696                    } else {
697                        trace!("Request does not allow reuse");
698                        State::Closed
699                    };
700
701                    return Ok(next_state).into();
702                }
703            } else {
704                // This is a fault, we are expecting more body chunks and
705                // the SendStream was dropped.
706                warn!("SendStream dropped before sending end_of_body");
707
708                return Err(io::Error::new(
709                    io::ErrorKind::Other,
710                    "Unexpected end of body",
711                ))
712                .into();
713            }
714        }
715    }
716}
717
718impl<S> std::fmt::Debug for Connection<S> {
719    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
720        write!(f, "Connection")
721    }
722}
723
724impl fmt::Debug for SendResponse {
725    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
726        write!(f, "SendResponse")
727    }
728}
729
// These unsafe impls require some explanation. We want to be able to call
// Codec<S>::poll_server from both RecvStream and SendStream, however we don't want those
// two to be generic over S. That leads us down the path of dynamic dispatch and "hiding"
// S behind a Box<dyn DriveExternal>. So we implement that trait for Arc<Mutex<Codec<S>>>,
// sorted... but oh no.
735//
// If we put Box<dyn DriveExternal> as a property in SendStream, rust will later "discover"
// this when it is used in an async context like async_std::spawn. Rust will say that
// DriveExternal is not Sync/Send and if we try to constrain it, that will in turn propagate
// to S, and we _don't_ want S to require Sync/Send.
740//
741// However. We always put S behind Arc<Mutex<Codec<S>>> and our treatment of S is
742// absolutely Sync/Send because of that mutex. That leads us to wrapping
743// Box<dyn DriveExternal> in some struct we can "unsafe impl Sync" for, and that's
744// SyncDriveExternal.
745unsafe impl Send for SyncDriveExternal {}
746unsafe impl Sync for SyncDriveExternal {}
747
748#[derive(Clone)]
749pub(crate) struct SyncDriveExternal(Arc<Box<dyn DriveExternal>>, Sender<()>);
750
751impl SyncDriveExternal {
752    // count_external() tells us how many Arc<Box<dyn DriveExternal>> exists.
753    // When we are not actively handling a request, this is 1.
754    //
755    // When a Sender is dropped (inside the cloned Box<dyn DriveExternal> in RecvStream
756    // and SendStream), it wakes the Receiver, and we use this as a mechanism to "monitor"
757    // when SyncDriveExternal instances are being dropped.
758    fn poll_pending_external(&mut self, cx: &mut Context, recv: &Receiver<()>) -> Poll<()> {
759        let external = self.count_external();
760        trace!("poll_pending_external: {}", external);
761        if self.count_external() == 1 {
762            trace!("poll_pending_external: Ready");
763            ().into()
764        } else {
765            match Pin::new(recv).poll_recv(cx, true) {
766                Poll::Pending => {
767                    trace!("poll_pending_external Pending");
768                    Poll::Pending
769                }
770                Poll::Ready(_) => {
771                    // invariant: there is always a Sender in MakeDriveExternal, and they
772                    // never send anything.
773                    unreachable!()
774                }
775            }
776        }
777    }
778
779    fn count_external(&self) -> usize {
780        Arc::weak_count(&self.0) + Arc::strong_count(&self.0)
781    }
782}
783
784impl DriveExternal for SyncDriveExternal {
785    fn poll_drive_external(&self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
786        self.0.poll_drive_external(cx)
787    }
788}
789
790pub(crate) trait DriveExternal {
791    fn poll_drive_external(&self, cx: &mut Context) -> Poll<Result<(), io::Error>>;
792}
793
794impl<S> DriveExternal for Arc<Mutex<Codec<S>>>
795where
796    S: AsyncRead + AsyncWrite + Unpin,
797{
798    fn poll_drive_external(&self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
799        let mut lock = self.lock().unwrap();
800
801        match lock.poll_server(cx, None, false) {
802            Poll::Pending => {
803                let pending_io = lock.io.pending_rx() || lock.io.pending_tx();
804
805                trace!("pending_io: {}", pending_io);
806
807                // Only propagate Pending if it was due to io. We send register_on_user_input
808                // false, which means that reading user input from SendResponse and SendStream
809                // will not have registered a Waker. Pending due to IO most propagate as Pending.
810                if pending_io {
811                    Poll::Pending
812                } else {
813                    Ok(()).into()
814                }
815            }
816            Poll::Ready(Some(Ok(_))) => {
817                // invariant: want_next_req is false, this should not happend.
818                unreachable!("Got next request in poll_drive_external");
819            }
820            // Propagate error
821            Poll::Ready(Some(Err(e))) => Err(e).into(),
822            //
823            Poll::Ready(None) => Ok(()).into(),
824        }
825    }
826}