use crate::error::{HeaderMap, IntoHeaderPair, LingerError};
use bytes::Bytes;
use futures_core::Stream;
use futures_util::StreamExt;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
pub type BodyStream = Pin<Box<dyn Stream<Item = Result<Bytes, LingerError>> + Send>>;
pub enum HttpRequestBody {
Bytes(Bytes),
Stream(BodyStream),
}
impl HttpRequestBody {
pub fn is_stream(&self) -> bool {
matches!(self, Self::Stream(_))
}
pub fn as_bytes(&self) -> Option<&[u8]> {
match self {
Self::Bytes(bytes) => Some(bytes),
Self::Stream(_) => None,
}
}
pub fn into_stream(self) -> BodyStream {
match self {
Self::Bytes(bytes) => {
Box::pin(futures_util::stream::once(async move { Ok(bytes) })) as BodyStream
}
Self::Stream(stream) => stream,
}
}
}
impl fmt::Debug for HttpRequestBody {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Bytes(bytes) => write!(f, "<{} bytes>", bytes.len()),
Self::Stream(_) => f.write_str("<stream>"),
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum HttpMethod {
Get,
Post,
Delete,
}
pub struct HttpRequest {
method: HttpMethod,
url: String,
path: String,
headers: HeaderMap,
body: Option<HttpRequestBody>,
}
impl HttpRequest {
pub fn new(method: HttpMethod, base_url: impl AsRef<str>, path: impl Into<String>) -> Self {
let path = path.into();
let url = format!("{}{}", base_url.as_ref().trim_end_matches('/'), path);
Self {
method,
url,
path,
headers: HeaderMap::new(),
body: None,
}
}
pub fn method(&self) -> HttpMethod {
self.method
}
pub fn url(&self) -> &str {
&self.url
}
pub fn path(&self) -> &str {
&self.path
}
pub fn insert_header(&mut self, name: impl Into<String>, value: impl Into<String>) {
self.headers.insert(name, value);
}
pub fn header(&self, name: &str) -> Option<&str> {
self.headers.get(name)
}
pub fn headers(&self) -> &HeaderMap {
&self.headers
}
pub fn set_body(&mut self, body: impl Into<Bytes>) {
self.body = Some(HttpRequestBody::Bytes(body.into()));
}
pub fn set_body_stream<S>(&mut self, body: S)
where
S: Stream<Item = Result<Bytes, LingerError>> + Send + 'static,
{
self.body = Some(HttpRequestBody::Stream(Box::pin(body)));
}
pub fn body(&self) -> Option<&[u8]> {
self.body.as_ref().and_then(HttpRequestBody::as_bytes)
}
pub fn body_is_stream(&self) -> bool {
self.body.as_ref().is_some_and(HttpRequestBody::is_stream)
}
pub fn into_body(self) -> Option<HttpRequestBody> {
self.body
}
}
impl fmt::Debug for HttpRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("HttpRequest")
.field("method", &self.method)
.field("url", &self.url)
.field("path", &self.path)
.field("headers", &self.headers)
.field("body", &self.body.as_ref())
.finish()
}
}
enum HttpBody {
Bytes(Bytes),
Stream(BodyStream),
}
pub struct HttpResponse {
status: u16,
headers: HeaderMap,
body: HttpBody,
}
impl HttpResponse {
pub fn from_bytes<I, P>(status: u16, headers: I, body: impl Into<Bytes>) -> Self
where
I: IntoIterator<Item = P>,
P: IntoHeaderPair,
{
Self {
status,
headers: HeaderMap::from_pairs(headers),
body: HttpBody::Bytes(body.into()),
}
}
pub fn from_stream<I, P, S>(status: u16, headers: I, body: S) -> Self
where
I: IntoIterator<Item = P>,
P: IntoHeaderPair,
S: Stream<Item = Result<Bytes, LingerError>> + Send + 'static,
{
Self {
status,
headers: HeaderMap::from_pairs(headers),
body: HttpBody::Stream(Box::pin(body)),
}
}
pub fn status(&self) -> u16 {
self.status
}
pub fn headers(&self) -> &HeaderMap {
&self.headers
}
pub fn into_parts(self) -> (u16, HeaderMap, BodyStream) {
let body = match self.body {
HttpBody::Bytes(bytes) => {
Box::pin(futures_util::stream::once(async move { Ok(bytes) })) as BodyStream
}
HttpBody::Stream(stream) => stream,
};
(self.status, self.headers, body)
}
pub fn into_body_stream(self) -> BodyStream {
self.into_parts().2
}
pub async fn into_bytes(self) -> Result<(u16, HeaderMap, Bytes), LingerError> {
let (status, headers, mut stream) = self.into_parts();
let mut body = Vec::new();
while let Some(chunk) = stream.next().await {
body.extend_from_slice(&chunk?);
}
Ok((status, headers, Bytes::from(body)))
}
}
pub trait Transport: Send + Sync {
fn send(
&self,
request: HttpRequest,
) -> Pin<Box<dyn Future<Output = Result<HttpResponse, LingerError>> + Send + '_>>;
}
#[derive(Clone)]
pub struct SharedTransport {
inner: Arc<dyn Transport>,
}
impl SharedTransport {
pub fn new<T>(transport: T) -> Self
where
T: Transport + 'static,
{
Self {
inner: Arc::new(transport),
}
}
pub fn send(
&self,
request: HttpRequest,
) -> Pin<Box<dyn Future<Output = Result<HttpResponse, LingerError>> + Send + '_>> {
self.inner.send(request)
}
}
impl fmt::Debug for SharedTransport {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SharedTransport").finish_non_exhaustive()
}
}
#[cfg(feature = "reqwest-transport")]
#[derive(Clone, Debug)]
pub struct ReqwestTransport {
client: reqwest::Client,
}
#[cfg(feature = "reqwest-transport")]
impl Default for ReqwestTransport {
fn default() -> Self {
Self {
client: reqwest::Client::new(),
}
}
}
#[cfg(feature = "reqwest-transport")]
impl ReqwestTransport {
pub fn new(client: reqwest::Client) -> Self {
Self { client }
}
}
#[cfg(feature = "reqwest-transport")]
impl Transport for ReqwestTransport {
fn send(
&self,
request: HttpRequest,
) -> Pin<Box<dyn Future<Output = Result<HttpResponse, LingerError>> + Send + '_>> {
Box::pin(async move {
let method = match request.method {
HttpMethod::Get => reqwest::Method::GET,
HttpMethod::Post => reqwest::Method::POST,
HttpMethod::Delete => reqwest::Method::DELETE,
};
let mut builder = self.client.request(method, request.url);
for (name, value) in request.headers.iter() {
builder = builder.header(name, value);
}
if let Some(body) = request.body {
builder = match body {
HttpRequestBody::Bytes(bytes) => builder.body(bytes),
HttpRequestBody::Stream(stream) => {
builder.body(reqwest::Body::wrap_stream(stream))
}
};
}
let response = builder
.send()
.await
.map_err(|error| LingerError::transport(error.to_string()))?;
let status = response.status().as_u16();
let headers =
HeaderMap::from_pairs(response.headers().iter().filter_map(|(name, value)| {
value.to_str().ok().map(|value| (name.as_str(), value))
}));
let stream = response
.bytes_stream()
.map(|chunk| chunk.map_err(|error| LingerError::transport(error.to_string())));
Ok(HttpResponse::from_stream(status, headers.iter(), stream))
})
}
}