use bytes::Bytes;
use futures::{Stream, TryStreamExt as _};
use http::HeaderMap;
use pin_project_lite::pin_project;
use reqwest::{Client, Method, RequestBuilder};
use std::error::Error as _;
use std::io::{self, Error};
use std::ops::Not as _;
use std::pin::Pin;
use std::sync::LazyLock;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::mpsc;
use tokio_util::io::StreamReader;
use crate::{EtagResolvable, HashReaderDetector, HashReaderMut};
fn get_http_client() -> Client {
static CLIENT: LazyLock<Client> = LazyLock::new(Client::new);
CLIENT.clone()
}
static HTTP_DEBUG_LOG: bool = false;
#[inline(always)]
fn http_debug_log(args: std::fmt::Arguments) {
if HTTP_DEBUG_LOG {
println!("{args}");
}
}
macro_rules! http_log {
($($arg:tt)*) => {
http_debug_log(format_args!($($arg)*));
};
}
pin_project! {
pub struct HttpReader {
url:String,
method: Method,
headers: HeaderMap,
#[pin]
inner: StreamReader<Pin<Box<dyn Stream<Item=std::io::Result<Bytes>>+Send+Sync>>, Bytes>,
}
}
impl HttpReader {
pub async fn new(url: String, method: Method, headers: HeaderMap, body: Option<Vec<u8>>) -> io::Result<Self> {
Self::with_capacity(url, method, headers, body, 0).await
}
pub async fn with_capacity(
url: String,
method: Method,
headers: HeaderMap,
body: Option<Vec<u8>>,
_read_buf_size: usize,
) -> io::Result<Self> {
let client = get_http_client();
let head_resp = client.head(&url).headers(headers.clone()).send().await;
match head_resp {
Ok(resp) => {
http_log!("[HttpReader::new] HEAD status: {}", resp.status());
if !resp.status().is_success() {
return Err(Error::other(format!("HEAD failed: url: {}, status {}", url, resp.status())));
}
}
Err(e) => {
http_log!("[HttpReader::new] HEAD error: {e}");
return Err(Error::other(e.source().map(|s| s.to_string()).unwrap_or_else(|| e.to_string())));
}
}
let client = get_http_client();
let mut request: RequestBuilder = client.request(method.clone(), url.clone()).headers(headers.clone());
if let Some(body) = body {
request = request.body(body);
}
let resp = request
.send()
.await
.map_err(|e| Error::other(format!("HttpReader HTTP request error: {e}")))?;
if resp.status().is_success().not() {
return Err(Error::other(format!(
"HttpReader HTTP request failed with non-200 status {}",
resp.status()
)));
}
let stream = resp
.bytes_stream()
.map_err(|e| Error::other(format!("HttpReader stream error: {e}")));
Ok(Self {
inner: StreamReader::new(Box::pin(stream)),
url,
method,
headers,
})
}
pub fn url(&self) -> &str {
&self.url
}
pub fn method(&self) -> &Method {
&self.method
}
pub fn headers(&self) -> &HeaderMap {
&self.headers
}
}
impl AsyncRead for HttpReader {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}
impl EtagResolvable for HttpReader {
fn is_etag_reader(&self) -> bool {
false
}
fn try_resolve_etag(&mut self) -> Option<String> {
None
}
}
impl HashReaderDetector for HttpReader {
fn is_hash_reader(&self) -> bool {
false
}
fn as_hash_reader_mut(&mut self) -> Option<&mut dyn HashReaderMut> {
None
}
}
struct ReceiverStream {
receiver: mpsc::Receiver<Option<Bytes>>,
}
impl Stream for ReceiverStream {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let poll = Pin::new(&mut self.receiver).poll_recv(cx);
match poll {
Poll::Ready(Some(Some(bytes))) => Poll::Ready(Some(Ok(bytes))),
Poll::Ready(Some(None)) => Poll::Ready(None), Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
pin_project! {
pub struct HttpWriter {
url:String,
method: Method,
headers: HeaderMap,
err_rx: tokio::sync::oneshot::Receiver<std::io::Error>,
sender: tokio::sync::mpsc::Sender<Option<Bytes>>,
handle: tokio::task::JoinHandle<std::io::Result<()>>,
finish:bool,
}
}
impl HttpWriter {
pub async fn new(url: String, method: Method, headers: HeaderMap) -> io::Result<Self> {
let url_clone = url.clone();
let method_clone = method.clone();
let headers_clone = headers.clone();
let client = get_http_client();
let resp = client.put(&url).headers(headers.clone()).body(Vec::new()).send().await;
match resp {
Ok(resp) => {
if !resp.status().is_success() {
return Err(Error::other(format!("Empty PUT failed: status {}", resp.status())));
}
}
Err(e) => {
return Err(Error::other(format!("Empty PUT failed: {e}")));
}
}
let (sender, receiver) = tokio::sync::mpsc::channel::<Option<Bytes>>(8);
let (err_tx, err_rx) = tokio::sync::oneshot::channel::<io::Error>();
let handle = tokio::spawn(async move {
let stream = ReceiverStream { receiver };
let body = reqwest::Body::wrap_stream(stream);
let client = get_http_client();
let request = client
.request(method_clone, url_clone.clone())
.headers(headers_clone.clone())
.body(body);
let response = request.send().await;
match response {
Ok(resp) => {
if !resp.status().is_success() {
let _ = err_tx.send(Error::other(format!(
"HttpWriter HTTP request failed with non-200 status {}",
resp.status()
)));
return Err(Error::other(format!("HTTP request failed with non-200 status {}", resp.status())));
}
}
Err(e) => {
let _ = err_tx.send(Error::other(format!("HTTP request failed: {e}")));
return Err(Error::other(format!("HTTP request failed: {e}")));
}
}
Ok(())
});
Ok(Self {
url,
method,
headers,
err_rx,
sender,
handle,
finish: false,
})
}
pub fn url(&self) -> &str {
&self.url
}
pub fn method(&self) -> &Method {
&self.method
}
pub fn headers(&self) -> &HeaderMap {
&self.headers
}
}
impl AsyncWrite for HttpWriter {
fn poll_write(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
if let Ok(e) = Pin::new(&mut self.err_rx).try_recv() {
return Poll::Ready(Err(e));
}
self.sender
.try_send(Some(Bytes::copy_from_slice(buf)))
.map_err(|e| Error::other(format!("HttpWriter send error: {e}")))?;
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
if !self.finish {
self.sender
.try_send(None)
.map_err(|e| Error::other(format!("HttpWriter shutdown error: {e}")))?;
self.finish = true;
}
use futures::FutureExt;
match Pin::new(&mut self.get_mut().handle).poll_unpin(_cx) {
Poll::Ready(Ok(_)) => {
}
Poll::Ready(Err(e)) => {
return Poll::Ready(Err(Error::other(format!("HTTP request failed: {e}"))));
}
Poll::Pending => {
return Poll::Pending;
}
}
Poll::Ready(Ok(()))
}
}