use std::{error::Error as StdError, future::Future, pin::Pin};
use http;
use hyper;
#[cfg(feature = "bundle_files")]
use includedir;
use hyper::{
header::ACCEPT,
{Method, StatusCode},
};
type MyBody = http_body_util::combinators::BoxBody<bytes::Bytes, hyper::Error>;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use parking_lot::Mutex;
use std::sync::Arc;
use crate::access_control;
use bui_backend_types::{AccessToken, CallbackDataAndSession, ConnectionKey, SessionKey};
#[cfg(feature = "serve_files")]
use std::io::Read;
use serde::{Deserialize, Serialize};
/// Value used for the `Content-Type` header of JSON response bodies.
const JSON_TYPE: &str = "application/json";
/// Body sent with callback responses. Despite the name, this is an empty
/// JSON object (`{}`), not the JSON literal `null`.
const JSON_NULL: &[u8] = b"{}";
/// Claims encoded into (and decoded from) the JWT stored in the session
/// cookie.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct JwtClaims {
/// Session key carried by the token.
key: SessionKey,
/// Value of the `exp` claim.
/// NOTE(review): presumably seconds since the Unix epoch, per the JWT
/// convention - confirm.
exp: usize, }
/// Configuration of a [`BuiService`].
#[derive(Clone)]
pub struct Config {
/// Base path that request paths are joined onto when looking up file
/// content.
pub serve_filepath: &'static std::path::Path,
/// Bundled files that file content is looked up in when the
/// `bundle_files` feature is enabled.
#[cfg(feature = "bundle_files")]
#[cfg_attr(docsrs, doc(cfg(feature = "bundle_files")))]
pub bundled_files: &'static includedir::Files,
/// Capacity of the channel created for each event stream connection.
pub channel_size: usize,
/// Name of the cookie in which the session JWT is stored.
pub cookie_name: String,
}
/// Sending half of the channel whose receiving half is streamed to the
/// client as the body of an event stream response.
pub type EventChunkSender = mpsc::Sender<hyper::body::Bytes>;
/// Description of a newly opened event stream connection. One of these is
/// sent on the service's new-connection channel for every accepted event
/// stream request.
#[derive(Debug)]
pub struct NewEventStreamConnection {
/// Sender for chunks that are forwarded to the client as response body
/// frames.
pub chunk_sender: EventChunkSender,
/// Session key of the request that opened the connection.
pub session_key: SessionKey,
/// Key assigned to this connection by the service.
pub connection_key: ConnectionKey,
/// Request path on which the event stream was opened.
pub path: String,
}
/// Sender used by the service to announce new event stream connections.
type NewConnectionSender = mpsc::Sender<NewEventStreamConnection>;
/// Optional fallback handler for requests the service does not answer
/// itself (a `GET` for which no file content is found, or a request with
/// another method). It receives the response builder prepared so far and
/// the original request, and returns the complete response.
pub type RawReqHandler = Arc<
Box<
dyn (Fn(
http::response::Builder,
http::Request<hyper::body::Incoming>,
) -> Result<http::Response<MyBody>, http::Error>)
+ Send
+ Sync,
>,
>;
/// Handler invoked for every successfully parsed `POST /callback` request.
pub trait CallbackHandler: Send + dyn_clone::DynClone {
/// Type the JSON request body is deserialized into.
type Data;
/// Handles one callback. `data_sess` carries the deserialized payload
/// together with the session key of the request. When the returned future
/// resolves to `Err`, the service answers with a 500 status.
fn call<'a>(
&'a self,
data_sess: CallbackDataAndSession<Self::Data>,
) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send>>> + Send + 'a>>;
}
// Makes `Box<dyn CallbackHandler<Data = CB>>` cloneable.
dyn_clone::clone_trait_object!(<CB> CallbackHandler<Data = CB>);
/// Callback handler that accepts `()` payloads and always succeeds without
/// doing anything.
#[derive(Clone)]
pub struct NoopCallbackHandler {}

impl CallbackHandler for NoopCallbackHandler {
    type Data = ();

    /// Ignores its argument and resolves to `Ok(())`.
    fn call<'a>(
        &'a self,
        _: CallbackDataAndSession<Self::Data>,
    ) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send>>> + Send + 'a>> {
        let done: Result<(), Box<dyn StdError + Send>> = Ok(());
        Box::pin(std::future::ready(done))
    }
}
/// Callback handler that accepts `()` payloads and always fails with an
/// `ErrorCallbackError`.
#[derive(Clone)]
pub struct ErrorCallbackHandler {}

impl CallbackHandler for ErrorCallbackHandler {
    type Data = ();

    /// Ignores its argument and resolves to an `ErrorCallbackError`.
    fn call<'a>(
        &'a self,
        _: CallbackDataAndSession<Self::Data>,
    ) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send>>> + Send + 'a>> {
        let failure: Box<dyn StdError + Send> = Box::new(ErrorCallbackError {});
        let outcome: Result<(), Box<dyn StdError + Send>> = Err(failure);
        Box::pin(std::future::ready(outcome))
    }
}
/// The HTTP service: serves files, opens event streams and dispatches
/// `POST /callback` requests to the callback handler.
#[derive(Clone)]
pub struct BuiService<CB> {
/// File-serving, channel and cookie configuration.
config: Config,
/// Handler invoked for `POST /callback` requests.
callback_handler: Box<dyn Send + CallbackHandler<Data = CB>>,
/// Key that will be handed to the next event stream connection.
next_connection_key: Arc<Mutex<ConnectionKey>>,
/// Secret from which the JWT decoding key is derived on each request.
jwt_secret: Vec<u8>,
/// Key used to sign the JWT placed in the session cookie.
encoding_key: jsonwebtoken::EncodingKey,
/// Access token that a `token` query parameter is checked against.
valid_token: AccessToken,
/// Channel on which new event stream connections are announced.
tx_new_connection: NewConnectionSender,
/// `GET` requests whose path starts with this prefix are treated as event
/// stream requests.
events_prefix: String,
/// Optional fallback for requests the service does not handle itself.
raw_req_handler: Option<RawReqHandler>,
}
impl<CB> BuiService<CB> {
fn fullpath(&self, path: &str) -> String {
assert!(path.starts_with('/')); let path = std::path::PathBuf::from(path)
.strip_prefix("/")
.unwrap()
.to_path_buf();
assert!(!path.starts_with(".."));
let base = std::path::PathBuf::from(self.config.serve_filepath);
let result = base.join(path);
result.into_os_string().into_string().unwrap()
}
#[cfg(feature = "bundle_files")]
fn get_file_content(&self, file_path: &str) -> Option<Vec<u8>> {
let fullpath = self.fullpath(file_path);
let r = self.config.bundled_files.get(&fullpath);
match r {
Ok(s) => Some(s.into_owned()),
Err(_) => None,
}
}
#[cfg(feature = "serve_files")]
fn get_file_content(&self, file_path: &str) -> Option<Vec<u8>> {
let fullpath = self.fullpath(file_path);
let mut file = match std::fs::File::open(&fullpath) {
Ok(f) => f,
Err(e) => {
warn!("requested path {:?}, but got error {:?}", file_path, e);
return None;
}
};
let mut contents = Vec::new();
match file.read_to_end(&mut contents) {
Ok(_) => {}
Err(e) => {
warn!("when reading path {:?}, got error {:?}", file_path, e);
return None;
}
}
Some(contents)
}
pub fn events_prefix(&self) -> &str {
&self.events_prefix
}
fn get_next_connection_key(&self) -> ConnectionKey {
let mut nk = self.next_connection_key.lock();
let result = *nk;
nk.0 += 1;
result
}
fn do_set_cookie_x(
&self,
resp: http::response::Builder,
) -> (http::response::Builder, SessionKey) {
let session_key = SessionKey::new();
let claims = JwtClaims {
key: session_key,
exp: 10000000000,
};
let token = {
jsonwebtoken::encode(
&jsonwebtoken::Header::default(),
&claims,
&self.encoding_key,
)
.unwrap()
};
let mut c = cookie::Cookie::new(self.config.cookie_name.clone(), token);
c.set_same_site(cookie::SameSite::Strict);
c.set_http_only(true);
let resp = resp.header(
hyper::header::SET_COOKIE,
hyper::header::HeaderValue::from_str(&c.to_string()).unwrap(),
);
(resp, session_key)
}
}
/// Copies `body_buf` into a complete, in-memory response body.
fn body_from_buf(body_buf: &[u8]) -> MyBody {
    use http_body_util::BodyExt;

    let payload = bytes::Bytes::copy_from_slice(body_buf);
    // A `Full` body cannot fail, so its error type has no values to convert.
    let full = http_body_util::Full::new(payload)
        .map_err(|never: std::convert::Infallible| match never {});
    MyBody::new(full)
}
async fn handle_req<CB>(
self_: BuiService<CB>,
req: http::Request<hyper::body::Incoming>,
mut resp: http::response::Builder,
login_info: ValidLogin,
raw_req_handler: Option<RawReqHandler>,
) -> Result<http::Response<MyBody>, http::Error> {
let session_key = match login_info {
ValidLogin::NeedsSessionKey => {
let (resp2, session_key) = self_.do_set_cookie_x(resp);
resp = resp2;
session_key
}
ValidLogin::ExistingSession(k) => k,
};
let resp_final = match (req.method(), req.uri().path()) {
(&Method::GET, path) => {
let path = if path == "/" { "/index.html" } else { path };
if path.starts_with(&self_.events_prefix) {
let mut accepts_event_stream = false;
for value in req.headers().get_all(ACCEPT).iter() {
if value
.to_str()
.expect("to_str()")
.contains("text/event-stream")
{
accepts_event_stream = true;
}
}
if accepts_event_stream {
let connection_key = self_.get_next_connection_key();
let (tx_event_stream, rx_event_stream) =
mpsc::channel(self_.config.channel_size);
let rx_event_stream =
tokio_stream::wrappers::ReceiverStream::new(rx_event_stream);
{
let conn_info = NewEventStreamConnection {
chunk_sender: tx_event_stream,
session_key,
connection_key,
path: path.to_string(),
};
let send_future = self_.tx_new_connection.send(conn_info);
match send_future.await {
Ok(()) => {}
Err(e) => {
error!("failed to send new connection info: {:?}", e);
}
};
}
resp = resp.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_str("text/event-stream")
.expect("from_str"),
);
let rx_event_stream2 = rx_event_stream.map(|data: bytes::Bytes| {
Ok::<_, hyper::Error>(hyper::body::Frame::data(data))
});
resp.body(MyBody::new(http_body_util::StreamBody::new(
rx_event_stream2,
)))?
} else {
let estr = "Event request does not specify \
'Accept' or does not accept the required \
'text/event-stream'"
.to_string();
warn!("{}", estr);
let e = ErrorsBackToBrowser { errors: vec![estr] };
let body_buf = serde_json::to_vec(&e).unwrap();
resp = resp.status(StatusCode::BAD_REQUEST);
resp.body(body_from_buf(&body_buf))?
}
} else {
match self_.get_file_content(path) {
Some(buf) => {
let path = std::path::Path::new(path);
let mime_type = match path.extension().map(|x| x.to_str()).unwrap_or(None) {
Some(ext) => conduit_mime_types::get_mime_type(ext),
None => None,
};
if let Some(mime_type) = mime_type {
resp = resp.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_str(mime_type).expect("from_str"),
);
}
resp.body(body_from_buf(&buf))?
}
None => {
if let Some(raw_req_handler) = raw_req_handler {
raw_req_handler(resp, req)?
} else {
resp = resp.status(StatusCode::NOT_FOUND);
resp.body(body_from_buf(&[]))?
}
}
}
}
}
_ => {
if let Some(raw_req_handler) = raw_req_handler {
raw_req_handler(resp, req)?
} else {
resp = resp.status(StatusCode::NOT_FOUND);
resp.body(body_from_buf(&[]))?
}
}
};
Ok(resp_final)
}
fn handle_callback<CB>(
handler: Box<dyn CallbackHandler<Data = CB> + Send>,
session_key: bui_backend_types::SessionKey,
resp0: http::response::Builder,
req: http::Request<hyper::body::Incoming>,
) -> Pin<Box<dyn Future<Output = Result<http::Response<MyBody>, hyper::Error>> + Send>>
where
CB: 'static + serde::de::DeserializeOwned + Send,
{
let result = async move {
let body = req.into_body();
let chunks: Result<http_body_util::Collected<bytes::Bytes>, hyper::Error> = {
use http_body_util::BodyExt;
body.collect().await
};
let data = chunks?.to_bytes();
match serde_json::from_slice::<CB>(&data) {
Ok(payload) => {
let args2 = CallbackDataAndSession {
payload,
session_key,
};
let x = {
let fut = handler.call(args2);
fut.await
};
let r0 = match x {
Ok(()) => resp0
.header(hyper::header::CONTENT_TYPE, JSON_TYPE)
.body(body_from_buf(JSON_NULL))
.expect("response"),
Err(e) => {
error!("internal server error: {:?}", e);
resp0
.header(hyper::header::CONTENT_TYPE, JSON_TYPE)
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(body_from_buf(JSON_NULL))
.expect("response")
}
};
Ok(r0)
}
Err(e) => Ok(on_json_parse_err(e)),
}
};
Box::pin(result)
}
/// Logs a JSON parse failure and turns it into a 400 response whose JSON body
/// carries the error message.
fn on_json_parse_err(e: serde_json::Error) -> http::Response<MyBody> {
    let message = format!("Failed parsing JSON: {}", e);
    warn!("{}", message);

    let report = ErrorsBackToBrowser {
        errors: vec![message],
    };
    let payload = serde_json::to_vec(&report).unwrap();

    http::Response::builder()
        .status(StatusCode::BAD_REQUEST)
        .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
        .body(body_from_buf(&payload))
        .expect("response")
}
/// JSON body of error responses sent to the client.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct ErrorsBackToBrowser {
/// Human-readable error messages.
errors: Vec<String>,
}
/// Error produced by `ErrorCallbackHandler`; carries no information.
#[derive(Debug)]
struct ErrorCallbackError {}

impl std::fmt::Display for ErrorCallbackError {
    /// Writes the fixed text `ErrorCallbackError`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ErrorCallbackError")
    }
}

impl StdError for ErrorCallbackError {}
/// Result of successfully authenticating a request.
#[derive(Debug)]
enum ValidLogin {
/// The request carried a cookie with a valid JWT holding this session key.
ExistingSession(SessionKey),
/// The request is authorized (matching `token` query parameter, or no
/// token required) but has no session yet; a session cookie must be issued.
NeedsSessionKey,
}
/// Determines whether a request is authorized and which session it belongs
/// to.
///
/// The checks are made in this order:
///
/// 1. A `token` query parameter matching `valid_token` authorizes the request
///    as a new session (`ValidLogin::NeedsSessionKey`).
/// 2. A cookie named `cookie_name` holding a JWT that validates against
///    `decoding_key` yields `ValidLogin::ExistingSession`.
/// 3. If `valid_token` is `AccessToken::NoToken`, the request is authorized as
///    a new session.
///
/// A `Cookie` request header may carry several `name=value` pairs separated
/// by `;`. Each pair is parsed on its own, so the session cookie is found
/// regardless of its position in the header.
///
/// # Errors
///
/// Returns the messages collected along the way when none of the checks
/// succeeds.
fn get_session_key<'a>(
    map: &hyper::HeaderMap<hyper::header::HeaderValue>,
    query_pairs: url::form_urlencoded::Parse,
    cookie_name: &str,
    decoding_key: &jsonwebtoken::DecodingKey,
    valid_token: &AccessToken,
) -> Result<ValidLogin, ErrorsBackToBrowser> {
    let mut errors = Vec::new();

    // First, check for a token in the URI.
    for (key, value) in query_pairs {
        debug!("got query pair {}, {}", key, value);
        if key == "token" {
            if valid_token.does_match(&value) {
                return Ok(ValidLogin::NeedsSessionKey);
            }
            warn!("incorrect token in URI: {}", value);
            errors.push("incorrect token in URI".to_string());
        }
    }

    // Second, check for a session cookie. The validation settings do not
    // depend on the cookie, so they are built once.
    let validation = jsonwebtoken::Validation::new(Default::default());
    for header in map.get_all(hyper::header::COOKIE).iter() {
        let header_str = match header.to_str() {
            Ok(s) => s,
            Err(e) => {
                let estr = format!("cookie not converted to str: {:?}", e);
                warn!("{}", estr);
                errors.push(estr);
                continue;
            }
        };

        let pairs = header_str
            .split(';')
            .map(str::trim)
            .filter(|pair| !pair.is_empty());
        for pair in pairs {
            let c = match cookie::Cookie::parse(pair) {
                Ok(c) => c,
                Err(e) => {
                    let estr = format!("cookie not parsed: {:?}", e);
                    warn!("{}", estr);
                    errors.push(estr);
                    continue;
                }
            };
            if c.name() != cookie_name {
                continue;
            }

            let encoded = c.value();
            debug!("jwt_encoded = {}", encoded);
            match jsonwebtoken::decode::<JwtClaims>(encoded, decoding_key, &validation) {
                Ok(token) => return Ok(ValidLogin::ExistingSession(token.claims.key)),
                Err(e) => {
                    warn!(
                        "client passed token in cookie {:?}, resulting in error: {:?}",
                        c, e
                    );
                    errors.push(format!("{}: {:?}", e, e));
                }
            }
        }
    }

    debug!("no (valid) session key found");
    match valid_token {
        AccessToken::NoToken => {
            debug!("no token needed, will give new session key");
            Ok(ValidLogin::NeedsSessionKey)
        }
        _ => {
            errors.push("no valid session key".to_string());
            Err(ErrorsBackToBrowser { errors })
        }
    }
}
impl<CB> hyper::service::Service<hyper::Request<hyper::body::Incoming>> for BuiService<CB>
where
CB: 'static + serde::de::DeserializeOwned + Clone + Send,
{
type Response = hyper::Response<MyBody>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn call(&self, req: http::Request<hyper::body::Incoming>) -> Self::Future {
let decoding_key = jsonwebtoken::DecodingKey::from_secret(&self.jwt_secret);
let res_session_key = {
let query = req.uri().query();
debug!("parsing query {:?}", query);
let pairs = url::form_urlencoded::parse(query.unwrap_or("").as_bytes());
get_session_key(
req.headers(),
pairs,
&self.config.cookie_name,
&decoding_key,
&self.valid_token,
)
};
debug!(
"got request from session key {:?}: {:?}",
res_session_key, req
);
if req.method() == Method::POST && req.uri().path() == "/callback" {
let login_info = match res_session_key {
Ok(login_info) => login_info,
Err(errors) => {
warn!("no (valid) session key in callback");
let body_buf = serde_json::to_vec(&errors).unwrap();
let resp = http::Response::builder()
.header(hyper::header::CONTENT_TYPE, JSON_TYPE)
.status(StatusCode::BAD_REQUEST)
.body(body_from_buf(&body_buf))
.expect("response");
return Box::pin(std::future::ready(Ok(resp)));
}
};
let mut resp0 = http::Response::builder();
let session_key = match login_info {
ValidLogin::NeedsSessionKey => {
let (resp2, session_key) = self.do_set_cookie_x(resp0);
resp0 = resp2;
session_key
}
ValidLogin::ExistingSession(k) => k,
};
return Box::pin(handle_callback(
self.callback_handler.clone(),
session_key,
resp0,
req,
));
}
let resp = http::Response::builder();
let login_info = match res_session_key {
Ok(login_info) => login_info,
Err(_errors) => {
let estr = "No (valid) token in request.".to_string();
let errors = ErrorsBackToBrowser { errors: vec![estr] };
let body_buf = serde_json::to_vec(&errors).unwrap();
let resp = http::Response::builder()
.header(hyper::header::CONTENT_TYPE, JSON_TYPE)
.status(StatusCode::BAD_REQUEST)
.body(body_from_buf(&body_buf))
.expect("response");
return Box::pin(std::future::ready(Ok(resp)));
}
};
use futures::future::FutureExt;
let resp_final = handle_req(
self.clone(),
req,
resp,
login_info,
self.raw_req_handler.clone(),
)
.map(|r| match r {
Ok(x) => Ok(x),
Err(_e) => unimplemented!(),
});
Box::pin(resp_final)
}
}
/// Creates a [`BuiService`] together with the receiver on which the service
/// announces new event stream connections.
///
/// `channel_size` is the capacity of that announcement channel. The JWT
/// secret and the access token are taken from `auth`. Connection keys start
/// at 0.
pub fn launcher<CB>(
    config: Config,
    auth: &access_control::AccessControl,
    channel_size: usize,
    events_prefix: &str,
    raw_req_handler: Option<RawReqHandler>,
    callback_handler: Box<dyn Send + CallbackHandler<Data = CB>>,
) -> (mpsc::Receiver<NewEventStreamConnection>, BuiService<CB>) {
    let first_key = Arc::new(Mutex::new(ConnectionKey(0)));
    let (connection_tx, connection_rx) = mpsc::channel(channel_size);

    let jwt_secret = auth.jwt_secret().to_vec();
    let encoding_key = jsonwebtoken::EncodingKey::from_secret(auth.jwt_secret());
    let valid_token = auth.token();

    (
        connection_rx,
        BuiService {
            config,
            callback_handler,
            next_connection_key: first_key,
            jwt_secret,
            encoding_key,
            valid_token,
            tx_new_connection: connection_tx,
            events_prefix: events_prefix.to_string(),
            raw_req_handler,
        },
    )
}