use futures;
use hyper;
use uuid;
use std::borrow::Cow;
use std::collections::HashMap;
use std::io::{Read, Write, BufReader};
use std::net;
use std::str::FromStr;
use std::sync::mpsc;
use std::thread;
use errors::FunctionError;
use hyper_utils::{write_response_body, write_response_full};
/// A source of incoming requests that also knows how to write a response.
///
/// Implementors are iterators: each item is either a `hyper::Request` that was
/// read successfully or a `FunctionError` describing why one could not be
/// produced.
pub trait InputOutputCodec
: Iterator<Item = Result<hyper::Request, FunctionError>> {
/// Writes `resp` to `writer`.
///
/// Returns `Ok(())` on success and a `FunctionError` otherwise. How much of
/// the response is written is up to the implementor.
fn try_write(&mut self, resp: hyper::Response, writer: &mut Write)
-> Result<(), FunctionError>;
}
/// Codec that produces a single request: the body is read from `input` and the
/// method, URL and headers are taken from an environment map.
pub struct DefaultCodec<'a> {
/// Reader from which the request body is consumed in full.
input: Box<Read>,
/// Key/value map consulted for `FN_METHOD`, `FN_REQUEST_URL` and any keys
/// that start (case-insensitively) with `fn_header_`.
environment: &'a HashMap<String, String>,
/// Set to `true` on the first call to `next`; once set, iteration yields `None`.
read: bool,
}
impl<'a> DefaultCodec<'a> {
    /// Builds a codec that has not yet produced its request.
    ///
    /// `input` supplies the request body; `environment` supplies the method,
    /// URL and headers when the request is assembled during iteration.
    pub fn new(input: Box<Read>, environment: &'a HashMap<String, String>) -> DefaultCodec<'a> {
        DefaultCodec {
            // Nothing has been consumed from `input` yet.
            read: false,
            environment: environment,
            input: input,
        }
    }
}
impl<'a> Iterator for DefaultCodec<'a> {
    type Item = Result<hyper::Request, FunctionError>;

    /// Yields exactly one item, then `None` forever after.
    ///
    /// The single item is the request assembled from the whole of `input`
    /// (as the body) and the environment map (`FN_METHOD`, `FN_REQUEST_URL`
    /// and `fn_header_*` entries), or an error if the input cannot be read or
    /// a required variable is missing or malformed.
    fn next(&mut self) -> Option<Result<hyper::Request, FunctionError>> {
        const HEADER_PREFIX: &'static str = "fn_header_";

        if self.read {
            return None;
        }
        // Mark the codec as spent up front so that a failure below is still
        // followed by `None` on the next call.
        self.read = true;

        let mut payload = Vec::new();
        if let Err(e) = self.input.read_to_end(&mut payload) {
            return Some(Err(FunctionError::io(e)));
        }

        let method = match self.environment
            .get("FN_METHOD")
            .map(|raw| hyper::Method::from_str(raw))
        {
            Some(Ok(parsed)) => parsed,
            Some(Err(_)) => {
                return Some(Err(FunctionError::other(
                    "Fatal: FN_METHOD set to an invalid HTTP method.",
                )));
            }
            None => {
                return Some(Err(FunctionError::other("Fatal: FN_METHOD not set.")));
            }
        };

        let uri = match self.environment
            .get("FN_REQUEST_URL")
            .map(|raw| hyper::Uri::from_str(raw))
        {
            Some(Ok(parsed)) => parsed,
            Some(Err(_)) => {
                return Some(Err(FunctionError::other(
                    "Fatal: FN_REQUEST_URL set to an invalid URL.",
                )));
            }
            None => {
                return Some(Err(FunctionError::other("Fatal: FN_REQUEST_URL not set.")));
            }
        };

        let mut req = hyper::Request::new(method, uri);
        req.set_version(hyper::HttpVersion::Http11);

        // Collect the header variables into a map first: the prefix is
        // stripped and `_` becomes `-`, and names that collide after that
        // rewrite are collapsed to a single entry before being appended.
        let mut headers: HashMap<String, String> = HashMap::new();
        for (name, value) in self.environment.iter() {
            if name.to_lowercase().starts_with(HEADER_PREFIX) {
                let header_name = name[HEADER_PREFIX.len()..].replace("_", "-");
                headers.insert(header_name, value.clone());
            }
        }
        for (name, value) in headers.iter() {
            req.headers_mut()
                .append_raw(Cow::Owned(String::from(name.as_str())), value.as_str());
        }

        req.set_body(hyper::Body::from(payload));
        Some(Ok(req))
    }
}
impl<'a> InputOutputCodec for DefaultCodec<'a> {
/// Writes `resp` to `writer` by delegating to `write_response_body`.
///
/// NOTE(review): judging by the helper's name this emits only the response
/// body, not the status line or headers — confirm against `hyper_utils`.
fn try_write(
&mut self,
resp: hyper::Response,
writer: &mut Write,
) -> Result<(), FunctionError> {
write_response_body(resp, writer)
}
}
/// Codec whose requests arrive over a channel fed by background threads.
pub struct HttpCodec {
/// Receiving end of the event channel. Each message is handed back verbatim
/// by the iterator, so `Some(..)` is an item and `None` ends iteration.
event_rx: mpsc::Receiver<Option<Result<hyper::Request, FunctionError>>>,
}
impl HttpCodec {
    /// Creates a codec that decodes `input` as a stream of HTTP requests.
    ///
    /// Decoding is delegated to a hyper server bound to an ephemeral port on
    /// `127.0.0.1`. Three background threads are started:
    ///
    /// 1. the server, whose `ChannelPoster` service forwards every request it
    ///    parses to this codec's event channel;
    /// 2. a pump that copies `input` into a TCP connection to that server and,
    ///    once `input` is exhausted, sends a `HEAD` request carrying a random
    ///    UUID header pair that the service recognises as the shutdown marker;
    /// 3. a drain that reads and discards whatever the server writes back.
    ///
    /// Failures to connect to the server, to clone the connection, or to
    /// forward data are reported as `Err` items through the returned codec.
    ///
    /// # Panics
    ///
    /// Panics if the loopback server cannot be bound: the server thread
    /// panics and this constructor then fails to receive the bound address.
    pub fn new(input: Box<Read + Send>) -> HttpCodec {
        let (event_tx, event_rx) = mpsc::channel();
        let event_tx_clone = event_tx.clone();
        let shutdown_key_uuid = uuid::Uuid::new_v4();
        let shutdown_value_uuid = uuid::Uuid::new_v4();
        let codec = HttpCodec { event_rx: event_rx };

        // Port 0 lets the OS choose a free port; the server thread reports the
        // actual address back through `ready_tx`.
        let mut loopback_addr = "127.0.0.1:0".parse().unwrap();
        let (ready_tx, ready_rx) = mpsc::channel();
        thread::spawn(move || {
            let server = hyper::server::Http::new()
                .bind(&loopback_addr, move || {
                    Ok(ChannelPoster {
                        event_tx: event_tx.clone(),
                        shutdown_key_uuid: shutdown_key_uuid,
                        shutdown_value_uuid: shutdown_value_uuid,
                    })
                })
                .unwrap();
            ready_tx.send(server.local_addr().unwrap()).unwrap();
            let _ = server.run();
        });
        loopback_addr = ready_rx.recv().unwrap();

        // The sends on the early-return paths below cannot fail: `codec` owns
        // the receiver and is still alive at that point.
        let stream = match net::TcpStream::connect(loopback_addr) {
            Ok(s) => s,
            Err(e) => {
                event_tx_clone
                    .send(Some(Err(FunctionError::io(e))))
                    .unwrap();
                return codec;
            }
        };
        let mut stream_for_push = match stream.try_clone() {
            Ok(s) => s,
            Err(e) => {
                event_tx_clone
                    .send(Some(Err(FunctionError::io(e))))
                    .unwrap();
                return codec;
            }
        };
        let stream_for_pull = match stream.try_clone() {
            Ok(s) => s,
            Err(e) => {
                event_tx_clone
                    .send(Some(Err(FunctionError::io(e))))
                    .unwrap();
                return codec;
            }
        };

        thread::spawn(move || {
            let mut bufinput = BufReader::new(input);

            // `io::copy` forwards whole chunks with `write_all` semantics and
            // stops at the first read or write error. Forwarding one byte at a
            // time with `write` could drop bytes on a short write and would
            // report a persistent error once per attempt, without end.
            //
            // Send results are ignored throughout this thread: a failed send
            // only means the codec was dropped and nobody is listening.
            if let Err(e) = ::std::io::copy(&mut bufinput, &mut stream_for_push) {
                let _ = event_tx_clone.send(Some(Err(FunctionError::io(e))));
            }

            // Tell the service that no further requests will follow.
            let shutdown_request = format!(
                "HEAD * HTTP/1.1\r\n{}: {}\r\n\r\n",
                shutdown_key_uuid.hyphenated().to_string(),
                shutdown_value_uuid.hyphenated().to_string()
            );
            // `write_all` rather than `write`: a truncated marker would never
            // be recognised by the service.
            if let Err(e) = stream_for_push.write_all(shutdown_request.as_bytes()) {
                let _ = event_tx_clone.send(Some(Err(FunctionError::io(e))));
            }
            if let Err(e) = stream_for_push.flush() {
                let _ = event_tx_clone.send(Some(Err(FunctionError::io(e))));
            }
        });

        // Read and discard everything the server sends back on the connection.
        thread::spawn(move || {
            stream_for_pull.bytes().count();
        });

        codec
    }
}
impl Iterator for HttpCodec {
    type Item = Result<hyper::Request, FunctionError>;

    /// Blocks until the next event arrives on the channel and returns it.
    ///
    /// A received `None` ends iteration. If the channel itself fails, the
    /// receive error is surfaced as an `Err` item.
    fn next(&mut self) -> Option<Result<hyper::Request, FunctionError>> {
        self.event_rx
            .recv()
            .unwrap_or_else(|e| Some(Err(FunctionError::io(e))))
    }
}
impl InputOutputCodec for HttpCodec {
/// Writes `resp` to `writer` by delegating to `write_response_full`.
///
/// NOTE(review): judging by the helper's name this emits the complete
/// response rather than just the body — confirm against `hyper_utils`.
fn try_write(
&mut self,
resp: hyper::Response,
writer: &mut Write,
) -> Result<(), FunctionError> {
write_response_full(resp, writer)
}
}
/// hyper service that forwards every request it receives onto a channel.
struct ChannelPoster {
/// Channel on which requests (`Some(Ok(..))`) or the end-of-stream marker
/// (`None`) are posted.
event_tx: mpsc::Sender<Option<Result<hyper::Request, FunctionError>>>,
/// Header name (hyphenated UUID) that identifies a shutdown request.
shutdown_key_uuid: uuid::Uuid,
/// Header value (hyphenated UUID) a shutdown request must carry under that name.
shutdown_value_uuid: uuid::Uuid,
}
impl hyper::server::Service for ChannelPoster {
    type Request = hyper::Request;
    type Response = hyper::Response;
    type Error = hyper::Error;
    type Future = Box<futures::Future<Item = Self::Response, Error = Self::Error>>;

    /// Posts `req` to the event channel and answers with an `OK` body.
    ///
    /// A request whose header named by the shutdown key UUID has exactly one
    /// line equal to the shutdown value UUID is not forwarded; `None` is
    /// posted in its place. A failed send is ignored.
    fn call(&self, req: hyper::Request) -> Self::Future {
        let marker_name = self.shutdown_key_uuid.hyphenated().to_string();
        let marker_value = self.shutdown_value_uuid.hyphenated().to_string();

        let is_shutdown = req.headers()
            .get_raw(&marker_name)
            .and_then(|raw| raw.one())
            .map(|line| line == marker_value.as_bytes())
            .unwrap_or(false);

        let event = if is_shutdown { None } else { Some(Ok(req)) };
        let _ = self.event_tx.clone().send(event);

        Box::new(futures::future::ok(hyper::Response::new().with_body("OK")))
    }
}