use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::net::SocketAddr;
use tiny_http::Method;
use common::prelude::*;
use common::config::HttpConfig;
use scripts::Repository;
use web::http::HttpServer;
use web::api::WebApi;
/// Handle on the running web front-end: the HTTP server wrapping a `WebApi`,
/// the address it is listening on, and the lock flag shared with that API.
pub struct WebApp<A>
where
    A: ProcessorApiTrait<Repository> + 'static,
{
    /// HTTP server whose routes are served by `WebApi<A>` handlers.
    server: HttpServer<WebApi<A>>,
    /// Socket address returned by `HttpServer::listen` when the app was built.
    addr: SocketAddr,
    /// Flag handed to `WebApi::new` and toggled by `lock()` / `unlock()`.
    locked: Arc<AtomicBool>,
}
impl<A> WebApp<A>
where
    A: ProcessorApiTrait<Repository>,
{
    /// Creates the web API, registers its routes and starts listening on
    /// `config.bind`.
    ///
    /// Registered routes, in order: `GET /health`, `GET /hook/?` and
    /// `POST /hook/?`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `HttpServer::listen`.
    pub fn new(
        hooks: Arc<Repository>,
        config: &HttpConfig,
        processor: A,
    ) -> Result<Self> {
        // One copy of the flag goes to the API, the other stays in this
        // handle so `lock()` / `unlock()` can flip it later.
        let lock_flag = Arc::new(AtomicBool::new(false));

        let mut http = HttpServer::new(
            WebApi::new(
                processor,
                hooks,
                lock_flag.clone(),
                &config.rate_limit,
                config.health_endpoint,
            ),
            config.behind_proxies,
        );

        http.add_route(Method::Get, "/health", Box::new(WebApi::get_health));

        // The hook endpoint is reachable through both GET and POST.
        for verb in vec![Method::Get, Method::Post] {
            http.add_route(verb, "/hook/?", Box::new(WebApi::process_hook));
        }

        let bound = http.listen(config.bind)?;

        Ok(WebApp {
            server: http,
            addr: bound,
            locked: lock_flag,
        })
    }

    /// Returns the socket address obtained from `HttpServer::listen` when
    /// this app was created.
    pub fn addr(&self) -> &SocketAddr {
        &self.addr
    }

    /// Sets the shared lock flag to `true`.
    ///
    /// NOTE(review): the tests in this file expect hook requests to be
    /// answered with 503 while the flag is set; the check itself lives in
    /// `WebApi`, outside this file.
    pub fn lock(&self) {
        self.set_locked(true);
    }

    /// Sets the shared lock flag back to `false`.
    pub fn unlock(&self) {
        self.set_locked(false);
    }

    /// Consumes the app and stops the underlying HTTP server.
    pub fn stop(mut self) {
        self.server.stop();
    }

    /// Stores `value` into the shared lock flag with `SeqCst` ordering.
    fn set_locked(&self, value: bool) {
        self.locked.store(value, Ordering::SeqCst);
    }
}
#[cfg(test)]
mod tests {
    use std::io::Read;

    use serde_json;
    use hyper::status::StatusCode;
    use hyper::method::Method;
    use hyper::header::Headers;

    use common::prelude::*;
    use utils::testing::*;

    // NOTE(review): `start_web(flag, n)` is defined in `utils::testing`; from
    // the tests below, `flag` appears to toggle the health endpoint and `n`
    // the number of proxies in front of the server -- confirm against the
    // helper.

    /// A freshly started server answers 404 for the root path.
    #[test]
    fn test_startup() {
        let env = TestingEnv::new();
        let mut web = env.start_web(true, 0);

        let response = web.request(Method::Get, "/").send().unwrap();
        assert_eq!(response.status, StatusCode::NotFound);

        web.stop();
        env.cleanup();
    }

    /// Walks through the hook endpoint: status codes returned and whether
    /// anything reached the processor after each request.
    #[test]
    fn test_hook_call() {
        let env = TestingEnv::new();
        let mut web = env.start_web(true, 0);

        // `invalid.sh`: 404, processor untouched.
        let response = web
            .request(Method::Get, "/hook/invalid.sh")
            .send()
            .unwrap();
        assert_eq!(response.status, StatusCode::NotFound);
        assert!(web.processor_input().is_none());

        // Wrong secret: 403, processor untouched.
        let response = web
            .request(Method::Get, "/hook/example.sh?secret=invalid")
            .send()
            .unwrap();
        assert_eq!(response.status, StatusCode::Forbidden);
        assert!(web.processor_input().is_none());

        // Right secret: 200 and a job for `example.sh` is queued.
        let response = web
            .request(Method::Get, "/hook/example.sh?secret=testing")
            .send()
            .unwrap();
        assert_eq!(response.status, StatusCode::Ok);
        let input = web.processor_input();
        match input.unwrap() {
            ProcessorApiCall::Queue(job, _) => {
                assert_eq!(job.script_name(), "example.sh");
            }
            _ => panic!("Wrong processor input received"),
        }

        // Ping request: 200 but nothing is sent to the processor.
        let response = web
            .request(Method::Get, "/hook/example.sh?request_type=ping")
            .send()
            .unwrap();
        assert_eq!(response.status, StatusCode::Ok);
        assert!(web.processor_input().is_none());

        // Status-event style request against `status-example.sh`: 403.
        let status_url = concat!(
            "/hook/status-example.sh",
            "?event=job_completed",
            "&hook_name=trigger-status",
            "&exit_code=0",
            "&signal=0",
        );
        let response = web.request(Method::Get, status_url).send().unwrap();
        assert_eq!(response.status, StatusCode::Forbidden);
        assert!(web.processor_input().is_none());

        // Hook located in a sub-directory: 200 and something is queued.
        let response = web
            .request(Method::Get, "/hook/sub/hook.sh")
            .send()
            .unwrap();
        assert_eq!(response.status, StatusCode::Ok);
        assert!(web.processor_input().is_some());

        // While locked, an otherwise valid request gets 503.
        web.lock();
        let response = web
            .request(Method::Get, "/hook/example.sh?secret=testing")
            .send()
            .unwrap();
        assert_eq!(response.status, StatusCode::ServiceUnavailable);
        assert!(web.processor_input().is_none());

        // After unlocking, the same request succeeds again.
        web.unlock();
        let response = web
            .request(Method::Get, "/hook/example.sh?secret=testing")
            .send()
            .unwrap();
        assert_eq!(response.status, StatusCode::Ok);
        assert!(web.processor_input().is_some());

        web.stop();
        env.cleanup();
    }

    /// With the first `start_web` argument set to `false`, `/health` is 403.
    #[test]
    fn test_health_disabled() {
        let env = TestingEnv::new();
        let mut web = env.start_web(false, 0);

        let response = web.request(Method::Get, "/health").send().unwrap();
        assert_eq!(response.status, StatusCode::Forbidden);

        web.stop();
        env.cleanup();
    }

    /// With the first `start_web` argument set to `true`, `/health` returns
    /// a JSON object whose `result` carries the expected counters.
    #[test]
    fn test_health_enabled() {
        let env = TestingEnv::new();
        let mut web = env.start_web(true, 0);

        let mut response = web.request(Method::Get, "/health").send().unwrap();
        assert_eq!(response.status, StatusCode::Ok);

        let mut body = String::new();
        response.read_to_string(&mut body).unwrap();

        let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
        let root = parsed.as_object().unwrap();
        let result = root.get("result").unwrap().as_object().unwrap();

        // Counter name -> value expected from the testing processor.
        let expected_counters = [
            ("queued_jobs", 1u64),
            ("busy_threads", 2u64),
            ("max_threads", 3u64),
        ];
        for &(key, expected) in expected_counters.iter() {
            assert_eq!(result.get(key).unwrap().as_u64().unwrap(), expected);
        }

        web.stop();
        env.cleanup();
    }

    /// With `1` as the second `start_web` argument, a request lacking
    /// `X-Forwarded-For` is a 400, while one carrying it is accepted.
    #[test]
    fn test_behind_proxy() {
        let env = TestingEnv::new();
        let mut web = env.start_web(true, 1);

        // No forwarding header: rejected, processor untouched.
        let response = web
            .request(Method::Get, "/hook/example.sh?ip=127.1.1.1")
            .send()
            .unwrap();
        assert_eq!(response.status, StatusCode::BadRequest);
        assert!(web.processor_input().is_none());

        // Same request with `X-Forwarded-For` set: accepted and queued.
        let mut forwarded = Headers::new();
        forwarded.set_raw("X-Forwarded-For", vec![b"127.1.1.1".to_vec()]);
        let response = web
            .request(Method::Get, "/hook/example.sh?ip=127.1.1.1")
            .headers(forwarded)
            .send()
            .unwrap();
        assert_eq!(response.status, StatusCode::Ok);
        assert!(web.processor_input().is_some());

        web.stop();
        env.cleanup();
    }
}