1use std::{error::Error as StdError, future::Future, pin::Pin};
4
5use http;
6use hyper;
7#[cfg(feature = "bundle_files")]
8use includedir;
9
10use hyper::{
11 header::ACCEPT,
12 {Method, StatusCode},
13};
14
15type MyBody = http_body_util::combinators::BoxBody<bytes::Bytes, hyper::Error>;
16
17use tokio::sync::mpsc;
18use tokio_stream::StreamExt;
19
20use parking_lot::Mutex;
21use std::sync::Arc;
22
23use crate::access_control;
24use bui_backend_types::{AccessToken, CallbackDataAndSession, ConnectionKey, SessionKey};
25
26#[cfg(feature = "serve_files")]
27use std::io::Read;
28
29use serde::{Deserialize, Serialize};
30
31const JSON_TYPE: &str = "application/json";
33const JSON_NULL: &[u8] = b"{}";
34
35#[derive(Serialize, Deserialize, Debug, Clone)]
37struct JwtClaims {
38 key: SessionKey,
39 exp: usize, }
41
42#[derive(Clone)]
47pub struct Config {
48 pub serve_filepath: &'static std::path::Path,
50 #[cfg(feature = "bundle_files")]
52 #[cfg_attr(docsrs, doc(cfg(feature = "bundle_files")))]
53 pub bundled_files: &'static includedir::Files,
54 pub channel_size: usize,
56 pub cookie_name: String,
58}
59
60pub type EventChunkSender = mpsc::Sender<hyper::body::Bytes>;
62
63#[derive(Debug)]
65pub struct NewEventStreamConnection {
66 pub chunk_sender: EventChunkSender,
68 pub session_key: SessionKey,
70 pub connection_key: ConnectionKey,
72 pub path: String,
74}
75
76type NewConnectionSender = mpsc::Sender<NewEventStreamConnection>;
77
78pub type RawReqHandler = Arc<
83 Box<
84 dyn (Fn(
85 http::response::Builder,
86 http::Request<hyper::body::Incoming>,
87 ) -> Result<http::Response<MyBody>, http::Error>)
88 + Send
89 + Sync,
90 >,
91>;
92
93pub trait CallbackHandler: Send + dyn_clone::DynClone {
95 type Data;
97
98 fn call<'a>(
102 &'a self,
103 data_sess: CallbackDataAndSession<Self::Data>,
104 ) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send>>> + Send + 'a>>;
105}
106
107dyn_clone::clone_trait_object!(<CB> CallbackHandler<Data = CB>);
108
109#[derive(Clone)]
113pub struct NoopCallbackHandler {}
114
115impl CallbackHandler for NoopCallbackHandler {
116 type Data = ();
117
118 fn call<'a>(
119 &'a self,
120 _: CallbackDataAndSession<Self::Data>,
121 ) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send>>> + Send + 'a>> {
122 Box::pin(async { Ok(()) })
123 }
124}
125
126#[derive(Clone)]
131pub struct ErrorCallbackHandler {}
132
133impl CallbackHandler for ErrorCallbackHandler {
134 type Data = ();
135
136 fn call<'a>(
137 &'a self,
138 _: CallbackDataAndSession<Self::Data>,
139 ) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send>>> + Send + 'a>> {
140 let e = Box::new(ErrorCallbackError {});
141 let e = e as Box<dyn StdError + Send>;
142 Box::pin(async { Err(e) })
143 }
144}
145
146#[derive(Clone)]
150pub struct BuiService<CB> {
151 config: Config,
152 callback_handler: Box<dyn Send + CallbackHandler<Data = CB>>,
153 next_connection_key: Arc<Mutex<ConnectionKey>>,
154 jwt_secret: Vec<u8>,
155 encoding_key: jsonwebtoken::EncodingKey,
156 valid_token: AccessToken,
157 tx_new_connection: NewConnectionSender,
158 events_prefix: String,
159 raw_req_handler: Option<RawReqHandler>,
160}
161
162impl<CB> BuiService<CB> {
163 fn fullpath(&self, path: &str) -> String {
164 assert!(path.starts_with('/')); let path = std::path::PathBuf::from(path)
166 .strip_prefix("/")
167 .unwrap()
168 .to_path_buf();
169 assert!(!path.starts_with("..")); let base = std::path::PathBuf::from(self.config.serve_filepath);
172 let result = base.join(path);
173 result.into_os_string().into_string().unwrap()
174 }
175
176 #[cfg(feature = "bundle_files")]
177 fn get_file_content(&self, file_path: &str) -> Option<Vec<u8>> {
178 let fullpath = self.fullpath(file_path);
179 let r = self.config.bundled_files.get(&fullpath);
180 match r {
181 Ok(s) => Some(s.into_owned()),
182 Err(_) => None,
183 }
184 }
185
186 #[cfg(feature = "serve_files")]
187 fn get_file_content(&self, file_path: &str) -> Option<Vec<u8>> {
188 let fullpath = self.fullpath(file_path);
189 let mut file = match std::fs::File::open(&fullpath) {
190 Ok(f) => f,
191 Err(e) => {
192 warn!("requested path {:?}, but got error {:?}", file_path, e);
193 return None;
194 }
195 };
196 let mut contents = Vec::new();
197 match file.read_to_end(&mut contents) {
198 Ok(_) => {}
199 Err(e) => {
200 warn!("when reading path {:?}, got error {:?}", file_path, e);
201 return None;
202 }
203 }
204 Some(contents)
205 }
206
207 pub fn events_prefix(&self) -> &str {
209 &self.events_prefix
210 }
211
212 fn get_next_connection_key(&self) -> ConnectionKey {
213 let mut nk = self.next_connection_key.lock();
214 let result = *nk;
215 nk.0 += 1;
216 result
217 }
218
219 fn do_set_cookie_x(
220 &self,
221 resp: http::response::Builder,
222 ) -> (http::response::Builder, SessionKey) {
223 let session_key = SessionKey::new();
226 let claims = JwtClaims {
227 key: session_key,
228 exp: 10000000000,
229 };
230
231 let token = {
232 jsonwebtoken::encode(
233 &jsonwebtoken::Header::default(),
234 &claims,
235 &self.encoding_key,
236 )
237 .unwrap()
238 };
239 let mut c = cookie::Cookie::new(self.config.cookie_name.clone(), token);
240 c.set_same_site(cookie::SameSite::Strict);
241 c.set_http_only(true);
242 let resp = resp.header(
243 hyper::header::SET_COOKIE,
244 hyper::header::HeaderValue::from_str(&c.to_string()).unwrap(),
245 );
246 (resp, session_key)
247 }
248}
249
250fn body_from_buf(body_buf: &[u8]) -> MyBody {
251 let body = http_body_util::Full::new(bytes::Bytes::from(body_buf.to_vec()));
252 use http_body_util::BodyExt;
253 MyBody::new(body.map_err(|_: std::convert::Infallible| unreachable!()))
254}
255
256async fn handle_req<CB>(
257 self_: BuiService<CB>,
258 req: http::Request<hyper::body::Incoming>,
259 mut resp: http::response::Builder,
260 login_info: ValidLogin,
261 raw_req_handler: Option<RawReqHandler>,
262) -> Result<http::Response<MyBody>, http::Error> {
263 let session_key = match login_info {
265 ValidLogin::NeedsSessionKey => {
266 let (resp2, session_key) = self_.do_set_cookie_x(resp);
267 resp = resp2;
268 session_key
269 }
270 ValidLogin::ExistingSession(k) => k,
271 };
272
273 let resp_final = match (req.method(), req.uri().path()) {
274 (&Method::GET, path) => {
275 let path = if path == "/" { "/index.html" } else { path };
276
277 if path.starts_with(&self_.events_prefix) {
278 let mut accepts_event_stream = false;
283 for value in req.headers().get_all(ACCEPT).iter() {
284 if value
285 .to_str()
286 .expect("to_str()")
287 .contains("text/event-stream")
288 {
289 accepts_event_stream = true;
290 }
291 }
292
293 if accepts_event_stream {
294 let connection_key = self_.get_next_connection_key();
295 let (tx_event_stream, rx_event_stream) =
296 mpsc::channel(self_.config.channel_size);
297
298 let rx_event_stream =
299 tokio_stream::wrappers::ReceiverStream::new(rx_event_stream);
300
301 {
302 let conn_info = NewEventStreamConnection {
303 chunk_sender: tx_event_stream,
304 session_key,
305 connection_key,
306 path: path.to_string(),
307 };
308
309 let send_future = self_.tx_new_connection.send(conn_info);
310 match send_future.await {
311 Ok(()) => {}
312 Err(e) => {
313 error!("failed to send new connection info: {:?}", e);
314 }
316 };
317 }
318
319 resp = resp.header(
320 hyper::header::CONTENT_TYPE,
321 hyper::header::HeaderValue::from_str("text/event-stream")
322 .expect("from_str"),
323 );
324
325 let rx_event_stream2 = rx_event_stream.map(|data: bytes::Bytes| {
326 Ok::<_, hyper::Error>(hyper::body::Frame::data(data))
327 });
328
329 resp.body(MyBody::new(http_body_util::StreamBody::new(
330 rx_event_stream2,
331 )))?
332 } else {
333 let estr = "Event request does not specify \
334 'Accept' or does not accept the required \
335 'text/event-stream'"
336 .to_string();
337 warn!("{}", estr);
338 let e = ErrorsBackToBrowser { errors: vec![estr] };
339 let body_buf = serde_json::to_vec(&e).unwrap();
340 resp = resp.status(StatusCode::BAD_REQUEST);
341 resp.body(body_from_buf(&body_buf))?
342 }
343 } else {
344 match self_.get_file_content(path) {
346 Some(buf) => {
347 let path = std::path::Path::new(path);
348 let mime_type = match path.extension().map(|x| x.to_str()).unwrap_or(None) {
349 Some(ext) => conduit_mime_types::get_mime_type(ext),
350 None => None,
351 };
352
353 if let Some(mime_type) = mime_type {
354 resp = resp.header(
355 hyper::header::CONTENT_TYPE,
356 hyper::header::HeaderValue::from_str(mime_type).expect("from_str"),
357 );
358 }
359 resp.body(body_from_buf(&buf))?
360 }
361 None => {
362 if let Some(raw_req_handler) = raw_req_handler {
363 raw_req_handler(resp, req)?
364 } else {
365 resp = resp.status(StatusCode::NOT_FOUND);
366 resp.body(body_from_buf(&[]))?
367 }
368 }
369 }
370 }
371 }
372 _ => {
373 if let Some(raw_req_handler) = raw_req_handler {
374 raw_req_handler(resp, req)?
375 } else {
376 resp = resp.status(StatusCode::NOT_FOUND);
377 resp.body(body_from_buf(&[]))?
378 }
379 }
380 };
381 Ok(resp_final)
382}
383
384fn handle_callback<CB>(
385 handler: Box<dyn CallbackHandler<Data = CB> + Send>,
386 session_key: bui_backend_types::SessionKey,
387 resp0: http::response::Builder,
388 req: http::Request<hyper::body::Incoming>,
389) -> Pin<Box<dyn Future<Output = Result<http::Response<MyBody>, hyper::Error>> + Send>>
390where
391 CB: 'static + serde::de::DeserializeOwned + Send,
392{
393 let result = async move {
394 let body = req.into_body();
395 let chunks: Result<http_body_util::Collected<bytes::Bytes>, hyper::Error> = {
396 use http_body_util::BodyExt;
397 body.collect().await
398 };
399 let data = chunks?.to_bytes();
400
401 match serde_json::from_slice::<CB>(&data) {
409 Ok(payload) => {
410 let args2 = CallbackDataAndSession {
411 payload,
412 session_key,
413 };
414
415 let x = {
416 let fut = handler.call(args2);
417 fut.await
418 };
419
420 let r0 = match x {
422 Ok(()) => resp0
423 .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
424 .body(body_from_buf(JSON_NULL))
425 .expect("response"),
426 Err(e) => {
427 error!("internal server error: {:?}", e);
428 resp0
429 .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
430 .status(StatusCode::INTERNAL_SERVER_ERROR)
431 .body(body_from_buf(JSON_NULL))
432 .expect("response")
433 }
434 };
435 Ok(r0)
436 }
437 Err(e) => Ok(on_json_parse_err(e)),
438 }
439 };
440 Box::pin(result)
441}
442
443fn on_json_parse_err(e: serde_json::Error) -> http::Response<MyBody> {
444 let estr = format!("Failed parsing JSON: {}", e);
445 warn!("{}", estr);
446 let e = ErrorsBackToBrowser { errors: vec![estr] };
447 let body_buf = serde_json::to_vec(&e).unwrap();
448 http::Response::builder()
449 .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
450 .status(StatusCode::BAD_REQUEST)
451 .body(body_from_buf(&body_buf))
452 .expect("response")
453}
454
455#[derive(Serialize, Deserialize, Debug, Clone)]
456struct ErrorsBackToBrowser {
457 errors: Vec<String>,
458}
459
460#[derive(Debug)]
461struct ErrorCallbackError {}
462impl StdError for ErrorCallbackError {}
463
464impl std::fmt::Display for ErrorCallbackError {
465 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
466 write!(f, "ErrorCallbackError")
467 }
468}
469
470#[derive(Debug)]
473enum ValidLogin {
474 ExistingSession(SessionKey),
475 NeedsSessionKey,
476}
477
478fn get_session_key<'a>(
479 map: &hyper::HeaderMap<hyper::header::HeaderValue>,
480 query_pairs: url::form_urlencoded::Parse,
481 cookie_name: &str,
482 decoding_key: &jsonwebtoken::DecodingKey,
483 valid_token: &AccessToken,
484) -> Result<ValidLogin, ErrorsBackToBrowser> {
485 use std::borrow::Cow;
486
487 let mut errors = Vec::new();
488
489 for (key, value) in query_pairs {
491 debug!("got query pair {}, {}", key, value);
492 if key == Cow::Borrowed("token") {
493 if valid_token.does_match(&value) {
494 return Ok(ValidLogin::NeedsSessionKey);
495 } else {
496 warn!("incorrect token in URI: {}", value);
497 errors.push("incorrect token in URI".to_string());
498 }
499 }
500 }
501
502 for cookie in map.get_all(hyper::header::COOKIE).iter() {
505 match cookie.to_str() {
506 Ok(cookie_str) => {
507 let res_c = cookie::Cookie::parse(cookie_str.to_string());
508 match res_c {
509 Ok(c) => {
510 if c.name() == cookie_name {
511 let encoded = c.value();
512 debug!("jwt_encoded = {}", encoded);
513 let validation = jsonwebtoken::Validation::new(Default::default());
514 match jsonwebtoken::decode::<JwtClaims>(
515 encoded,
516 decoding_key,
517 &validation,
518 )
519 .map(|token| token.claims.key)
520 {
521 Ok(k) => return Ok(ValidLogin::ExistingSession(k)),
522 Err(e) => {
523 warn!("client passed token in cookie {:?}, resulting in error: {:?}", c, e);
524 let estr = format!("{}: {:?}", e, e);
525 errors.push(estr);
526 }
527 }
528 }
529 }
530 Err(e) => {
531 let estr = format!("cookie not parsed: {:?}", e);
532 warn!("{}", estr);
533 errors.push(estr);
534 }
535 }
536 }
537 Err(e) => {
538 let estr = format!("cookie not converted to str: {:?}", e);
539 warn!("{}", estr);
540 errors.push(estr);
541 }
542 }
543 }
544
545 debug!("no (valid) session key found");
547 match valid_token {
548 AccessToken::NoToken => {
549 debug!("no token needed, will give new session key");
550 Ok(ValidLogin::NeedsSessionKey)
551 }
552 _ => {
553 errors.push("no valid session key".to_string());
554 Err(ErrorsBackToBrowser { errors })
555 }
556 }
557}
558
559impl<CB> hyper::service::Service<hyper::Request<hyper::body::Incoming>> for BuiService<CB>
560where
561 CB: 'static + serde::de::DeserializeOwned + Clone + Send,
562{
563 type Response = hyper::Response<MyBody>;
564 type Error = hyper::Error;
565 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
566
567 fn call(&self, req: http::Request<hyper::body::Incoming>) -> Self::Future {
568 let decoding_key = jsonwebtoken::DecodingKey::from_secret(&self.jwt_secret);
569 let res_session_key = {
571 let query = req.uri().query();
572 debug!("parsing query {:?}", query);
573
574 let pairs = url::form_urlencoded::parse(query.unwrap_or("").as_bytes());
575
576 get_session_key(
577 req.headers(),
578 pairs,
579 &self.config.cookie_name,
580 &decoding_key,
581 &self.valid_token,
582 )
583 };
584
585 debug!(
586 "got request from session key {:?}: {:?}",
587 res_session_key, req
588 );
589
590 if req.method() == Method::POST && req.uri().path() == "/callback" {
591 let login_info = match res_session_key {
592 Ok(login_info) => login_info,
593 Err(errors) => {
594 warn!("no (valid) session key in callback");
595 let body_buf = serde_json::to_vec(&errors).unwrap();
596 let resp = http::Response::builder()
597 .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
598 .status(StatusCode::BAD_REQUEST)
599 .body(body_from_buf(&body_buf))
600 .expect("response");
601 return Box::pin(std::future::ready(Ok(resp)));
602 }
603 };
604
605 let mut resp0 = http::Response::builder();
606 let session_key = match login_info {
607 ValidLogin::NeedsSessionKey => {
608 let (resp2, session_key) = self.do_set_cookie_x(resp0);
609 resp0 = resp2;
610 session_key
611 }
612 ValidLogin::ExistingSession(k) => k,
613 };
614
615 return Box::pin(handle_callback(
616 self.callback_handler.clone(),
617 session_key,
618 resp0,
619 req,
620 ));
621 }
622
623 let resp = http::Response::builder();
624
625 let login_info = match res_session_key {
626 Ok(login_info) => login_info,
627 Err(_errors) => {
628 let estr = "No (valid) token in request.".to_string();
629 let errors = ErrorsBackToBrowser { errors: vec![estr] };
630
631 let body_buf = serde_json::to_vec(&errors).unwrap();
632 let resp = http::Response::builder()
633 .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
634 .status(StatusCode::BAD_REQUEST)
635 .body(body_from_buf(&body_buf))
636 .expect("response");
637 return Box::pin(std::future::ready(Ok(resp)));
638 }
639 };
640
641 use futures::future::FutureExt;
642 let resp_final = handle_req(
643 self.clone(),
644 req,
645 resp,
646 login_info,
647 self.raw_req_handler.clone(),
648 )
649 .map(|r| match r {
650 Ok(x) => Ok(x),
651 Err(_e) => unimplemented!(),
652 });
653
654 Box::pin(resp_final)
655 }
656}
657
658pub fn launcher<CB>(
660 config: Config,
661 auth: &access_control::AccessControl,
662 channel_size: usize,
663 events_prefix: &str,
664 raw_req_handler: Option<RawReqHandler>,
665 callback_handler: Box<dyn Send + CallbackHandler<Data = CB>>,
666) -> (mpsc::Receiver<NewEventStreamConnection>, BuiService<CB>) {
667 let next_connection_key = Arc::new(Mutex::new(ConnectionKey(0)));
668
669 let (tx_new_connection, rx_new_connection) = mpsc::channel(channel_size);
670
671 let service = BuiService {
672 config,
673 callback_handler,
674 next_connection_key,
675 jwt_secret: auth.jwt_secret().to_vec(),
676 encoding_key: jsonwebtoken::EncodingKey::from_secret(auth.jwt_secret()),
677 valid_token: auth.token(),
678 tx_new_connection,
679 events_prefix: events_prefix.to_string(),
680 raw_req_handler,
681 };
682
683 (rx_new_connection, service)
684}