ntex 3.7.2

Framework for composable network services
Documentation
use std::task::{Poll, ready};
use std::{future::poll_fn, io, io::Write, pin::Pin, task, time::Instant};

use crate::http::body::{Body, BodySize, MessageBody};
use crate::http::error::PayloadError;
use crate::http::header::{HOST, HeaderValue};
use crate::http::{Payload, PayloadStream, ResponseHead, Version, h1};
use crate::io::{IoBoxed, RecvError};
use crate::time::{Millis, timeout_checked};
use crate::util::{BufMut, Bytes, BytesMut, Stream};

use super::connection::{Connection, ConnectionType};
use super::error::{ClientError, ConnectError};
use super::{ClientCodec, ClientPayloadCodec, ClientRawRequest, pool::Acquired};

pub(super) async fn send_request(
    io: IoBoxed,
    mut req: ClientRawRequest,
    body: Body,
    created: Instant,
    timeout: Millis,
    pool: Option<Acquired>,
) -> Result<(ResponseHead, Payload), ClientError> {
    // set request host header
    if !req.head.headers.contains_key(HOST)
        && let Some(host) = req.head.uri.host()
    {
        let mut wrt = BytesMut::with_capacity(host.len() + 5).writer();

        let _ = match req.head.uri.port_u16() {
            None | Some(80 | 443) => write!(wrt, "{host}"),
            Some(port) => write!(wrt, "{host}:{port}"),
        };

        match HeaderValue::from_shared(wrt.get_mut().take()) {
            Ok(value) => req.head.headers.insert(HOST, value),
            Err(e) => log::error!("Cannot set HOST header {e}"),
        }
    }

    log::trace!("sending http1 request {req:?} body size: {:?}", body.size());

    // send request
    let codec = ClientCodec::new(true, io.shared().get());
    io.send(req.into(), &codec).await?;

    log::trace!("http1 request has been sent");

    // send request body
    match body.size() {
        BodySize::None | BodySize::Empty | BodySize::Sized(0) => (),
        _ => {
            send_body(body, &io, &codec).await?;
        }
    }

    log::trace!("reading http1 response");

    // read response and init read body
    let fut = async {
        if let Some(result) = io.recv(&codec).await? {
            log::trace!(
                "http1 response is received, type: {:?}, response: {result:#?}",
                codec.message_type()
            );
            Ok(result)
        } else {
            Err(ClientError::from(ConnectError::Disconnected(None)))
        }
    };

    let head = timeout_checked(timeout, fut)
        .await
        .map_err(|()| ClientError::Timeout)
        .and_then(|res| res)?;

    if codec.message_type() == h1::MessageType::None {
        release_connection(io, !codec.keepalive(), created, pool);
        Ok((head, Payload::None))
    } else {
        let pl: PayloadStream = Box::pin(PlStream::new(
            io,
            codec,
            created,
            pool,
            head.version == Version::HTTP_10,
        ));
        Ok((head, pl.into()))
    }
}

/// send request body to the peer
pub(super) async fn send_body(
    mut body: Body,
    io: &IoBoxed,
    codec: &ClientCodec,
) -> Result<(), ClientError> {
    loop {
        if let Some(result) = poll_fn(|cx| body.poll_next_chunk(cx)).await {
            io.encode(h1::Message::Chunk(Some(result?)), codec)?;
            io.flush(false).await?;
        } else {
            io.encode(h1::Message::Chunk(None), codec)?;
            break;
        }
    }
    io.flush(true).await?;

    Ok(())
}

pub(super) struct PlStream {
    io: Option<IoBoxed>,
    codec: ClientPayloadCodec,
    created: Instant,
    http_10: bool,
    pool: Option<Acquired>,
}

impl PlStream {
    fn new(
        io: IoBoxed,
        codec: ClientCodec,
        created: Instant,
        pool: Option<Acquired>,
        http_10: bool,
    ) -> Self {
        PlStream {
            io: Some(io),
            codec: codec.into_payload_codec(),
            created,
            pool,
            http_10,
        }
    }
}

impl Stream for PlStream {
    type Item = Result<Bytes, PayloadError>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let mut this = self.as_mut();
        loop {
            let item = ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx));
            return Poll::Ready(Some(match item {
                Ok(chunk) => {
                    if let Some(chunk) = chunk {
                        Ok(chunk)
                    } else {
                        release_connection(
                            this.io.take().unwrap(),
                            !this.codec.keepalive(),
                            this.created,
                            this.pool.take(),
                        );
                        return Poll::Ready(None);
                    }
                }
                Err(RecvError::KeepAlive) => {
                    Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into())
                }
                Err(RecvError::WriteBackpressure) => {
                    ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?;
                    continue;
                }
                Err(RecvError::Decoder(err)) => Err(err),
                Err(RecvError::PeerGone(Some(err))) => {
                    Err(PayloadError::Incomplete(Some(err)))
                }
                Err(RecvError::PeerGone(None)) => {
                    if this.http_10 {
                        return Poll::Ready(None);
                    }
                    Err(PayloadError::Incomplete(None))
                }
            }));
        }
    }
}

fn release_connection(
    io: IoBoxed,
    force_close: bool,
    created: Instant,
    mut pool: Option<Acquired>,
) {
    if force_close || io.is_closed() || io.with_read_buf(|buf| !buf.is_empty()) {
        if let Some(mut pool) = pool.take() {
            pool.release(Connection::new(ConnectionType::H1(io), created, None), true);
        }
    } else if let Some(mut pool) = pool.take() {
        pool.release(
            Connection::new(ConnectionType::H1(io), created, None),
            false,
        );
    }
}