use hyper;
use std::collections::HashMap;
use std::env;
use std::io;
use std::io::{Read, Write};
use std::ops;
use std::str::FromStr;
use std::sync::mpsc;
use codecs::{InputOutputCodec, DefaultCodec, HttpCodec};
use coercions::{InputCoercible, OutputCoercible};
use context::RuntimeContext;
use errors::FunctionError;
use hyper_utils::{clone_response, exit_code_from_response, write_request_full};
fn stateless(_: &RuntimeContext) -> Result<(), FunctionError> {
Ok(())
}
pub const STATELESS: fn(&RuntimeContext) -> Result<(), FunctionError> = stateless;
pub struct Function<S: Sized> {
initializer: fn(&RuntimeContext) -> Result<S, FunctionError>,
}
impl<S: Sized> Function<S> {
pub fn new(func: fn(&RuntimeContext) -> Result<S, FunctionError>) -> Function<S> {
Function { initializer: func }
}
pub fn run<T, U>(self, func: fn(&mut S, T) -> Result<U, FunctionError>) -> i32
where
T: InputCoercible,
U: OutputCoercible,
{
self.run_impl(
func,
env::vars(),
Box::new(io::stdin()),
&mut io::stdout(),
&mut io::stderr(),
None,
)
}
fn run_impl<I, T, U>(
self,
func: fn(&mut S, T) -> Result<U, FunctionError>,
environment: I,
input: Box<Read + Send>,
output: &mut Write,
error_log: &mut Write,
responses_hook: Option<mpsc::Sender<hyper::Response>>, ) -> i32
where
I: Iterator<Item = (String, String)>,
T: InputCoercible,
U: OutputCoercible,
{
let env = environment.fold(HashMap::new(), |mut e, kv| {
e.insert(kv.0, kv.1);
e
});
let rc = RuntimeContext::with_environment(&env);
let mut codec: Box<InputOutputCodec<Item = Result<hyper::Request, FunctionError>>> =
match env.get("FN_FORMAT") {
Some(format) => {
match format.as_ref() {
"" | "default" => Box::new(DefaultCodec::new(input, &env)),
"http" => Box::new(HttpCodec::new(Box::new(input))),
_ => {
error_log
.write(&format!("Unrecognized function format '{}'\n", format)
.as_bytes())
.unwrap();
return 2;
}
}
}
None => Box::new(DefaultCodec::new(input, &env)),
};
let initializer = self.initializer;
let mut state = match initializer(&rc) {
Ok(s) => s,
Err(e) => {
let resp = match responses_hook {
Some(ref hook) => {
let (r1, r2) = clone_response(e.into());
hook.send(r2).unwrap();
r1
}
None => e.into(),
};
match codec.try_write(resp, output) {
Ok(_) => (),
Err(e) => {
error_log.write(&format!("{}\n", e).into_bytes()).unwrap();
}
}
return 2;
}
};
let mut last_status = 0;
while let Some(maybe_evt) = codec.next() {
let mut resp = match maybe_evt {
Ok(req) => {
match T::try_decode(req) {
Ok(i) => {
let maybe_res = func(&mut state, i);
match maybe_res {
Ok(res) => {
match U::try_encode(res) {
Ok(o) => o,
Err(e) => e.into(),
}
}
Err(e) => e.into(),
}
}
Err(e) => e.into(),
}
}
Err(e) => e.into(),
};
last_status = exit_code_from_response(&resp);
resp = match responses_hook {
Some(ref hook) => {
let (r1, r2) = clone_response(resp);
hook.send(r2).unwrap();
r1
}
None => resp,
};
match codec.try_write(resp, output) {
Ok(_) => (),
Err(e) => {
error_log.write(&format!("{}\n", e).into_bytes()).unwrap();
last_status = 2;
}
}
if last_status > 1 {
break;
}
}
last_status
}
}
pub struct FunctionTestbench<S: Sized> {
initializer: fn(&RuntimeContext) -> Result<S, FunctionError>,
environment: HashMap<String, String>,
requests: Vec<hyper::Request>,
responses: Vec<hyper::Response>,
test_out: Vec<u8>,
test_err: Vec<u8>,
}
impl<S: Sized> FunctionTestbench<S> {
pub fn new(code: fn(&RuntimeContext) -> Result<S, FunctionError>) -> FunctionTestbench<S> {
FunctionTestbench {
initializer: code,
environment: HashMap::new(),
requests: Vec::new(),
responses: Vec::new(),
test_out: Vec::new(),
test_err: Vec::new(),
}
}
pub fn set_config(&mut self, key: &str, value: &str) {
self.environment.insert(key.to_string(), value.to_string());
}
pub fn with_config(mut self, key: &str, value: &str) -> Self {
self.set_config(key, value);
self
}
pub fn enqueue(&mut self, req: hyper::Request) -> &mut Self {
self.requests.push(req);
self
}
pub fn enqueue_simple(&mut self, body: &str) -> &mut Self {
let mut req = hyper::Request::new(hyper::Method::Get, hyper::Uri::from_str("/").unwrap());
req.headers_mut().set(hyper::header::ContentLength(
body.as_bytes().len() as u64,
));
req.set_body(body.to_string());
self.requests.push(req);
self
}
pub fn run<T, U>(&mut self, code: fn(&mut S, T) -> Result<U, FunctionError>) -> i32
where
T: InputCoercible,
U: OutputCoercible,
{
self.responses.clear();
self.test_out.clear();
self.test_err.clear();
let mut mock_in = io::Cursor::new(Vec::new());
for r in self.requests.drain(ops::RangeFull) {
write_request_full(r, &mut mock_in).unwrap();
}
mock_in.set_position(0);
let mut mock_out = io::Cursor::new(Vec::new());
let mut mock_err = io::Cursor::new(Vec::new());
let mut env = self.environment.clone();
env.insert("FN_FORMAT".to_string(), "http".to_string());
let function_under_test = Function::new(self.initializer);
let (responses_tx, responses_rx) = mpsc::channel();
let exit_code = function_under_test.run_impl(
code,
env.drain(),
Box::new(mock_in),
&mut mock_out,
&mut mock_err,
Some(responses_tx),
);
loop {
match responses_rx.try_recv() {
Ok(r) => self.responses.push(r),
Err(_) => {
break;
}
}
}
mock_out.set_position(0);
mock_err.set_position(0);
self.test_out = mock_out.into_inner();
self.test_err = mock_err.into_inner();
exit_code
}
pub fn responses(&self) -> &Vec<hyper::Response> {
&self.responses
}
pub fn drain_responses(&mut self) -> Vec<hyper::Response> {
self.responses.drain(ops::RangeFull).collect()
}
pub fn output(&self) -> &Vec<u8> {
&self.test_out
}
pub fn errlog(&self) -> &Vec<u8> {
&self.test_err
}
}