use std::collections::HashMap;
use std::mem;
use std::net::SocketAddr;
use std::pin::Pin;
use async_std::io::{self, Write};
use async_std::task::Context;
use async_std::task::Poll;
use futures::future::{BoxFuture, Future};
use log::Level::Debug;
use super::constants;
use super::http;
use super::request::Request;
use super::response::{Response, ResponseBuilder};
use super::results::CabotResult;
#[derive(Default)]
pub struct Client {
verbose: bool,
ipv4: bool,
ipv6: bool,
authorities: HashMap<String, SocketAddr>,
read_timeout: u64,
connect_timeout: u64,
dns_timeout: u64,
request_timeout: u64,
max_redir: u8,
}
impl<'a> Client {
pub fn new() -> Self {
Client {
verbose: false,
ipv4: true,
ipv6: true,
authorities: HashMap::new(),
dns_timeout: constants::DNS_LOOKUP_TIMEOUT * 1000,
connect_timeout: constants::CONNECT_TIMEOUT * 1000,
read_timeout: constants::READ_TIMEOUT * 1000,
request_timeout: constants::REQUEST_TIMEOUT * 1000,
max_redir: constants::NUMBER_OF_REDIRECT,
}
}
pub fn set_ip_version(&mut self, ipv4: bool, ipv6: bool) {
self.ipv4 = ipv4;
self.ipv6 = ipv6;
}
pub fn add_authority(&mut self, authority: &str, sock_addr: &SocketAddr) {
self.authorities
.insert(authority.to_owned(), sock_addr.clone());
}
pub fn set_dns_timeout(&mut self, timeout: u64) {
self.dns_timeout = timeout * 1000;
}
pub fn set_connect_timeout(&mut self, timeout: u64) {
self.connect_timeout = timeout * 1000;
}
pub fn set_read_timeout(&mut self, timeout: u64) {
self.read_timeout = timeout * 1000;
}
pub fn set_request_timeout(&mut self, timeout: u64) {
self.request_timeout = timeout * 1000;
}
pub fn set_dns_timeout_ms(&mut self, timeout: u64) {
self.dns_timeout = timeout;
}
pub fn set_connect_timeout_ms(&mut self, timeout: u64) {
self.connect_timeout = timeout;
}
pub fn set_read_timeout_ms(&mut self, timeout: u64) {
self.read_timeout = timeout;
}
pub fn set_request_timeout_ms(&mut self, timeout: u64) {
self.request_timeout = timeout;
}
pub fn set_max_redir(&mut self, max_redir: u8) {
self.max_redir = max_redir;
}
pub async fn execute(&self, request: &Request) -> CabotResult<Response> {
self.execute_fut(request).await
}
pub fn execute_box(&'a self, request: &'a Request) -> BoxFuture<'a, CabotResult<Response>> {
let fut = Box::pin(self.execute_fut(request));
Box::pin(ResponseFuture { fut })
}
fn execute_fut(
&'a self,
request: &'a Request,
) -> impl Future<Output = CabotResult<Response>> + 'a {
async move {
let mut out = CabotLibWrite::new();
http::http_query(
request,
&mut out,
&self.authorities,
self.verbose,
self.ipv4,
self.ipv6,
self.dns_timeout,
self.connect_timeout,
self.read_timeout,
self.request_timeout,
self.max_redir,
)
.await?;
out.response()
}
}
}
pub struct ResponseFuture<'a> {
fut: Pin<Box<dyn Future<Output = CabotResult<Response>> + 'a>>,
}
unsafe impl<'a> Send for ResponseFuture<'a> {}
impl<'a> Future for ResponseFuture<'a> {
type Output = CabotResult<Response>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.fut).poll(cx)
}
}
struct CabotLibWrite {
header_read: bool,
body_buffer: Vec<u8>,
response_builder: ResponseBuilder,
}
impl CabotLibWrite {
pub fn new() -> Self {
CabotLibWrite {
response_builder: ResponseBuilder::new(),
body_buffer: Vec::new(),
header_read: false,
}
}
fn split_headers(&mut self, buf: &[u8]) {
let mut builder = ResponseBuilder::new();
if let Some(pos) = buf.iter().position(|&x| x == b'\n') {
let (status_line, hdrs) = buf.split_at(pos);
let status_line = String::from_utf8_lossy(status_line);
builder = builder.set_status_line(status_line.trim_end());
let mut header = "".to_owned();
for hdr in hdrs.split(|&x| x == b'\n') {
let hdr = String::from_utf8_lossy(hdr);
if hdr.starts_with(' ') || hdr.starts_with('\t') {
debug!("Obsolete line folded header reveived in {}", header);
header.push_str(" ");
header.push_str(hdr.trim());
} else {
let clean_hdr = header.trim();
if clean_hdr.len() > 0 {
builder = builder.add_header(clean_hdr.trim());
header.clear();
}
header.push_str(hdr.trim());
}
}
let clean_hdr = header.trim();
if clean_hdr.len() > 0 {
builder = builder.add_header(clean_hdr.trim());
}
self.response_builder = builder;
}
}
pub fn response(&self) -> CabotResult<Response> {
self.response_builder.build()
}
}
impl Write for CabotLibWrite {
fn poll_write(self: Pin<&mut Self>, _cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let self_ = Pin::get_mut(self);
if !self_.header_read {
self_.split_headers(&buf);
self_.header_read = true;
Poll::Ready(Ok(0))
} else {
self_.body_buffer.extend_from_slice(&buf);
Poll::Ready(Ok(buf.len()))
}
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
let self_ = Pin::get_mut(self);
if log_enabled!(Debug) {
let body = String::from_utf8_lossy(self_.body_buffer.as_slice());
debug!("Adding body {:?}", body);
}
let builder = mem::replace(&mut self_.response_builder, ResponseBuilder::new());
self_.response_builder = builder.set_body(self_.body_buffer.as_slice());
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Not Implemented")))
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_std;
use async_std::prelude::*;
#[async_std::test]
async fn test_build_http_response_from_string() -> std::io::Result<()> {
let response = [
vec![
"HTTP/1.1 200 Ok",
"Content-Type: text/plain",
"Content-Length: 12",
]
.join("\r\n"),
vec!["Hello World!"].join("\r\n"),
];
let mut out = CabotLibWrite::new();
out.write(response[0].as_bytes()).await.unwrap();
out.write(response[1].as_bytes()).await.unwrap();
out.flush().await.unwrap();
let response = out.response().unwrap();
assert_eq!(response.http_version(), "HTTP/1.1");
assert_eq!(response.status_code(), 200);
assert_eq!(response.status_line(), "200 Ok");
let headers: &[&str] = &["Content-Type: text/plain", "Content-Length: 12"];
assert_eq!(response.headers(), headers);
assert_eq!(
response.body_as_string().unwrap(),
"Hello World!".to_owned()
);
Ok(())
}
#[async_std::test]
async fn test_build_http_header_obsolete_line_folding() -> std::io::Result<()> {
let response = [
vec![
"HTTP/1.1 200 Ok",
"ows: https://tools.ietf.org/html/rfc7230",
" #section-3.2.4",
"Content-Length: 12",
]
.join("\r\n"),
vec!["Hello World!"].join("\r\n"),
];
let mut out = CabotLibWrite::new();
out.write(response[0].as_bytes()).await.unwrap();
out.write(response[1].as_bytes()).await.unwrap();
out.flush().await.unwrap();
let response = out.response().unwrap();
assert_eq!(response.http_version(), "HTTP/1.1");
assert_eq!(response.status_code(), 200);
assert_eq!(response.status_line(), "200 Ok");
let headers: &[&str] = &[
"ows: https://tools.ietf.org/html/rfc7230 #section-3.2.4",
"Content-Length: 12",
];
assert_eq!(response.headers(), headers);
assert_eq!(
response.body_as_string().unwrap(),
"Hello World!".to_owned()
);
Ok(())
}
#[async_std::test]
async fn test_build_http_header_obsolete_line_folding_tab() -> std::io::Result<()> {
let response = [
vec![
"HTTP/1.1 200 Ok",
"ows: https://tools.ietf.org/html/rfc7230",
"\t#section-3.2.4",
"Content-Length: 12",
]
.join("\r\n"),
vec!["Hello World!"].join("\r\n"),
];
let mut out = CabotLibWrite::new();
out.write(response[0].as_bytes()).await.unwrap();
out.write(response[1].as_bytes()).await.unwrap();
out.flush().await.unwrap();
let response = out.response().unwrap();
assert_eq!(response.http_version(), "HTTP/1.1");
assert_eq!(response.status_code(), 200);
assert_eq!(response.status_line(), "200 Ok");
let headers: &[&str] = &[
"ows: https://tools.ietf.org/html/rfc7230 #section-3.2.4",
"Content-Length: 12",
];
assert_eq!(response.headers(), headers);
assert_eq!(
response.body_as_string().unwrap(),
"Hello World!".to_owned()
);
Ok(())
}
#[async_std::test]
async fn test_build_http_no_response_body() -> std::io::Result<()> {
let response = vec![
"HTTP/1.1 302 Moved",
"Location: https://tools.ietf.org/html/rfc7230#section-3.3",
]
.join("\r\n");
let mut out = CabotLibWrite::new();
out.write(response.as_bytes()).await.unwrap();
out.flush().await.unwrap();
let response = out.response().unwrap();
assert_eq!(response.http_version(), "HTTP/1.1");
assert_eq!(response.status_code(), 302);
assert_eq!(response.status_line(), "302 Moved");
let headers: &[&str] = &["Location: https://tools.ietf.org/html/rfc7230#section-3.3"];
assert_eq!(response.headers(), headers);
assert_eq!(response.body_as_string().unwrap(), "");
Ok(())
}
}