use std::task::{Poll, ready};
use std::{future::poll_fn, io, io::Write, pin::Pin, task, time::Instant};
use crate::http::body::{Body, BodySize, MessageBody};
use crate::http::error::PayloadError;
use crate::http::header::{HOST, HeaderValue};
use crate::http::{Payload, PayloadStream, ResponseHead, Version, h1};
use crate::io::{IoBoxed, RecvError};
use crate::time::{Millis, timeout_checked};
use crate::util::{BufMut, Bytes, BytesMut, Stream};
use super::connection::{Connection, ConnectionType};
use super::error::{ClientError, ConnectError};
use super::{ClientCodec, ClientPayloadCodec, ClientRawRequest, pool::Acquired};
pub(super) async fn send_request(
io: IoBoxed,
mut req: ClientRawRequest,
body: Body,
created: Instant,
timeout: Millis,
pool: Option<Acquired>,
) -> Result<(ResponseHead, Payload), ClientError> {
if !req.head.headers.contains_key(HOST)
&& let Some(host) = req.head.uri.host()
{
let mut wrt = BytesMut::with_capacity(host.len() + 5).writer();
let _ = match req.head.uri.port_u16() {
None | Some(80 | 443) => write!(wrt, "{host}"),
Some(port) => write!(wrt, "{host}:{port}"),
};
match HeaderValue::from_shared(wrt.get_mut().take()) {
Ok(value) => req.head.headers.insert(HOST, value),
Err(e) => log::error!("Cannot set HOST header {e}"),
}
}
log::trace!("sending http1 request {req:?} body size: {:?}", body.size());
let codec = ClientCodec::new(true, io.shared().get());
io.send(req.into(), &codec).await?;
log::trace!("http1 request has been sent");
match body.size() {
BodySize::None | BodySize::Empty | BodySize::Sized(0) => (),
_ => {
send_body(body, &io, &codec).await?;
}
}
log::trace!("reading http1 response");
let fut = async {
if let Some(result) = io.recv(&codec).await? {
log::trace!(
"http1 response is received, type: {:?}, response: {result:#?}",
codec.message_type()
);
Ok(result)
} else {
Err(ClientError::from(ConnectError::Disconnected(None)))
}
};
let head = timeout_checked(timeout, fut)
.await
.map_err(|()| ClientError::Timeout)
.and_then(|res| res)?;
if codec.message_type() == h1::MessageType::None {
release_connection(io, !codec.keepalive(), created, pool);
Ok((head, Payload::None))
} else {
let pl: PayloadStream = Box::pin(PlStream::new(
io,
codec,
created,
pool,
head.version == Version::HTTP_10,
));
Ok((head, pl.into()))
}
}
pub(super) async fn send_body(
mut body: Body,
io: &IoBoxed,
codec: &ClientCodec,
) -> Result<(), ClientError> {
loop {
if let Some(result) = poll_fn(|cx| body.poll_next_chunk(cx)).await {
io.encode(h1::Message::Chunk(Some(result?)), codec)?;
io.flush(false).await?;
} else {
io.encode(h1::Message::Chunk(None), codec)?;
break;
}
}
io.flush(true).await?;
Ok(())
}
pub(super) struct PlStream {
io: Option<IoBoxed>,
codec: ClientPayloadCodec,
created: Instant,
http_10: bool,
pool: Option<Acquired>,
}
impl PlStream {
fn new(
io: IoBoxed,
codec: ClientCodec,
created: Instant,
pool: Option<Acquired>,
http_10: bool,
) -> Self {
PlStream {
io: Some(io),
codec: codec.into_payload_codec(),
created,
pool,
http_10,
}
}
}
impl Stream for PlStream {
type Item = Result<Bytes, PayloadError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut();
loop {
let item = ready!(this.io.as_ref().unwrap().poll_recv(&this.codec, cx));
return Poll::Ready(Some(match item {
Ok(chunk) => {
if let Some(chunk) = chunk {
Ok(chunk)
} else {
release_connection(
this.io.take().unwrap(),
!this.codec.keepalive(),
this.created,
this.pool.take(),
);
return Poll::Ready(None);
}
}
Err(RecvError::KeepAlive) => {
Err(io::Error::new(io::ErrorKind::TimedOut, "Keep-alive").into())
}
Err(RecvError::WriteBackpressure) => {
ready!(this.io.as_ref().unwrap().poll_flush(cx, false))?;
continue;
}
Err(RecvError::Decoder(err)) => Err(err),
Err(RecvError::PeerGone(Some(err))) => {
Err(PayloadError::Incomplete(Some(err)))
}
Err(RecvError::PeerGone(None)) => {
if this.http_10 {
return Poll::Ready(None);
}
Err(PayloadError::Incomplete(None))
}
}));
}
}
}
fn release_connection(
io: IoBoxed,
force_close: bool,
created: Instant,
mut pool: Option<Acquired>,
) {
if force_close || io.is_closed() || io.with_read_buf(|buf| !buf.is_empty()) {
if let Some(mut pool) = pool.take() {
pool.release(Connection::new(ConnectionType::H1(io), created, None), true);
}
} else if let Some(mut pool) = pool.take() {
pool.release(
Connection::new(ConnectionType::H1(io), created, None),
false,
);
}
}