#[cfg(any(
feature = "rustls-aws",
feature = "rustls-ring",
feature = "native-tls"
))]
use alloc::string::ToString;
use alloc::{boxed::Box, string::String, vec, vec::Vec};
use std::io::{self, Read, Write};
#[cfg(any(
feature = "rustls-aws",
feature = "rustls-ring",
feature = "native-tls"
))]
use pimalaya_stream::{std::stream::StreamStd, tls::Tls};
use thiserror::Error;
use url::Url;
use crate::{
coroutine::*,
rfc1945::send::*,
rfc9110::{
headers::TRANSFER_ENCODING,
request::HttpRequest,
response::HttpResponse,
send::{HttpSendOutput, HttpSendYield},
},
rfc9112::{chunk_stream::*, read_headers::*, send::*},
sse::frame::*,
};
/// Size, in bytes, of the stack buffer handed to each blocking `read` on the
/// underlying stream (16 KiB).
const READ_BUFFER_SIZE: usize = 16 * 1024;
/// Returns the default list of ALPN protocol identifiers.
///
/// The list contains a single entry, `http/1.1`.
pub fn default_alpn() -> Vec<String> {
    let http11 = String::from("http/1.1");
    Vec::from([http11])
}
/// Errors returned by [`HttpClientStd`] and [`SseStream`].
#[derive(Debug, Error)]
pub enum HttpClientStdError {
/// Failure reported by the HTTP/1.0 send coroutine.
#[error(transparent)]
Http10Send(#[from] Http10SendError),
/// Failure reported by the HTTP/1.1 send coroutine, or by the header
/// reader used for streaming requests (including an early end of stream).
#[error(transparent)]
Http11Send(#[from] Http11SendError),
/// I/O failure while reading from or writing to the underlying stream.
#[error(transparent)]
Io(#[from] io::Error),
/// Opaque `anyhow::Error`, only available when a TLS feature is enabled.
// NOTE(review): presumably raised by the `pimalaya_stream` connection
// helpers used in `HttpClientStd::connect` — confirm.
#[cfg(any(
feature = "rustls-aws",
feature = "rustls-ring",
feature = "native-tls"
))]
#[error(transparent)]
Tls(#[from] anyhow::Error),
/// The URL given to `HttpClientStd::connect` has no host component.
/// Carries the full URL as a string.
#[cfg(any(
feature = "rustls-aws",
feature = "rustls-ring",
feature = "native-tls"
))]
#[error("HTTP URL `{0}` has no host")]
UrlMissingHost(String),
/// The URL given to `HttpClientStd::connect` uses a scheme other than
/// `http` or `https`. Carries the full URL and the offending scheme.
#[cfg(any(
feature = "rustls-aws",
feature = "rustls-ring",
feature = "native-tls"
))]
#[error("HTTP URL `{0}` has unsupported scheme `{1}` (expected `http` or `https`)")]
UrlUnsupportedScheme(String, String),
/// The send coroutine asked to follow a redirect, which `send` and
/// `send_http10` do not do. Carries the redirect target and the response
/// status code.
#[error("HTTP server redirected to `{url}` (status `{code}`)")]
UnexpectedRedirect { url: Url, code: u16 },
/// A streaming request received a response whose `Transfer-Encoding`
/// header is not `chunked`. Carries the response status code.
#[error("HTTP streaming requires `Transfer-Encoding: chunked` (got status `{0}`)")]
StreamingNotChunked(u16),
/// Failure reported by the chunked-body decoder while streaming.
#[error(transparent)]
ChunkStream(#[from] Http11ReadChunksStreamError),
}
/// Blocking HTTP client that drives the crate's I/O-free coroutines over a
/// boxed `Read + Write + Send` stream.
pub struct HttpClientStd {
// Type-erased connection; replaced wholesale by `set_stream`.
stream: Box<dyn HttpStream>,
}
impl HttpClientStd {
    /// Wraps an already-established stream into a client.
    ///
    /// The stream is boxed, so any `Read + Write + Send + 'static` type is
    /// accepted (plain TCP, TLS, in-memory test doubles, ...).
    pub fn new<S: Read + Write + Send + 'static>(stream: S) -> Self {
        Self {
            stream: Box::new(stream),
        }
    }

    /// Opens a connection to the host of `url` and wraps it into a client.
    ///
    /// The `http` scheme connects over plain TCP (port defaults to 80), the
    /// `https` scheme connects over TLS using `tls` (port defaults to 443).
    ///
    /// # Errors
    ///
    /// - [`HttpClientStdError::UrlMissingHost`] when `url` has no host.
    /// - [`HttpClientStdError::UrlUnsupportedScheme`] for any other scheme.
    /// - Whatever error the underlying connection helper reports.
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    pub fn connect(url: &Url, tls: &Tls) -> Result<Self, HttpClientStdError> {
        let host = url
            .host_str()
            .ok_or_else(|| HttpClientStdError::UrlMissingHost(url.to_string()))?;

        let stream = match url.scheme() {
            "http" => StreamStd::connect_tcp(host, url.port_or_known_default().unwrap_or(80))?,
            "https" => {
                StreamStd::connect_tls(host, url.port_or_known_default().unwrap_or(443), tls)?
            }
            scheme => {
                return Err(HttpClientStdError::UrlUnsupportedScheme(
                    url.to_string(),
                    scheme.to_string(),
                ));
            }
        };

        Ok(Self {
            stream: Box::new(stream),
        })
    }

    /// Replaces the underlying stream; the previous one is dropped.
    pub fn set_stream<S: Read + Write + Send + 'static>(&mut self, stream: S) {
        self.stream = Box::new(stream);
    }

    /// Drives a generic read/write coroutine to completion on the stream.
    ///
    /// Each `WantsRead` is served with one blocking `read` (a zero-length
    /// read is forwarded to the coroutine as an empty slice); each
    /// `WantsWrite` is written in full and flushed.
    ///
    /// # Errors
    ///
    /// Returns the coroutine's own error converted into
    /// [`HttpClientStdError`], or [`HttpClientStdError::Io`] when the stream
    /// fails.
    pub fn run<C, T, E>(&mut self, mut coroutine: C) -> Result<T, HttpClientStdError>
    where
        C: HttpCoroutine<Yield = HttpYield, Return = Result<T, E>>,
        HttpClientStdError: From<E>,
    {
        let mut buf = [0u8; READ_BUFFER_SIZE];
        let mut arg: Option<&[u8]> = None;

        loop {
            match coroutine.resume(arg.take()) {
                HttpCoroutineState::Complete(Ok(out)) => return Ok(out),
                HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
                HttpCoroutineState::Yielded(HttpYield::WantsRead) => {
                    let n = self.stream.read(&mut buf)?;
                    arg = Some(&buf[..n]);
                }
                HttpCoroutineState::Yielded(HttpYield::WantsWrite(bytes)) => {
                    self.write_flushed(&bytes)?;
                    arg = None;
                }
            }
        }
    }

    /// Sends `request` using the HTTP/1.1 send coroutine and returns its
    /// output.
    ///
    /// Redirects are not followed.
    ///
    /// # Errors
    ///
    /// - [`HttpClientStdError::UnexpectedRedirect`] when the coroutine asks
    ///   to follow a redirect.
    /// - [`HttpClientStdError::Http11Send`] when the coroutine fails.
    /// - [`HttpClientStdError::Io`] when the stream fails.
    pub fn send(&mut self, request: HttpRequest) -> Result<HttpSendOutput, HttpClientStdError> {
        let mut coroutine = Http11Send::new(request);
        let mut buf = [0u8; READ_BUFFER_SIZE];
        let mut arg: Option<&[u8]> = None;

        loop {
            match coroutine.resume(arg.take()) {
                HttpCoroutineState::Complete(Ok(out)) => return Ok(out),
                HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
                HttpCoroutineState::Yielded(HttpSendYield::WantsRead) => {
                    let n = self.stream.read(&mut buf)?;
                    arg = Some(&buf[..n]);
                }
                HttpCoroutineState::Yielded(HttpSendYield::WantsWrite(bytes)) => {
                    self.write_flushed(&bytes)?;
                    arg = None;
                }
                HttpCoroutineState::Yielded(HttpSendYield::WantsRedirect {
                    url, response, ..
                }) => {
                    return Err(HttpClientStdError::UnexpectedRedirect {
                        url,
                        code: *response.status,
                    });
                }
            }
        }
    }

    /// Sends `request` using the HTTP/1.0 send coroutine and returns its
    /// output.
    ///
    /// Redirects are not followed.
    ///
    /// # Errors
    ///
    /// - [`HttpClientStdError::UnexpectedRedirect`] when the coroutine asks
    ///   to follow a redirect.
    /// - [`HttpClientStdError::Http10Send`] when the coroutine fails.
    /// - [`HttpClientStdError::Io`] when the stream fails.
    pub fn send_http10(
        &mut self,
        request: HttpRequest,
    ) -> Result<HttpSendOutput, HttpClientStdError> {
        let mut coroutine = Http10Send::new(request);
        let mut buf = [0u8; READ_BUFFER_SIZE];
        let mut arg: Option<&[u8]> = None;

        loop {
            match coroutine.resume(arg.take()) {
                HttpCoroutineState::Complete(Ok(out)) => return Ok(out),
                HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
                HttpCoroutineState::Yielded(HttpSendYield::WantsRead) => {
                    let n = self.stream.read(&mut buf)?;
                    arg = Some(&buf[..n]);
                }
                HttpCoroutineState::Yielded(HttpSendYield::WantsWrite(bytes)) => {
                    self.write_flushed(&bytes)?;
                    arg = None;
                }
                HttpCoroutineState::Yielded(HttpSendYield::WantsRedirect {
                    url, response, ..
                }) => {
                    return Err(HttpClientStdError::UnexpectedRedirect {
                        url,
                        code: *response.status,
                    });
                }
            }
        }
    }

    /// Writes `bytes` in full, then flushes the stream.
    ///
    /// The stream is caller-provided and may buffer writes. Without the
    /// flush, request bytes could stay in that buffer while the next
    /// blocking read waits for a response that will never come.
    fn write_flushed(&mut self, bytes: &[u8]) -> io::Result<()> {
        self.stream.write_all(bytes)?;
        self.stream.flush()
    }
}
impl HttpClientStd {
    /// Sends `request` as HTTP/1.1 and returns a stream of SSE frames read
    /// from the chunked response body.
    ///
    /// The client is consumed: the returned [`SseStream`] takes ownership of
    /// the connection. Only the response headers are read here; any body
    /// bytes received along with them are kept for the first chunk pull.
    ///
    /// # Errors
    ///
    /// - [`HttpClientStdError::Io`] when writing, flushing or reading fails.
    /// - [`HttpClientStdError::Http11Send`] when the headers cannot be read,
    ///   including when the stream ends before they are complete.
    /// - [`HttpClientStdError::StreamingNotChunked`] when the response's
    ///   `Transfer-Encoding` header is not `chunked` (ASCII
    ///   case-insensitive).
    pub fn send_streaming(self, request: HttpRequest) -> Result<SseStream, HttpClientStdError> {
        let HttpClientStd { mut stream } = self;

        let req_bytes = request.to_http_11_vec();
        stream.write_all(&req_bytes)?;
        // The stream is caller-provided and may buffer writes: make sure the
        // request actually leaves before blocking on the response.
        stream.flush()?;

        let mut read_headers = Http11ReadHeaders::default();
        let mut buf = [0u8; READ_BUFFER_SIZE];
        let mut arg: Option<&[u8]> = None;

        let out = loop {
            match read_headers.resume(arg.take()) {
                HttpCoroutineState::Complete(Ok(out)) => break out,
                HttpCoroutineState::Complete(Err(err)) => {
                    return Err(Http11SendError::from(err).into());
                }
                HttpCoroutineState::Yielded(HttpYield::WantsRead) => {
                    let n = stream.read(&mut buf)?;
                    // End of stream before the headers were complete.
                    if n == 0 {
                        return Err(Http11SendError::Eof.into());
                    }
                    arg = Some(&buf[..n]);
                }
                HttpCoroutineState::Yielded(HttpYield::WantsWrite(_)) => {
                    unreachable!("Http11ReadHeaders never writes");
                }
            }
        };

        let chunked = out
            .response
            .header(TRANSFER_ENCODING)
            .is_some_and(|enc| enc.eq_ignore_ascii_case("chunked"));

        if !chunked {
            return Err(HttpClientStdError::StreamingNotChunked(
                *out.response.status,
            ));
        }

        Ok(SseStream {
            stream,
            chunk_stream: Http11ReadChunksStream::default(),
            sse_parser: SseFrameParser::default(),
            pending: None,
            // Bytes read past the end of the headers already belong to the
            // body; they are fed to the chunk decoder first.
            preread: out.remaining,
            response: out.response,
            keep_alive: out.keep_alive,
            done: false,
        })
    }
}
/// Blocking reader of SSE frames carried by a chunked HTTP/1.1 response body.
///
/// Created by `HttpClientStd::send_streaming`; owns the connection.
pub struct SseStream {
// Connection taken over from the client.
stream: Box<dyn HttpStream>,
// Decoder for the chunked transfer encoding.
chunk_stream: Http11ReadChunksStream,
// Parser turning decoded body bytes into SSE frames.
sse_parser: SseFrameParser,
// Decoded chunk body waiting to be handed to the SSE parser.
pending: Option<Vec<u8>>,
// Bytes received together with the response headers; handed to the chunk
// decoder on the first pull, then left empty.
preread: Vec<u8>,
// Response head (status and headers) received before the body.
response: HttpResponse,
// Keep-alive flag reported by the header reader.
keep_alive: bool,
// Set once the body has ended; `next_frame` then always returns `Ok(None)`.
done: bool,
}
impl SseStream {
    /// Returns the response head received before the event stream started.
    pub fn response(&self) -> &HttpResponse {
        &self.response
    }

    /// Returns the keep-alive flag reported when the headers were read.
    pub fn keep_alive(&self) -> bool {
        self.keep_alive
    }

    /// Returns the last event ID tracked by the SSE parser, if any.
    pub fn last_event_id(&self) -> Option<&str> {
        self.sse_parser.last_event_id()
    }

    /// Blocks until the next SSE frame is available.
    ///
    /// Returns `Ok(None)` once the body has ended; every later call returns
    /// `Ok(None)` as well. An error does not mark the stream as ended.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from the stream and decoding errors from the
    /// chunked-body decoder.
    pub fn next_frame(&mut self) -> Result<Option<SseFrame>, HttpClientStdError> {
        while !self.done {
            // Hand over whatever chunk body is waiting (possibly nothing).
            let input = self.pending.take();

            match self.sse_parser.resume(input.as_deref()) {
                HttpCoroutineState::Yielded(SseFrameParserYield::Frame(frame)) => {
                    return Ok(Some(frame));
                }
                HttpCoroutineState::Yielded(SseFrameParserYield::WantsBytes) => {
                    // No more chunk means the body is over.
                    self.pending = self.pull_chunk()?;
                    self.done = self.pending.is_none();
                }
                HttpCoroutineState::Complete(never) => match never {},
            }
        }

        Ok(None)
    }

    /// Consumes the stream, dropping the underlying connection.
    pub fn close(self) {
        drop(self);
    }

    /// Reads from the connection until the chunk decoder yields one chunk
    /// body.
    ///
    /// Returns `Ok(None)` when the decoder completes or when the connection
    /// reports end of stream.
    fn pull_chunk(&mut self) -> Result<Option<Vec<u8>>, HttpClientStdError> {
        let mut scratch = [0u8; READ_BUFFER_SIZE];

        // Bytes received along with the headers go to the decoder first; the
        // field is left empty so this only happens once.
        let leftover = core::mem::take(&mut self.preread);
        let mut input = Some(leftover.as_slice()).filter(|bytes| !bytes.is_empty());

        loop {
            let n = match self.chunk_stream.resume(input.take()) {
                HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::Frame { body }) => {
                    return Ok(Some(body));
                }
                HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::WantsRead) => {
                    self.stream.read(&mut scratch)?
                }
                HttpCoroutineState::Complete(Ok(_remaining)) => return Ok(None),
                HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
            };

            // End of stream: treated as the end of the body.
            if n == 0 {
                return Ok(None);
            }

            input = Some(&scratch[..n]);
        }
    }
}
impl Iterator for SseStream {
    type Item = Result<SseFrame, HttpClientStdError>;

    /// Yields the next SSE frame, an error, or `None` once the body ended.
    ///
    /// This is `next_frame` with the `Result` and `Option` layers swapped.
    fn next(&mut self) -> Option<Self::Item> {
        self.next_frame().transpose()
    }
}
/// Private marker trait combining `Read + Write + Send`, so that the client
/// can store any such stream behind a single `Box<dyn HttpStream>`.
trait HttpStream: Read + Write + Send {}
// Blanket implementation: every `Read + Write + Send` type is an `HttpStream`.
impl<T: Read + Write + Send + ?Sized> HttpStream for T {}