mod transport;
mod rustls;
use crate::http::transport::TlsTransport;
use std::result::Result;
use std::io::{Error, ErrorKind};
use std::str::FromStr;
use url::Url;
use crate::*;
use crate::config::HttpConfig;
use crate::http::transport::{Transport, TcpTransport};
use std::io::*;
use log::*;
const HTTP_SCHEMA: &str = "http";
const HTTPS_SCHEMA: &str = "https";
// The three header names below are lower-case because `receive` lower-cases
// every incoming header name before comparing it with them.
const SET_COOKIE: &str = "set-cookie";
// Authentication challenges arrive in the `WWW-Authenticate` and
// `Proxy-Authenticate` response headers; the constant names are kept as they
// were so existing references keep compiling.
const WWW_AUTHORIZE: &str = "www-authenticate";
const PROXY_AUTHORIZE: &str = "proxy-authenticate";
const CONTENT_LENGTH: &str = "Content-Length";
const HTTP_1_1: &str = "HTTP/1.1";
const HTTP_2_0: &str = "HTTP/2.0";
/// HTTP protocol versions this module can name.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub enum HttpVersion {
    Version1_0,
    Version1_1,
    Version2_0,
    Version3_0,
}
/// Parses `url` and checks that it can be used as an HTTP(S) target.
///
/// # Errors
/// Returns `ErrorKind::InvalidInput` when the string does not parse as a URL,
/// when the parsed URL has no host, or when its scheme is neither `http` nor
/// `https`.
pub(crate) fn parse_url(url: &str) -> Result<Url, Error> {
    let parsed = Url::parse(url)
        .map_err(|cause| Error::new(ErrorKind::InvalidInput, cause))?;
    if !parsed.has_host() {
        return Err(Error::new(ErrorKind::InvalidInput, "URL has no host"));
    }
    let scheme = parsed.scheme().to_ascii_lowercase();
    if scheme == "http" || scheme == "https" {
        Ok(parsed)
    } else {
        Err(Error::new(ErrorKind::InvalidInput, "Invalid URL scheme: it should be http or https"))
    }
}
/// URL schemes this client can connect with.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub(crate) enum HttpScheme {
    HTTP,
    HTTPS,
}
impl FromStr for HttpScheme {
type Err = Error;
fn from_str(s: &str) -> Result<HttpScheme, Error> {
let lower_case = s.to_ascii_lowercase();
match lower_case.as_str() {
HTTP_SCHEMA => Ok(HttpScheme::HTTP),
HTTPS_SCHEMA => Ok(HttpScheme::HTTPS),
_ => Err(Error::new(ErrorKind::InvalidInput, "Schema is not HTTP"))
}
}
}
/// Scheme, host and port of a server; used as the key of the connection cache.
#[derive(Clone, PartialEq, Eq, Debug, Hash)]
pub(crate) struct EndPoint {
    /// Scheme used to reach the host.
    pub scheme: HttpScheme,
    /// Host name as taken from the URL.
    pub host: String,
    /// Port to connect to.
    pub port: u16,
    /// `from_url` sets this to whether `port` equals the scheme's default
    /// port (80 for HTTP, 443 for HTTPS).
    pub default: bool,
}
impl EndPoint {
    /// Builds an `EndPoint` from the scheme, host and port of `url`.
    ///
    /// When the URL carries no explicit port, 80 is used for HTTP and 443
    /// for HTTPS.
    ///
    /// # Errors
    /// Propagates the scheme parsing error, and returns
    /// `ErrorKind::AddrNotAvailable` when the URL has no host string.
    pub fn from_url(url: &Url) -> Result<EndPoint, Error> {
        let scheme = HttpScheme::from_str(url.scheme())?;
        let default_port: u16 = match scheme {
            HttpScheme::HTTP => 80,
            _ => 443,
        };
        let port = url.port().unwrap_or(default_port);
        let host = url
            .host_str()
            .map(String::from)
            .ok_or_else(|| Error::new(ErrorKind::AddrNotAvailable, "Cannot get host from url"))?;
        let is_default = port == default_port;
        Ok(EndPoint { scheme, host, port, default: is_default })
    }
}
/// One client-side HTTP connection: a transport plus the endpoint, optional
/// proxy endpoint and configuration it was created with.
pub(crate) struct ClientConnection {
    /// Endpoint this connection was created for.
    end_point: EndPoint,
    /// Proxy endpoint, if any.
    proxy: Option<EndPoint>,
    /// Configuration handed in at construction time.
    config: HttpConfig,
    /// Byte stream that requests are written to and responses are read from.
    transport: Box<dyn Transport>,
}
lazy_static! {
    // Empty challenge list; `send` passes it to the authenticator for the
    // attempt made before any server challenge has been received.
    static ref EMTY_CHALLENGE: Vec<&'static str> = Vec::new();
}
impl ClientConnection {
fn new(end_point: EndPoint,
proxy: Option<EndPoint>,
config: HttpConfig,
transport: Box<dyn Transport>) -> ClientConnection {
ClientConnection {
end_point,
proxy,
config,
transport
}
}
/// Returns whatever the underlying transport reports from its own `is_open`.
pub(crate) fn is_open(&self) -> bool {
self.transport.is_open()
}
/// Writes `request` to the transport and returns the parsed response.
///
/// Before the first attempt, if the request has an authenticator that
/// supports the "basic" scheme, the headers it produces for an empty
/// challenge list are added to the request. If the response is a 401 that
/// carries at least one challenge and the request has an authenticator, the
/// authenticator is asked again with those challenges and the request is
/// sent a second time; that second response is the one returned.
///
/// # Errors
/// Propagates any error from writing the request or reading a response.
///
/// # Panics
/// Panics if the authenticator mutex is poisoned.
pub(crate) fn send(&mut self, request: &mut Request) -> Result<Response, Error> {
    if let Some(ref auth) = request.auth {
        let mut authenticator = auth.lock().unwrap();
        if authenticator.support_scheme("basic") {
            if let Ok(extra) = authenticator.authorization(request, &EMTY_CHALLENGE) {
                request.headers.extend(extra);
            }
        }
    }
    self.write_request(request)?;
    let mut response = self.receive(request)?;

    let challenged =
        response.status_code == HTTP_401_UNAUTHORIZED && !response.auth.is_empty();
    if challenged {
        if let Some(ref auth) = request.auth {
            let mut authenticator = auth.lock().unwrap();
            let challenge = response.auth.iter().map(|s| s.as_str()).collect();
            if let Ok(extra) = authenticator.authorization(request, &challenge) {
                request.headers.extend(extra);
            }
            // The request is resent even when no new headers were produced.
            self.write_request(request)?;
            response = self.receive(request)?;
        }
    }
    Ok(response)
}
/// Serializes `request` into memory and writes it to the transport.
///
/// The transport is closed when writing or flushing fails.
///
/// # Errors
/// Returns `ErrorKind::NotConnected` when the transport is not open, and
/// propagates serialization, write and flush errors.
fn write_request(&mut self, request: &Request) -> Result<(), Error> {
    if !self.transport.is_open() {
        return Err(Error::new(ErrorKind::NotConnected, "Connection is not open"));
    }
    // A `Vec<u8>` is itself a writer, so no buffering wrapper is needed.
    let mut bytes: Vec<u8> = Vec::new();
    Self::write_http11(&mut bytes, request)?;
    // NOTE(review): if `Transport::write` has `std::io::Write::write`
    // semantics it may accept only part of the buffer — confirm, and switch
    // to `write_all` if so.
    if let Err(error) = self.transport.write(bytes.as_slice()) {
        self.transport.close();
        return Err(error);
    }
    if let Err(error) = self.transport.flush() {
        self.transport.close();
        return Err(error);
    }
    if log_enabled!(Level::Debug) {
        // The body may be arbitrary bytes; a lossy conversion keeps debug
        // logging from panicking on non-UTF-8 payloads.
        debug!("{}", String::from_utf8_lossy(&bytes));
    }
    Ok(())
}
/// Closes the underlying transport by calling its `close`.
fn close(&mut self) {
self.transport.close();
}
/// Serializes `request` as an HTTP/1.1 message into `writer`.
///
/// Output order: request line (method, path, optional `?query`), `Host`,
/// optional `Cookie`, the request's own headers, an optional
/// `Connection: close`, then either `Content-Length` followed by the body or
/// a bare blank line.
///
/// # Errors
/// Propagates the error from `request.endpoint()` and any write error
/// reported by `writer`.
fn write_http11(writer: &mut impl Write, request: &Request) -> Result<(), Error> {
    let endpoint = request.endpoint()?;
    write!(writer, "{} {}", request.method, request.path())?;
    if !request.params.is_empty() {
        write!(writer, "?")?;
        // NOTE(review): keys and values are written verbatim; confirm that
        // callers percent-encode them beforehand.
        let mut first = true;
        for (key, value) in &request.params {
            // Pairs are separated with `&`, mirroring the `; ` handling of
            // cookies below.
            if first {
                first = false;
            } else {
                write!(writer, "&")?;
            }
            write!(writer, "{}={}", key, value)?;
        }
    }
    write!(writer, " HTTP/1.1\r\n")?;
    // The port is left out of `Host` when the endpoint is flagged as default.
    if endpoint.default {
        write!(writer, "Host: {}\r\n", &endpoint.host)?;
    } else {
        write!(writer, "Host: {}:{}\r\n", &endpoint.host, endpoint.port)?;
    }
    if request.cookies.len() > 0 {
        write!(writer, "Cookie: ")?;
        let mut first = true;
        for (key, value) in &request.cookies {
            if first {
                first = false;
                write!(writer, "{}={}", key, value)?;
            } else {
                write!(writer, "; {}={}", key, value)?;
            }
        }
        write!(writer, "\r\n")?;
    }
    for (key, value) in &request.headers {
        write!(writer, "{}: {}\r\n", key, value)?;
    }
    // NOTE(review): `Connection: close` is added when the request HAS a
    // factory; confirm this is not meant to be the opposite condition.
    if request.factory.is_some() && !request.headers.contains_key("Connection") {
        write!(writer, "Connection: close\r\n")?;
    }
    if request.has_body() {
        let body = &request.body;
        write!(writer, "Content-Length: {}\r\n\r\n", body.len())?;
        // `write` may accept only part of the buffer; `write_all` keeps the
        // body consistent with the advertised Content-Length.
        writer.write_all(body)?;
    } else {
        write!(writer, "\r\n")?;
    }
    Ok(())
}
/// Reads one HTTP response from the transport and parses it.
///
/// The status line is read first, then header lines up to the first blank
/// line. `Set-Cookie` headers are parsed into `response.cookies`, the two
/// challenge headers are collected into `response.auth` /
/// `response.proxy_auth`, and every other header goes into
/// `response.headers`. If a `Content-Length` header was seen (matched
/// case-insensitively) exactly that many body bytes are read; otherwise the
/// body is everything up to end of stream. Cookies are then handed to the
/// request's cookie jar, if it has one, and the transport is closed when the
/// response carried `Connection: close`.
///
/// If the stream ends in the middle of the header section, the response
/// built so far is returned as-is.
///
/// # Errors
/// * `ErrorKind::BrokenPipe` when nothing at all could be read.
/// * `ErrorKind::InvalidData` for a malformed status line or header, or a
///   non-numeric `Content-Length`.
/// * Any I/O error from the transport; the transport is closed when a
///   header or body read fails.
///
/// # Panics
/// Panics if the cookie jar mutex is poisoned.
pub fn receive(&mut self, request: &Request) -> Result<Response, Error> {
    let host = &request.endpoint()?.host;
    let path = request.path();
    debug!("Receiving Response");
    let mut line = String::new();
    let mut buffer = BufReader::new(self.transport.as_mut());
    buffer.read_line(&mut line)?;
    if line.is_empty() {
        self.transport.close();
        error!("HTTPConnection::receive Connection is closed");
        return Err(Error::new(ErrorKind::BrokenPipe, "Connection closed"));
    }
    debug!("{}", line);
    let (_, status) = Self::parse_http11_response_status(&line)?;
    let mut response = Response::new(status);

    // Framing information is captured while parsing because header names are
    // case-insensitive on the wire and a later lookup by exact spelling
    // could miss them.
    let mut content_length: Option<String> = None;
    let mut close_requested = false;
    loop {
        line.clear();
        match buffer.read_line(&mut line) {
            Ok(n) => {
                debug!("{}", line);
                if n == 0 {
                    // Stream ended before the blank line that ends the headers.
                    return Ok(response);
                }
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    break;
                }
                let (key, value) = Self::parse_http11_header(trimmed)?;
                let key_lc = key.to_lowercase();
                match key_lc.as_str() {
                    SET_COOKIE => {
                        let cookie = Cookie::parse(&value, host, path)?;
                        response.cookies.push(cookie);
                    }
                    WWW_AUTHORIZE => response.auth.push(value),
                    PROXY_AUTHORIZE => response.proxy_auth.push(value),
                    _ => {
                        if key.eq_ignore_ascii_case(CONTENT_LENGTH) {
                            content_length = Some(value.clone());
                        } else if key_lc == "connection"
                            && value.split(',').any(|token| token.trim().eq_ignore_ascii_case("close"))
                        {
                            close_requested = true;
                        }
                        response.headers.insert(key, value);
                    }
                }
            }
            Err(e) => {
                self.transport.close();
                return Err(e);
            }
        }
    }

    if let Some(raw_length) = content_length {
        debug!("Reading body");
        let length = raw_length.parse::<usize>().map_err(|_| {
            Error::new(ErrorKind::InvalidData, "Content-Length value is not a number")
        })?;
        debug!("Reading body of {} bytes", length);
        let mut data: Vec<u8> = vec![0; length];
        if let Err(e) = buffer.read_exact(data.as_mut_slice()) {
            // A short body leaves the stream unusable, same as a header error.
            self.transport.close();
            return Err(e);
        }
        response.body = data;
        debug!("Body length is {} bytes", response.body.len());
    } else {
        let mut data: Vec<u8> = Vec::new();
        if let Err(e) = buffer.read_to_end(&mut data) {
            self.transport.close();
            return Err(e);
        }
        // Keep whatever was read; the body used to be stored only when
        // nothing had been read, which discarded every non-empty body.
        response.body = data;
    }

    if let Some(ref cookie_jar) = request.jar {
        let mut jar = cookie_jar.as_ref().lock().unwrap();
        for cookie in &response.cookies {
            jar.cookie(cookie.clone(), host);
        }
    }
    if close_requested {
        self.transport.close();
    }
    Ok(response)
}
/// Splits a header line at its first `:` into a name and a trimmed value.
///
/// The name is returned exactly as it appears before the colon; only the
/// value has surrounding whitespace removed.
///
/// # Errors
/// Returns `ErrorKind::InvalidData` when the line contains no `:`.
fn parse_http11_header(line: &str) -> Result<(String, String), Error> {
    match line.find(':') {
        Some(colon) => {
            let (name, rest) = line.split_at(colon);
            // `rest` still begins with the one-byte colon; skip it.
            Ok((name.to_owned(), rest[1..].trim().to_owned()))
        }
        None => Err(Error::new(
            ErrorKind::InvalidData,
            format!("Malformed HTTP header: {}", line),
        )),
    }
}
/// Extracts the protocol version and status code from a status line.
///
/// Only the first two whitespace-separated tokens are examined; anything
/// after them is ignored.
///
/// # Errors
/// Returns `ErrorKind::InvalidData` when either token is missing, and
/// propagates the errors of the version and status parsers.
fn parse_http11_response_status(line: &str) -> Result<(HttpVersion, HttpStatusCode), Error> {
    let mut tokens = line.split_whitespace();

    let version_token = tokens
        .next()
        .ok_or_else(|| Error::new(ErrorKind::InvalidData, "Error parsing HTTP version"))?;
    let version = Self::parse_http_version(version_token)?;

    let status_token = tokens
        .next()
        .ok_or_else(|| Error::new(ErrorKind::InvalidData, "Error parsing HTTP status"))?;
    let status = Self::parse_http_status(status_token)?;

    Ok((version, status))
}
/// Maps the version token of a status line to an `HttpVersion`.
///
/// `HTTP/1.0` is accepted alongside `HTTP/1.1` and `HTTP/2.0`, so replies
/// from HTTP/1.0 servers are no longer rejected.
///
/// # Errors
/// Returns `ErrorKind::InvalidData` for any other token.
fn parse_http_version(version: &str) -> Result<HttpVersion, Error> {
    match version {
        "HTTP/1.0" => Ok(HttpVersion::Version1_0),
        HTTP_1_1 => Ok(HttpVersion::Version1_1),
        HTTP_2_0 => Ok(HttpVersion::Version2_0),
        _ => Err(Error::new(
            ErrorKind::InvalidData,
            format!("Not supported HTTP version {}", version),
        )),
    }
}
/// Parses the status-code token of a status line as a `u16`.
///
/// # Errors
/// Returns `ErrorKind::InvalidData` when the token is not a valid `u16`.
fn parse_http_status(status: &str) -> Result<u16, Error> {
    status.parse::<u16>().map_err(|_| {
        Error::new(
            ErrorKind::InvalidData,
            format!("Unknown HTTP status code {}", status),
        )
    })
}
}
impl Drop for ClientConnection {
    /// Closes the transport when the connection is dropped, unless the
    /// transport already reports itself as not open.
    fn drop(&mut self) {
        if !self.transport.is_open() {
            return;
        }
        self.close();
    }
}
/// Creates connections and keeps the most recent one per endpoint for reuse.
pub(crate) struct ClientConnectionFactory {
    /// Shared connections keyed by the endpoint they were opened for.
    connections: HashMap<EndPoint, Arc<Mutex<ClientConnection>>>,
}
impl ClientConnectionFactory {
pub(crate) fn new () -> ClientConnectionFactory{
ClientConnectionFactory {
connections: HashMap::new()
}
}
/// Opens a transport to `end_point`: a `TcpTransport` for
/// `HttpScheme::HTTP`, a `TlsTransport` for any other scheme.
///
/// # Errors
/// Propagates the error returned by the transport's `open`.
fn make_transport(end_point: &EndPoint,
                  config: &HttpConfig) -> Result<Box<dyn Transport>, Error> {
    let transport: Box<dyn Transport> = match end_point.scheme {
        HttpScheme::HTTP => {
            Box::new(TcpTransport::open(&end_point.host, end_point.port, config)?)
        }
        _ => Box::new(TlsTransport::open(&end_point.host, end_point.port, config)?),
    };
    Ok(transport)
}
pub(crate) fn get_connection(&mut self,
endpoint: &EndPoint,
config: &HttpConfig) -> Result<Arc<Mutex<ClientConnection>>, Error>
{
if self.connections.contains_key(endpoint) {
let connection = Arc::clone(self.connections.get(endpoint).as_ref().unwrap());
if connection.lock().unwrap().is_open() {
return Ok(connection);
}
}
let transport = Self::make_transport(endpoint, config)?;
let fresh_connection = Arc::new(Mutex::new(ClientConnection::new(endpoint.clone(), None, config.clone(),transport)));
self.connections.insert(endpoint.clone(), fresh_connection.clone());
return Ok(fresh_connection);
}
pub(crate) fn client_connection(endpoint: &EndPoint,
config: &HttpConfig) -> Result<Arc<Mutex<ClientConnection>>, Error>
{
let transport = Self::make_transport(endpoint, config)?;
Ok(Arc::new(Mutex::new(ClientConnection::new(endpoint.clone(), None, config.clone(),transport))))
}
}