hreq_h1/server.rs
1//! Server implementation of the HTTP/1.1 protocol.
2//!
3//! # Example
4//!
5//! # Example
6//!
7//! ```rust, no_run
8//! use hreq_h1::server;
9//! use std::error::Error;
10//! use async_std::net::TcpListener;
11//! use http::{Response, StatusCode};
12//!
13//! #[async_std::main]
14//! async fn main() -> Result<(), Box<dyn Error>> {
15//! let mut listener = TcpListener::bind("127.0.0.1:3000").await?;
16//!
17//! // Accept all incoming TCP connections.
18//! loop {
19//! if let Ok((socket, _peer_addr)) = listener.accept().await {
20//!
21//! // Spawn a new task to process each connection individually
22//! async_std::task::spawn(async move {
23//! let mut h1 = server::handshake(socket);
24//!
25//! // Handle incoming requests from this socket, one by one.
26//! while let Some(request) = h1.accept().await {
27//! let (req, mut respond) = request.unwrap();
28//!
29//! println!("Receive request: {:?}", req);
30//!
31//! // Build a response with no body, since
32//! // that is sent later.
33//! let response = Response::builder()
34//! .status(StatusCode::OK)
35//! .body(())
36//! .unwrap();
37//!
38//! // Send the response back to the client
39//! let mut send_body = respond
40//! .send_response(response, false).await.unwrap();
41//!
42//! send_body.send_data(b"Hello world!", true)
43//! .await.unwrap();
44//! }
45//! });
46//! }
47//! }
48//!
49//! Ok(())
50//! }
51//!
52//!
53
54use crate::buf_reader::BufIo;
55use crate::fast_buf::FastBuf;
56use crate::http11::{poll_for_crlfcrlf, try_parse_req, write_http1x_res, READ_BUF_INIT_SIZE};
57use crate::limit::allow_reuse;
58use crate::limit::{LimitRead, LimitWrite};
59use crate::mpsc::{Receiver, Sender};
60use crate::share::is_closed_kind;
61use crate::Error;
62use crate::RecvStream;
63use crate::SendStream;
64use crate::{AsyncRead, AsyncWrite};
65use futures_util::future::poll_fn;
66use futures_util::ready;
67use std::fmt;
68use std::io;
69use std::pin::Pin;
70use std::sync::{Arc, Mutex};
71use std::task::{Context, Poll};
72
73/// Buffer size when writing a request.
74const MAX_RESPONSE_SIZE: usize = 8192;
75
76/// Max buffer size when reading a body.
77const MAX_BODY_READ_SIZE: u64 = 8 * 1024 * 1024;
78
79// The state and I/O of the connection is driven by the async calls from the various entities
80// involved in accepting and responding to requests.
81//
82// 1. Connection::accept() drives while there is no current request.
83// 2. RecvStream::poll_read() and SendResponse::send_response() while reading a request body and
84// responding to a request.
85// 3. SendStream::send_data() while a response body is being sent.
86
87/// "handshake" to create a connection.
88///
89/// See [module level doc](index.html) for an example.
90pub fn handshake<S>(io: S) -> Connection<S>
91where
92 S: AsyncRead + AsyncWrite + Unpin + 'static,
93{
94 let inner = Arc::new(Mutex::new(Codec::new(io)));
95 let (send, recv) = Receiver::new(1);
96 let drive = SyncDriveExternal(Arc::new(Box::new(inner.clone())), send);
97
98 Connection(inner, drive, recv)
99}
100
101/// Server connection for accepting incoming requests.
102///
103/// See [module level doc](index.html) for an example.
104pub struct Connection<S>(Arc<Mutex<Codec<S>>>, SyncDriveExternal, Receiver<()>);
105
106impl<S> Connection<S>
107where
108 S: AsyncRead + AsyncWrite + Unpin,
109{
110 /// Accept a new incoming request to handle.
111 pub async fn accept(
112 &mut self,
113 ) -> Option<Result<(http::Request<RecvStream>, SendResponse), Error>> {
114 poll_fn(|cx| Pin::new(&mut *self).poll_accept(cx))
115 .await
116 .map(|v| v.map_err(|x| x.into()))
117 }
118
119 /// Wait until the connection has sent/flushed all data and is ok to drop.
120 pub async fn close(mut self) {
121 poll_fn(|cx| Pin::new(&mut self).poll_close(cx)).await;
122 }
123
124 fn poll_accept(
125 self: Pin<&mut Self>,
126 cx: &mut Context,
127 ) -> Poll<Option<Result<(http::Request<RecvStream>, SendResponse), io::Error>>> {
128 let this = self.get_mut();
129
130 // This will register on previous SyncDriveExternal being dropped.
131 ready!(this.1.poll_pending_external(cx, &this.2));
132
133 let drive_external = this.1.clone();
134
135 let mut lock = this.0.lock().unwrap();
136
137 lock.poll_server(cx, Some(drive_external), true)
138 }
139
140 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
141 let mut lock = self.0.lock().unwrap();
142
143 // It doesn't matter what the return value is, we just need it to not be pending.
144 ready!(lock.poll_server(cx, None, true));
145
146 ().into()
147 }
148}
149
150/// Handle to send a response and body back for a single request.
151///
152/// See [module level doc](index.html) for an example.
153pub struct SendResponse {
154 drive_external: SyncDriveExternal,
155 tx_res: Sender<(http::Response<()>, bool, Receiver<(Vec<u8>, bool)>)>,
156 req_expects_no_body: bool,
157}
158
159impl SendResponse {
160 /// Send a response to a request. Notice that the body is sent separately afterwards.
161 ///
162 /// The lib will infer that there will be no response body if there is a `content-length: 0`
163 /// header or a status code that should not have a body (1xx, 204, 304).
164 ///
165 /// `no_body` is an alternative way, in addition to headers and status, to inform the library
166 /// there will be no body to send.
167 ///
168 /// It's an error to send a body when the status or headers indicate there should not be one.
169 pub async fn send_response(
170 self,
171 response: http::Response<()>,
172 no_body: bool,
173 ) -> Result<SendStream, Error> {
174 trace!("Send response: {:?}", response);
175
176 // bounded to get back pressure
177 let (tx_body, rx_body) = Receiver::new(1);
178
179 let limit = LimitWrite::from_headers(response.headers());
180
181 let status = response.status();
182
183 // https://tools.ietf.org/html/rfc7230#page-31
184 // any response with a 1xx (Informational), 204 (No Content), or
185 // 304 (Not Modified) status code is always terminated by the first
186 // empty line after the header fields, regardless of the header fields
187 // present in the message, and thus cannot contain a message body.
188 let ended = no_body
189 || self.req_expects_no_body
190 || limit.is_no_body()
191 || status.is_informational()
192 || status == http::StatusCode::NO_CONTENT
193 || status == http::StatusCode::NOT_MODIFIED;
194
195 let drive_external = Some(self.drive_external.clone());
196
197 let send = SendStream::new(tx_body, limit, ended, drive_external);
198
199 if !self.tx_res.send((response, ended, rx_body)) {
200 return Err(io::Error::new(io::ErrorKind::Other, "Connection closed").into());
201 }
202
203 poll_fn(|cx| self.drive_external.poll_drive_external(cx)).await?;
204
205 Ok(send)
206 }
207}
208pub(crate) struct Codec<S> {
209 io: BufIo<S>,
210 state: State,
211}
212
213impl<S> Codec<S>
214where
215 S: AsyncRead + AsyncWrite + Unpin,
216{
217 fn new(io: S) -> Self {
218 Codec {
219 io: BufIo::with_capacity(READ_BUF_INIT_SIZE, io),
220 state: State::RecvReq(RecvReq),
221 }
222 }
223}
224
225impl<S> Codec<S>
226where
227 S: AsyncRead + AsyncWrite + Unpin,
228{
229 fn poll_server(
230 &mut self,
231 cx: &mut Context,
232 want_next_req: Option<SyncDriveExternal>,
233 register_on_user_input: bool,
234 ) -> Poll<Option<Result<(http::Request<RecvStream>, SendResponse), io::Error>>> {
235 // Any error bubbling up closes the connection.
236 match self.drive(cx, want_next_req, register_on_user_input) {
237 Poll::Ready(Some(Err(e))) => {
238 debug!("Close on error: {:?}", e);
239
240 trace!("{:?} => Closed", self.state);
241 self.state = State::Closed;
242
243 Some(Err(e)).into()
244 }
245 r => r,
246 }
247 }
248
249 fn drive(
250 &mut self,
251 cx: &mut Context,
252 want_next_req: Option<SyncDriveExternal>,
253 register_on_user_input: bool,
254 ) -> Poll<Option<Result<(http::Request<RecvStream>, SendResponse), io::Error>>> {
255 loop {
256 ready!(Pin::new(&mut self.io).poll_finish_pending_write(cx))?;
257
258 match &mut self.state {
259 State::RecvReq(h) => {
260 if let Some(want_next_req) = want_next_req {
261 let (next_req, next_state) =
262 ready!(h.poll_next_req(cx, &mut self.io, want_next_req))?;
263
264 trace!("RecvReq => {:?}", next_state);
265 self.state = next_state;
266
267 if let Some(next_req) = next_req {
268 return Some(Ok(next_req)).into();
269 } else {
270 return None.into();
271 }
272 } else {
273 // poll_drive() called with the intention of just driving server state
274 // and not to handle the next read request.
275 return None.into();
276 }
277 }
278 State::SendRes(h) => {
279 let next_state =
280 ready!(h.poll_bidirect(cx, &mut self.io, register_on_user_input))?;
281
282 trace!("SendRes => {:?}", next_state);
283 self.state = next_state;
284 }
285 State::SendBody(h) => {
286 let next_state =
287 ready!(h.poll_send_body(cx, &mut self.io, register_on_user_input))?;
288
289 trace!("SendBody => {:?}", next_state);
290 self.state = next_state;
291 }
292 State::Closed => {
293 // Nothing to do
294 return None.into();
295 }
296 }
297 }
298 }
299}
300
301enum State {
302 /// Receive next request.
303 RecvReq(RecvReq),
304 /// Send response, and (if appropriate) receive request body.
305 SendRes(Bidirect),
306 /// Send response body.
307 SendBody(BodySender),
308 /// Closed, error or cleanly.
309 Closed,
310}
311
312impl fmt::Debug for State {
313 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
314 match self {
315 State::RecvReq(_) => write!(f, "RecvReq"),
316 State::SendRes(_) => write!(f, "SendRes"),
317 State::SendBody(_) => write!(f, "SendBody"),
318 State::Closed => write!(f, "Closed"),
319 }
320 }
321}
322
323/// Waiting for the next request to arrive.
324///
325/// Reads a buffer for 2 x crlf to know we got an entire request header.
326struct RecvReq;
327
328impl RecvReq {
329 fn poll_next_req<S>(
330 &mut self,
331 cx: &mut Context,
332 io: &mut BufIo<S>,
333 drive_external: SyncDriveExternal,
334 ) -> Poll<Result<(Option<(http::Request<RecvStream>, SendResponse)>, State), io::Error>>
335 where
336 S: AsyncRead + AsyncWrite + Unpin,
337 {
338 let req = match ready!(poll_for_crlfcrlf(cx, io, try_parse_req)).and_then(|x| x) {
339 Ok(v) => v,
340 Err(e) => {
341 if is_closed_kind(e.kind()) {
342 // remote just hung up before sending request, that's ok.
343 return Ok((None, State::Closed)).into();
344 } else {
345 return Err(e).into();
346 }
347 }
348 };
349
350 if req.is_none() {
351 return Err(io::Error::new(
352 io::ErrorKind::InvalidData,
353 "Failed to parse request",
354 ))
355 .into();
356 }
357 let req = req.expect("Didn't read full request");
358
359 // Limiter to read the correct body amount from the socket.
360 let limit = LimitRead::from_headers(req.headers(), false);
361
362 let request_allows_reuse = allow_reuse(req.headers(), req.version());
363
364 let no_req_body = limit.is_no_body();
365
366 // https://tools.ietf.org/html/rfc7230#page-31
367 // Any response to a HEAD request ... is always terminated by the first
368 // empty line after the header fields, regardless of the header fields
369 // present in the message, and thus cannot contain a message body.
370 let req_expects_no_body = req.method() == http::Method::HEAD;
371
372 // bound channel to get backpressure
373 let (tx_body, rx_body) = Receiver::new(1);
374
375 let (tx_res, rx_res) = Receiver::new(1);
376
377 // Prepare the new "package" to be delivered out of the poll loop.
378 let package = {
379 //
380 let recv = RecvStream::new(rx_body, no_req_body, Some(drive_external.clone()));
381
382 let (parts, _) = req.into_parts();
383 let req = http::Request::from_parts(parts, recv);
384
385 let send = SendResponse {
386 drive_external,
387 tx_res,
388 req_expects_no_body,
389 };
390
391 (req, send)
392 };
393
394 // Drop tx_body straight away if headers indicate we are not expecting any request body.
395 let tx_body = if limit.is_no_body() {
396 None
397 } else {
398 Some(tx_body)
399 };
400
401 let cur_read_size = limit.body_size().unwrap_or(8192).min(MAX_BODY_READ_SIZE) as usize;
402
403 let bidirect = Bidirect {
404 limit,
405 request_allows_reuse,
406 tx_body,
407 rx_res: Some(rx_res),
408 holder: None,
409 cur_read_size,
410 };
411
412 Ok((Some(package), State::SendRes(bidirect))).into()
413 }
414}
415
416/// Both receive a request body (if headers indicate it), and
417/// send a response which is obtained from the library user.
418struct Bidirect {
419 // limiter/dechunker for reading incoming request body.
420 limit: LimitRead,
421 // remember this for when we are to go back into state RecvReq
422 request_allows_reuse: bool,
423 // send body chunks from socket to this sender.
424 tx_body: Option<Sender<io::Result<Vec<u8>>>>,
425 // receive a response (once), from this to pass to socket.
426 rx_res: Option<Receiver<(http::Response<()>, bool, Receiver<(Vec<u8>, bool)>)>>,
427 // Holder of data from rx_res used to receive/write a response body.
428 holder: Option<(bool, LimitWrite, Receiver<(Vec<u8>, bool)>)>,
429 // The current read buffer size for receving the request body.
430 cur_read_size: usize,
431}
432
433impl Bidirect {
434 fn poll_bidirect<S>(
435 &mut self,
436 cx: &mut Context,
437 io: &mut BufIo<S>,
438 register_on_user_input: bool,
439 ) -> Poll<Result<State, io::Error>>
440 where
441 S: AsyncRead + AsyncWrite + Unpin,
442 {
443 // Alternate between attempting to send a user response and receving more body chunks.
444 loop {
445 // We keep on looping until both these are None which signals
446 // the bidirect state is done.
447 if self.rx_res.is_none() && self.tx_body.is_none() {
448 break;
449 }
450
451 let mut send_resp_pending = false;
452
453 // Handle user sending a response.
454 if self.rx_res.is_some() {
455 // register_on_user_input means we should register a Waker when polling for a response
456 // from the user. We should not register two wakers for the same Context, which means
457 // if we get Pending while register_on_user_input is false, we can proceed to also drive IO.
458 match self.poll_send_resp(cx, io, register_on_user_input) {
459 Poll::Pending => {
460 send_resp_pending = true;
461 }
462 Poll::Ready(v) => v?,
463 }
464 }
465
466 if send_resp_pending && (register_on_user_input || self.tx_body.is_none()) {
467 // If register_on_user_input:
468 // A Waker is registered in mpsc::Receiver::poll_recv.
469 // We cannot continue with IO since that would risk
470 // registering wakers in multiple places.
471 //
472 // If self.tx_body.is_none() we can't make progress on
473 // IO, and send_resp will not make progress by anything less
474 // than user input.
475
476 return Poll::Pending;
477 }
478
479 // Read request body from socket and propagate to user.
480 if self.tx_body.is_some() {
481 ready!(self.poll_read_body(cx, io))?;
482 }
483 }
484
485 // invariant: we must have the details required in holder.
486 let (no_body, limit, rx_body) = self.holder.take().expect("Holder of rx_body");
487
488 let next_state = if no_body || limit.is_no_body() {
489 if self.request_allows_reuse {
490 trace!("No body to send");
491 State::RecvReq(RecvReq)
492 } else {
493 trace!("Request does not allow reuse");
494 State::Closed
495 }
496 } else {
497 State::SendBody(BodySender {
498 request_allows_reuse: self.request_allows_reuse,
499 rx_body,
500 })
501 };
502
503 Ok(next_state).into()
504 }
505
506 fn poll_send_resp<S>(
507 &mut self,
508 cx: &mut Context,
509 io: &mut BufIo<S>,
510 register_on_user_input: bool,
511 ) -> Poll<Result<(), io::Error>>
512 where
513 S: AsyncRead + AsyncWrite + Unpin,
514 {
515 // We shouldn't be here unless we have rx_res.
516 let rx_res = self.rx_res.as_ref().unwrap();
517
518 if let Some((res, end, rx_body)) =
519 ready!(Pin::new(rx_res).poll_recv(cx, register_on_user_input))
520 {
521 // We got a response from the user.
522
523 // Remember things for the next state, SendBody
524 let limit = LimitWrite::from_headers(res.headers());
525 self.holder = Some((end, limit, rx_body));
526
527 let mut buf = FastBuf::with_capacity(MAX_RESPONSE_SIZE);
528
529 let mut write_to = buf.borrow();
530
531 let amount = write_http1x_res(&res, &mut write_to[..])?;
532
533 // If write_http1x_res reports the correct number of bytes written to
534 // the buffer, then this extend is safe.
535 unsafe {
536 write_to.extend(amount);
537 }
538
539 let mut to_send = Some(&buf[..]);
540
541 // invariant: poll_drive deals with pending outgoing io before anything
542 // else. at this point we should not have any pending write io.
543 assert!(io.can_poll_write());
544
545 match Pin::new(io).poll_write_all(cx, &mut to_send, true) {
546 Poll::Pending => {
547 // invariant: Pending without "taking" all to_send bytes is a fault in BufIo
548 assert!(to_send.is_none());
549 }
550 Poll::Ready(v) => v?,
551 }
552
553 // Remove rx_res since we don't need anything more from it. This makes
554 // poll_bidirect() not go into poll_send_resp anymore.
555 self.rx_res.take();
556 } else {
557 // The user dropped the SendResponse instance before sending a response.
558 // This is a user fault.
559 return Err(Error::User(
560 "SendResponse dropped before sending any response".to_string(),
561 )
562 .into_io())
563 .into();
564 }
565
566 Ok(()).into()
567 }
568
569 fn poll_read_body<S>(
570 &mut self,
571 cx: &mut Context,
572 io: &mut BufIo<S>,
573 ) -> Poll<Result<(), io::Error>>
574 where
575 S: AsyncRead + AsyncWrite + Unpin,
576 {
577 // We shouldn't be here unless we have tx_body.
578 let tx_body = self.tx_body.as_mut().unwrap();
579
580 // Ensure we can send off any incoming read chunk to the user. This makes for flow control.
581 if !ready!(Pin::new(&*tx_body).poll_ready(cx, true)) {
582 // User has dropped the RecvStream. That's ok, we will just discard
583 // the entire incoming body.
584 }
585
586 io.ensure_read_capacity(self.cur_read_size);
587
588 let buf = ready!(Pin::new(&mut *io).poll_fill_buf(cx, false))?;
589
590 if buf.is_empty() {
591 // End of incoming data before we have fulfilled the LimitRead.
592 // configured by the headers.
593 return Err(io::Error::new(
594 io::ErrorKind::UnexpectedEof,
595 "EOF before complete body received",
596 ))
597 .into();
598 }
599
600 let available_bytes = buf.len();
601
602 let chunk = if self.limit.can_read_entire_vec() && io.can_take_read_buf() {
603 // This is an optimization. If we're using a content-length and not
604 // chunked, we can sometimes take the entire input buffer and therefore
605 // avoiding some data copying.
606
607 let chunk = io.take_read_buf();
608
609 // To keep counting the size of the chunks
610 self.limit.accept_entire_vec(&chunk);
611
612 chunk
613 } else {
614 let mut chunk = FastBuf::with_capacity(available_bytes);
615
616 let mut read_into = chunk.borrow();
617
618 let amount = ready!(self.limit.poll_read(cx, io, &mut read_into[..]))?;
619
620 // If poll_read is reporting the correct amount of bytes read into buf,
621 // then this extend is safe.
622 unsafe {
623 read_into.extend(amount);
624 }
625
626 chunk.into_vec()
627 };
628
629 trace!("Received body chunk len={}", chunk.len());
630
631 if !chunk.is_empty() {
632 tx_body.send(Ok(chunk));
633 } else if !self.limit.is_complete() {
634 // https://tools.ietf.org/html/rfc7230#page-32
635 // If the sender closes the connection or
636 // the recipient times out before the indicated number of octets are
637 // received, the recipient MUST consider the message to be
638 // incomplete and close the connection.
639
640 trace!("Close because read body is not complete");
641 const EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;
642 return Err(io::Error::new(EOF, "Partial body")).into();
643 }
644
645 if self.limit.is_complete() {
646 // Remove tx_body Sender which indicates to the RecvStream that there is
647 // no more body chunks to come.
648 self.tx_body.take();
649 }
650
651 Ok(()).into()
652 }
653}
654
655/// Sender of a response body.
656struct BodySender {
657 request_allows_reuse: bool,
658 rx_body: Receiver<(Vec<u8>, bool)>,
659}
660
661impl BodySender {
662 fn poll_send_body<S>(
663 &mut self,
664 cx: &mut Context,
665 io: &mut BufIo<S>,
666 register_on_user_input: bool,
667 ) -> Poll<Result<State, io::Error>>
668 where
669 S: AsyncRead + AsyncWrite + Unpin,
670 {
671 // Keep try to send body chunks until we got no more to send or Pending.
672 loop {
673 // Always abort on Pending, but register_on_user_input controls whether this resulted in
674 // any Waker being registered. This makes for flow control.
675 let next = ready!(Pin::new(&self.rx_body).poll_recv(cx, register_on_user_input));
676
677 // Pending writes must have been dealt with already at the beginning of poll_drive().
678 assert!(io.can_poll_write());
679
680 if let Some((chunk, end)) = next {
681 let mut buf = Some(&chunk[..]);
682
683 match Pin::new(&mut *io).poll_write_all(cx, &mut buf, end) {
684 Poll::Pending => {
685 // invariant: The buffer must still been taken by poll_write.
686 assert!(buf.is_none());
687 return Poll::Pending;
688 }
689 Poll::Ready(v) => v?,
690 }
691
692 if end {
693 let next_state = if self.request_allows_reuse {
694 trace!("Finished sending body");
695 State::RecvReq(RecvReq)
696 } else {
697 trace!("Request does not allow reuse");
698 State::Closed
699 };
700
701 return Ok(next_state).into();
702 }
703 } else {
704 // This is a fault, we are expecting more body chunks and
705 // the SendStream was dropped.
706 warn!("SendStream dropped before sending end_of_body");
707
708 return Err(io::Error::new(
709 io::ErrorKind::Other,
710 "Unexpected end of body",
711 ))
712 .into();
713 }
714 }
715 }
716}
717
718impl<S> std::fmt::Debug for Connection<S> {
719 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
720 write!(f, "Connection")
721 }
722}
723
724impl fmt::Debug for SendResponse {
725 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
726 write!(f, "SendResponse")
727 }
728}
729
730// These unsafe require some explanation. We want to be able to call Codec<S>::poll_drive
731// from both RecvStream and SendStream, however we don't want those two to be
732// generic over S. That leads us down the path of dynamic dispatch and "hiding"
733// S behind a Box<dyn DriveExternal>. So we implement that trait for Arc<Mutex<Codec<S>>>,
734// sorted... but oh not.
735//
736// If we put Box<dyn DriveExternal> as a property in SendStream, rust will later "discover"
737// this when it in an async context like async_std::spawn. Rust will say that DriveExternal
738// is not Sync/Send and if we try to constrain it, that will in turn propagate to S, and
739// we _don't_ want S to require Sync/Send.
740//
741// However. We always put S behind Arc<Mutex<Codec<S>>> and our treatment of S is
742// absolutely Sync/Send because of that mutex. That leads us to wrapping
743// Box<dyn DriveExternal> in some struct we can "unsafe impl Sync" for, and that's
744// SyncDriveExternal.
745unsafe impl Send for SyncDriveExternal {}
746unsafe impl Sync for SyncDriveExternal {}
747
748#[derive(Clone)]
749pub(crate) struct SyncDriveExternal(Arc<Box<dyn DriveExternal>>, Sender<()>);
750
751impl SyncDriveExternal {
752 // count_external() tells us how many Arc<Box<dyn DriveExternal>> exists.
753 // When we are not actively handling a request, this is 1.
754 //
755 // When a Sender is dropped (inside the cloned Box<dyn DriveExternal> in RecvStream
756 // and SendStream), it wakes the Receiver, and we use this as a mechanism to "monitor"
757 // when SyncDriveExternal instances are being dropped.
758 fn poll_pending_external(&mut self, cx: &mut Context, recv: &Receiver<()>) -> Poll<()> {
759 let external = self.count_external();
760 trace!("poll_pending_external: {}", external);
761 if self.count_external() == 1 {
762 trace!("poll_pending_external: Ready");
763 ().into()
764 } else {
765 match Pin::new(recv).poll_recv(cx, true) {
766 Poll::Pending => {
767 trace!("poll_pending_external Pending");
768 Poll::Pending
769 }
770 Poll::Ready(_) => {
771 // invariant: there is always a Sender in MakeDriveExternal, and they
772 // never send anything.
773 unreachable!()
774 }
775 }
776 }
777 }
778
779 fn count_external(&self) -> usize {
780 Arc::weak_count(&self.0) + Arc::strong_count(&self.0)
781 }
782}
783
784impl DriveExternal for SyncDriveExternal {
785 fn poll_drive_external(&self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
786 self.0.poll_drive_external(cx)
787 }
788}
789
790pub(crate) trait DriveExternal {
791 fn poll_drive_external(&self, cx: &mut Context) -> Poll<Result<(), io::Error>>;
792}
793
794impl<S> DriveExternal for Arc<Mutex<Codec<S>>>
795where
796 S: AsyncRead + AsyncWrite + Unpin,
797{
798 fn poll_drive_external(&self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
799 let mut lock = self.lock().unwrap();
800
801 match lock.poll_server(cx, None, false) {
802 Poll::Pending => {
803 let pending_io = lock.io.pending_rx() || lock.io.pending_tx();
804
805 trace!("pending_io: {}", pending_io);
806
807 // Only propagate Pending if it was due to io. We send register_on_user_input
808 // false, which means that reading user input from SendResponse and SendStream
809 // will not have registered a Waker. Pending due to IO most propagate as Pending.
810 if pending_io {
811 Poll::Pending
812 } else {
813 Ok(()).into()
814 }
815 }
816 Poll::Ready(Some(Ok(_))) => {
817 // invariant: want_next_req is false, this should not happend.
818 unreachable!("Got next request in poll_drive_external");
819 }
820 // Propagate error
821 Poll::Ready(Some(Err(e))) => Err(e).into(),
822 //
823 Poll::Ready(None) => Ok(()).into(),
824 }
825 }
826}