mod generated;
#[macro_use]
extern crate wascc_codec as codec;
#[macro_use]
extern crate log;
extern crate actix_rt;
use crate::generated::core::{CapabilityConfiguration, HealthResponse};
use actix_web::dev::Body;
use actix_web::dev::Server;
use actix_web::http::{HeaderName, HeaderValue, StatusCode};
use actix_web::web::Bytes;
use actix_web::{middleware, web, App, HttpRequest, HttpResponse, HttpServer};
use codec::capabilities::{
CapabilityDescriptor, CapabilityProvider, Dispatcher, NullDispatcher, OperationDirection,
OP_GET_CAPABILITY_DESCRIPTOR,
};
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
use std::sync::RwLock;
use wascc_codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR, OP_HEALTH_REQUEST};
use wascc_codec::{deserialize, serialize};
const CAPABILITY_ID: &str = "wascc:http_server";
const VERSION: &str = env!("CARGO_PKG_VERSION");
const REVISION: u32 = 3;
const OP_HANDLE_REQUEST: &str = "HandleRequest";
#[cfg(not(feature = "static_plugin"))]
capability_provider!(HttpServerProvider, HttpServerProvider::new);
#[derive(Clone)]
pub struct HttpServerProvider {
dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
servers: Arc<RwLock<HashMap<String, Server>>>,
}
impl HttpServerProvider {
pub fn new() -> Self {
Self::default()
}
fn terminate_server(&self, module: &str) {
{
let lock = self.servers.read().unwrap();
if !lock.contains_key(module) {
error!(
"Received request to stop server for non-configured actor {}. Igoring.",
module
);
return;
}
let server = lock.get(module).unwrap();
let _ = server.stop(true);
}
{
let mut lock = self.servers.write().unwrap();
lock.remove(module).unwrap();
}
}
fn spawn_server(&self, cfgvals: &CapabilityConfiguration) {
if self.servers.read().unwrap().contains_key(&cfgvals.module) {
return;
}
let bind_addr = match cfgvals.values.get("PORT") {
Some(v) => format!("0.0.0.0:{}", v),
None => "0.0.0.0:8080".to_string(),
};
let disp = self.dispatcher.clone();
let module_id = cfgvals.module.clone();
info!("Received HTTP Server configuration for {}", module_id);
let servers = self.servers.clone();
std::thread::spawn(move || {
let module = module_id.clone();
let sys = actix_rt::System::new(&module);
let server = HttpServer::new(move || {
App::new()
.wrap(middleware::Logger::default())
.data(disp.clone())
.data(module.clone())
.default_service(web::route().to(request_handler))
})
.bind(bind_addr)
.unwrap()
.disable_signals()
.run();
servers.write().unwrap().insert(module_id.clone(), server);
let _ = sys.run();
});
}
fn get_descriptor(&self) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
Ok(serialize(
CapabilityDescriptor::builder()
.id(CAPABILITY_ID)
.name("Default waSCC HTTP Server Provider (Actix)")
.long_description("A fast, multi-threaded HTTP server for waSCC actors")
.version(VERSION)
.revision(REVISION)
.with_operation(
OP_HANDLE_REQUEST,
OperationDirection::ToActor,
"Delivers an HTTP request to an actor and expects an HTTP response in return",
)
.build(),
)?)
}
}
impl Default for HttpServerProvider {
fn default() -> Self {
match env_logger::try_init() {
Ok(_) => {}
Err(_) => {}
};
HttpServerProvider {
dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
servers: Arc::new(RwLock::new(HashMap::new())),
}
}
}
impl CapabilityProvider for HttpServerProvider {
fn configure_dispatch(
&self,
dispatcher: Box<dyn Dispatcher>,
) -> Result<(), Box<dyn Error + Sync + Send>> {
info!("Dispatcher configured.");
let mut lock = self.dispatcher.write().unwrap();
*lock = dispatcher;
Ok(())
}
fn handle_call(
&self,
actor: &str,
op: &str,
msg: &[u8],
) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
trace!("Handling operation `{}` from `{}`", op, actor);
match op {
OP_BIND_ACTOR if actor == "system" => {
self.spawn_server(&deserialize(msg)?);
Ok(vec![])
}
OP_REMOVE_ACTOR if actor == "system" => {
let cfgvals = deserialize::<CapabilityConfiguration>(msg)?;
info!("Removing actor configuration for {}", cfgvals.module);
self.terminate_server(&cfgvals.module);
Ok(vec![])
}
OP_HEALTH_REQUEST if actor == "system" => {
Ok(serialize(HealthResponse{healthy: true, message: "".to_string()})?)
}
OP_GET_CAPABILITY_DESCRIPTOR if actor == "system" => self.get_descriptor(),
_ => Err("bad dispatch".into()),
}
}
fn stop(&self) {
let server_list: Vec<_> = {
let lock = self.servers.read().unwrap();
lock.keys().cloned().collect()
};
for svr in server_list {
self.terminate_server(&svr);
}
}
}
async fn request_handler(
req: HttpRequest,
payload: Bytes,
state: web::Data<Arc<RwLock<Box<dyn Dispatcher>>>>,
module: web::Data<String>,
) -> HttpResponse {
let request = crate::generated::http::Request {
method: req.method().as_str().to_string(),
path: req.uri().path().to_string(),
query_string: req.query_string().to_string(),
header: extract_headers(&req),
body: payload.to_vec(),
};
let buf = serialize(request).unwrap();
let resp = {
let lock = (*state).read().unwrap();
lock.dispatch(module.get_ref(), OP_HANDLE_REQUEST, &buf)
};
match resp {
Ok(r) => {
let r = deserialize::<crate::generated::http::Response>(r.as_slice());
if let Ok(r) = r {
let mut response = HttpResponse::with_body(
StatusCode::from_u16(r.status_code as _).unwrap(),
Body::from_slice(&r.body),
);
if !r.header.is_empty() {
let headers = response.head_mut();
r.header.iter().for_each(|(key, val)| {
headers.headers.insert(
HeaderName::from_bytes(key.as_bytes()).unwrap(),
HeaderValue::from_str(val).unwrap(),
)
});
}
response
} else {
HttpResponse::InternalServerError().body("Malformed response from actor")
}
}
Err(e) => {
error!("Guest failed to handle HTTP request: {}", e);
HttpResponse::with_body(
StatusCode::from_u16(500u16).unwrap(),
Body::from_slice(b"Failed to handle request"),
)
}
}
}
fn extract_headers(req: &HttpRequest) -> HashMap<String, String> {
let mut hm = HashMap::new();
for (hname, hval) in req.headers().iter() {
hm.insert(
hname.as_str().to_string(),
hval.to_str().unwrap().to_string(),
);
}
hm
}