use std::{
any::Any,
convert::TryFrom,
io::{Read, Write},
str::FromStr,
};
use git_features::io::pipe;
use crate::client::{
http,
http::{reqwest::Remote, traits::PostBodyDataKind},
};
/// The errors the worker thread spawned by `Remote::default()` can terminate with.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
/// A failure reported by `reqwest` itself; the worker produces it when building the client or a request.
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
/// The post body could not be read completely while buffering it in memory before sending.
#[error("Could not finish reading all data to post to the remote")]
ReadPostBody(#[from] std::io::Error),
/// The user-supplied `configure_request` callback rejected the request.
// NOTE(review): the callback's error type is declared elsewhere; presumably it is this boxed error - confirm.
#[error("Request configuration failed")]
ConfigureRequest(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
impl crate::IsSpuriousError for Error {
fn is_spurious(&self) -> bool {
match self {
Error::Reqwest(err) => {
err.is_timeout() || err.is_connect() || err.status().map_or(false, |status| status.is_server_error())
}
_ => false,
}
}
}
impl Default for Remote {
/// Spawn the worker thread that owns the blocking `reqwest` client and return a `Remote` connected to it.
///
/// Requests travel to the worker over one channel and a `Response` (a bundle of pipe ends) travels back
/// over another. Both channels are created with a bound of 0, so each send waits for the matching receive.
/// The returned `Remote` starts out with default `http::Options`.
fn default() -> Self {
let (req_send, req_recv) = std::sync::mpsc::sync_channel(0);
let (res_send, res_recv) = std::sync::mpsc::sync_channel(0);
let handle = std::thread::spawn(move || -> Result<(), Error> {
// Building the client may fail, which ends the thread with an error before any request is served.
// The error stays in the join handle until somebody joins the thread.
let client = reqwest::blocking::ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(20))
.http1_title_case_headers()
.build()?;
// Serve requests one at a time until every sender of the request channel is gone.
for Request {
url,
headers,
upload_body_kind,
config,
} in req_recv
{
// A body kind means there is something to upload, hence POST; otherwise it is a GET.
let mut req_builder = if upload_body_kind.is_some() {
client.post(url)
} else {
client.get(url)
}
.headers(headers);
// One pipe per direction of data: post body in, response body out, response headers out.
// NOTE(review): the `0` presumably makes the pipes unbuffered - confirm against `git_features::io::pipe`.
let (post_body_tx, mut post_body_rx) = pipe::unidirectional(0);
let (mut response_body_tx, response_body_rx) = pipe::unidirectional(0);
let (mut headers_tx, headers_rx) = pipe::unidirectional(0);
// Hand the requester its pipe ends *before* the HTTP exchange runs, so it can start writing the
// post body and reading the results while this thread does the work.
if res_send
.send(Response {
headers: headers_rx,
body: response_body_rx,
upload_body: post_body_tx,
})
.is_err()
{
// Nobody is receiving responses anymore, so stop serving; the thread then finishes with `Ok(())`.
break;
}
req_builder = match upload_body_kind {
Some(PostBodyDataKind::BoundedAndFitsIntoMemory) => {
// Buffer the entire body first; a read error terminates the thread via `?`.
let mut buf = Vec::<u8>::with_capacity(512);
post_body_rx.read_to_end(&mut buf)?;
req_builder.body(buf)
}
// Stream the body straight from the pipe instead of buffering it.
Some(PostBodyDataKind::Unbounded) => req_builder.body(reqwest::blocking::Body::new(post_body_rx)),
None => req_builder,
};
// A request that cannot be built terminates the thread via `?` as well.
let mut req = req_builder.build()?;
// Optional user hook: it runs only if a backend is configured, its lock can be taken (a failed
// `lock()` is skipped silently), it holds `super::Options`, and a `configure_request` callback is set.
if let Some(ref mut request_options) = config.backend.as_ref().and_then(|backend| backend.lock().ok()) {
if let Some(options) = request_options.downcast_mut::<super::Options>() {
if let Some(configure_request) = &mut options.configure_request {
// An error returned by the callback terminates the thread via `?`.
configure_request(&mut req)?;
}
}
}
// `error_for_status()` turns responses with an error status into `Err`, so both transport failures
// and HTTP error statuses end up in the `Err` arm below.
let mut res = match client.execute(req).and_then(|res| res.error_for_status()) {
Ok(res) => res,
Err(err) => {
// Translate the failure into an `io::Error`, choosing the kind from the HTTP status if there is one.
let (kind, err) = match err.status() {
Some(status) => {
let kind = if status == reqwest::StatusCode::UNAUTHORIZED {
std::io::ErrorKind::PermissionDenied
} else if status.is_server_error() {
std::io::ErrorKind::ConnectionAborted
} else {
std::io::ErrorKind::Other
};
(kind, format!("Received HTTP status {}", status.as_str()))
}
None => (std::io::ErrorKind::Other, err.to_string()),
};
let err = Err(std::io::Error::new(kind, err));
// Deliver the error through the headers pipe, ignoring a reader that is already gone,
// then move on to the next request. This failure does not end the thread.
headers_tx.channel.send(err).ok();
continue;
}
};
let send_headers = {
let headers = res.headers();
move || -> std::io::Result<()> {
// Each header is written as one `name:value` line terminated by a newline.
for (name, value) in headers {
headers_tx.write_all(name.as_str().as_bytes())?;
headers_tx.write_all(b":")?;
headers_tx.write_all(value.as_bytes())?;
headers_tx.write_all(b"\n")?;
}
// The closure owns `headers_tx` (it is a `move` closure); dropping it here closes the write end
// once all headers are written.
drop(headers_tx);
Ok(())
}
};
// Failing to write headers is deliberately ignored, for instance if the reader was dropped.
send_headers().ok();
// Stream the response body; if copying fails, forward the error through the body pipe on a
// best-effort basis.
if let Err(err) = std::io::copy(&mut res, &mut response_body_tx) {
response_body_tx.channel.send(Err(err)).ok();
}
}
Ok(())
});
Remote {
handle: Some(handle),
request: req_send,
response: res_recv,
config: http::Options::default(),
}
}
}
impl Remote {
fn make_request(
&mut self,
url: &str,
_base_url: &str,
headers: impl IntoIterator<Item = impl AsRef<str>>,
upload_body_kind: Option<PostBodyDataKind>,
) -> Result<http::PostResponse<pipe::Reader, pipe::Reader, pipe::Writer>, http::Error> {
let mut header_map = reqwest::header::HeaderMap::new();
for header_line in headers {
let header_line = header_line.as_ref();
let colon_pos = header_line
.find(':')
.expect("header line must contain a colon to separate key and value");
let header_name = &header_line[..colon_pos];
let value = &header_line[colon_pos + 1..];
match reqwest::header::HeaderName::from_str(header_name)
.ok()
.zip(reqwest::header::HeaderValue::try_from(value.trim()).ok())
{
Some((key, val)) => header_map.insert(key, val),
None => continue,
};
}
self.request
.send(Request {
url: url.to_owned(),
headers: header_map,
upload_body_kind,
config: self.config.clone(),
})
.expect("the remote cannot be down at this point");
let Response {
headers,
body,
upload_body,
} = match self.response.recv() {
Ok(res) => res,
Err(_) => {
let err = self
.handle
.take()
.expect("always present")
.join()
.expect("no panic")
.expect_err("no receiver means thread is down with init error");
*self = Self::default();
return Err(http::Error::InitHttpClient { source: Box::new(err) });
}
};
Ok(http::PostResponse {
post_body: upload_body,
headers,
body,
})
}
}
impl http::Http for Remote {
    type Headers = pipe::Reader;
    type ResponseBody = pipe::Reader;
    type PostBody = pipe::Writer;

    /// Perform a GET request to `url` with the given `headers`.
    ///
    /// The request goes through `make_request` without a body kind, and its result is converted
    /// into a `GetResponse`.
    fn get(
        &mut self,
        url: &str,
        base_url: &str,
        headers: impl IntoIterator<Item = impl AsRef<str>>,
    ) -> Result<http::GetResponse<Self::Headers, Self::ResponseBody>, http::Error> {
        let response = self.make_request(url, base_url, headers, None)?;
        Ok(response.into())
    }

    /// Perform a POST request to `url` with the given `headers`, with `post_body_kind` telling
    /// how the body written to the returned `post_body` is going to be consumed.
    fn post(
        &mut self,
        url: &str,
        base_url: &str,
        headers: impl IntoIterator<Item = impl AsRef<str>>,
        post_body_kind: PostBodyDataKind,
    ) -> Result<http::PostResponse<Self::Headers, Self::ResponseBody, Self::PostBody>, http::Error> {
        let upload_body_kind = Some(post_body_kind);
        self.make_request(url, base_url, headers, upload_body_kind)
    }

    /// Store a copy of `config` if it is an `http::Options` value.
    ///
    /// Values of any other type are ignored, and the call always succeeds.
    fn configure(&mut self, config: &dyn Any) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        match config.downcast_ref::<http::Options>() {
            Some(options) => self.config = options.clone(),
            None => {
                // Not meant for this implementation, nothing to do.
            }
        }
        Ok(())
    }
}
/// The description of a single HTTP request, sent from `Remote` to its worker thread.
pub(crate) struct Request {
/// The URL to request.
pub url: String,
/// The headers to set on the request.
pub headers: reqwest::header::HeaderMap,
/// If `Some`, the worker performs a POST and reads the body according to this kind; if `None`, it performs a GET.
pub upload_body_kind: Option<PostBodyDataKind>,
/// The options in effect when the request was made; the worker reads `backend` from it to find a request hook.
pub config: http::Options,
}
/// The pipe ends the worker thread hands back for a request, sent before the HTTP exchange actually runs.
pub(crate) struct Response {
/// Yields the response headers as `name:value` lines, or the error that made the request fail.
pub headers: pipe::Reader,
/// Yields the response body as it is streamed from the server.
pub body: pipe::Reader,
/// Accepts the data to upload as body of a POST request.
pub upload_body: pipe::Writer,
}