use crate::error::Error as EmblyError;
use crate::http_proto::httpproto::{HeaderList, Http};
use crate::proto;
use crate::task;
use crate::Conn;
use failure::Error;
use http;
use http::header::{HeaderName, HeaderValue};
use http::response::Parts;
use http::status::StatusCode;
use http::HttpTryFrom;
pub use http::Request;
pub use http::Response;
use httparse;
use std::future::Future;
use std::io;
use std::io::Read;
use std::io::Write;
use std::sync::Arc;
use std::sync::Mutex;
#[derive(Debug, Default)]
pub struct Body {
conn: Conn,
content_length: Option<usize>,
read_count: usize,
read_buf: Vec<u8>,
}
struct Interior {
body: Body,
parts: Parts,
write_buf: Vec<u8>,
}
impl Body {
pub fn bytes(&mut self) -> Result<Vec<u8>, Error> {
let mut out: Vec<u8> = self.read_buf.drain(..).collect();
if self.content_length.is_none() || self.content_length.unwrap() == 0 {
return Ok(out);
}
self.read_count = out.len();
if self.read_count == self.content_length.unwrap() {
Ok(out)
} else {
while self.read_count < self.content_length.unwrap() {
let mut http = proto::next_message(&mut self.conn)?;
self.read_count += http.body.len();
out.append(&mut http.body);
}
Ok(out)
}
}
}
impl Interior {
fn header<K, V>(&mut self, key: K, value: V) -> Result<(), Error>
where
HeaderName: HttpTryFrom<K>,
HeaderValue: HttpTryFrom<V>,
{
match HeaderName::try_from(key) {
Ok(key) => match HeaderValue::try_from(value) {
Ok(value) => {
self.parts.headers.insert(key, value);
Ok(())
}
Err(e) => Err(EmblyError::Http(e.into()).into()),
},
Err(e) => Err(EmblyError::Http(e.into()).into()),
}
}
}
pub struct ResponseWriter {
interior: Arc<Mutex<Interior>>,
headers_written: bool,
function_returned: bool,
}
impl Clone for ResponseWriter {
fn clone(&self) -> Self {
Self {
headers_written: self.headers_written,
function_returned: self.function_returned,
interior: self.interior.clone(),
}
}
}
impl ResponseWriter {
fn new(body: Body) -> Self {
let (p, _) = Response::new(()).into_parts();
Self {
headers_written: false,
function_returned: false,
interior: Arc::new(Mutex::new(Interior {
body: body,
parts: p,
write_buf: Vec::new(),
})),
}
}
pub fn header<K, V>(&mut self, key: K, value: V) -> Result<(), Error>
where
HeaderName: HttpTryFrom<K>,
HeaderValue: HttpTryFrom<V>,
{
self.interior.lock().unwrap().header(key, value)
}
pub fn status<T>(&mut self, status: T) -> Result<(), Error>
where
StatusCode: HttpTryFrom<T>,
{
match StatusCode::try_from(status) {
Ok(s) => {
self.interior.lock().unwrap().parts.status = s;
Ok(())
}
Err(e) => Err(EmblyError::Http(e.into()).into()),
}
}
}
impl io::Write for ResponseWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.interior.lock().unwrap().write_buf.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub trait Flusher {
fn flush_response(&mut self) -> Result<(), Error>;
}
impl Flusher for ResponseWriter {
fn flush_response(&mut self) -> Result<(), Error> {
let mut http_msg = Http::default();
let mut interior = self.interior.lock().unwrap();
if !self.headers_written {
http_msg.status = interior.parts.status.as_u16() as i32;
for (name, values) in interior.parts.headers.drain() {
let mut list = HeaderList::default();
for value in values {
list.header.push(value.to_str()?.to_string());
}
http_msg.headers.insert(name.as_str().to_string(), list);
}
self.headers_written = true;
}
if self.function_returned {
http_msg.eof = true;
}
http_msg.body = interior.write_buf.drain(..).collect();
proto::write_msg(&mut interior.body.conn, http_msg)?;
Ok(())
}
}
fn build_request_from_comm(c: &mut Conn) -> Result<Request<Body>, Error> {
c.wait()?;
let id = c.id;
let http = proto::next_message(c)?;
let mut request = http_proto_to_request(http);
let body = request.body_mut();
body.conn.id = id;
Ok(request)
}
const MAX_HEADERS: usize = 100;
fn http_proto_to_request(http: Http) -> Request<Body> {
let mut request = Request::builder();
request.uri(http.uri);
let mut body = Body::default();
request.method(format!("{:?}", http.method).as_str());
for (h, values) in http.headers {
if h == "Content-Length" {
let cl: usize = values.header[0]
.parse()
.expect("content length should be an int");
body.content_length = Some(cl);
}
for v in values.header {
request.header(&h, v);
}
}
body.read_buf = http.body;
request.body(body).expect("should be able to create a body")
}
#[allow(dead_code)]
fn reader_to_response<R: Read>(mut c: R) -> Result<Response<Body>, Error> {
let mut headers: Vec<httparse::Header> = vec![httparse::EMPTY_HEADER; MAX_HEADERS];
let mut buf: Vec<u8> = Vec::new();
let mut res = httparse::Response::new(&mut headers);
c.read_to_end(&mut buf)?;
let result = res.parse(&buf)?;
if result.is_partial() {
return Err(EmblyError::InvalidHttpRequest.into());
}
let mut response = Response::builder();
if let Some(code) = res.code {
response.status(code);
}
for h in &headers {
if h.name.is_empty() && h.value.is_empty() {
break;
}
response.header(h.name, h.value);
}
let mut body = Body::default();
body.read_buf = buf[result.unwrap()..].to_vec();
Ok(response.body(body)?)
}
pub fn run<F>(to_run: fn(Request<Body>, ResponseWriter) -> F)
where
F: Future<Output = ()> + 'static,
{
let mut c = Conn::new(1);
let r = build_request_from_comm(&mut c).expect("http request should be valid");
let mut body = Body::default();
body.conn = c.clone();
let mut resp = ResponseWriter::new(body);
task::Task::spawn(Box::pin(to_run(r, resp.clone())));
resp.function_returned = true;
resp.flush_response().expect("should be able to flush");
}
pub fn run_catch_error<F>(to_run: fn(Request<Body>, ResponseWriter) -> F)
where
F: Future<Output = Result<(), Error>> + 'static,
{
let mut c = Conn::new(1);
let r = build_request_from_comm(&mut c).expect("http request should be valid");
let mut body = Body::default();
body.conn = c.clone();
let mut resp = ResponseWriter::new(body);
let user_resp = resp.clone();
let mut error_resp = resp.clone();
task::Task::spawn(Box::pin(async move {
match to_run(r, user_resp).await {
Ok(_) => {}
Err(err) => {
println!("got error: {}", err);
error_resp.status(500).unwrap();
error_resp.write(&format!("{}", err).as_bytes()).unwrap();
}
}
}));
resp.function_returned = true;
resp.flush_response().expect("should be able to flush");
}
#[cfg(test)]
mod tests {
use super::*;
}