Skip to main content

bui_backend/
lowlevel.rs

1//! Implements a web server for a Browser User Interface (BUI).
2
3use std::{error::Error as StdError, future::Future, pin::Pin};
4
5use http;
6use hyper;
7#[cfg(feature = "bundle_files")]
8use includedir;
9
10use hyper::{
11    header::ACCEPT,
12    {Method, StatusCode},
13};
14
15type MyBody = http_body_util::combinators::BoxBody<bytes::Bytes, hyper::Error>;
16
17use tokio::sync::mpsc;
18use tokio_stream::StreamExt;
19
20use parking_lot::Mutex;
21use std::sync::Arc;
22
23use crate::access_control;
24use bui_backend_types::{AccessToken, CallbackDataAndSession, ConnectionKey, SessionKey};
25
26#[cfg(feature = "serve_files")]
27use std::io::Read;
28
29use serde::{Deserialize, Serialize};
30
31// ---------------------------
/// MIME type sent in the `Content-Type` header of JSON responses.
const JSON_TYPE: &str = "application/json";
/// Body sent in reply to callback requests. Despite the name, this is an
/// empty JSON object (`{}`), not the JSON `null` literal.
const JSON_NULL: &[u8] = b"{}";
34
/// The claims validated using JSON Web Tokens.
///
/// Encoded into the session cookie when a new session is created and decoded
/// again from the cookie of later requests.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct JwtClaims {
    /// The session this token identifies.
    key: SessionKey,
    /// Expiration time (as UTC timestamp). Required because `validate_exp`
    /// defaults to true in validation.
    exp: usize,
}
41
/// Configuration settings for `BuiService`.
///
/// Defaults can be loaded using the function `get_default_config()`
/// generated by the `bui_backend_codegen` crate.
#[derive(Clone)]
pub struct Config {
    /// Location of the files to be served. Request paths are resolved
    /// relative to this directory.
    pub serve_filepath: &'static std::path::Path,
    /// The bundled files.
    #[cfg(feature = "bundle_files")]
    #[cfg_attr(docsrs, doc(cfg(feature = "bundle_files")))]
    pub bundled_files: &'static includedir::Files,
    /// The capacity of each event stream channel, i.e. the number of
    /// messages that can be queued for a connection before sending blocks.
    pub channel_size: usize,
    /// The name of the cookie stored in the client's browser.
    pub cookie_name: String,
}
59
/// Sending half of a channel of `hyper::body::Bytes` chunks, used to send
/// data to a connected client.
pub type EventChunkSender = mpsc::Sender<hyper::body::Bytes>;
62
/// Describes a newly connected event stream listener, including the sender
/// used to push data to it.
#[derive(Debug)]
pub struct NewEventStreamConnection {
    /// A sink for messages sent to this connection (one per client tab).
    pub chunk_sender: EventChunkSender,
    /// Identifier for each connected session (one per client browser).
    pub session_key: SessionKey,
    /// Identifier for each connection (one per client tab).
    pub connection_key: ConnectionKey,
    /// The path being requested (starts with `BuiService::events_prefix`).
    pub path: String,
}
75
/// Sending half of the channel on which the service announces each newly
/// accepted event stream connection.
type NewConnectionSender = mpsc::Sender<NewEventStreamConnection>;
77
/// Handles HTTP requests not handled by bui_backend.
///
/// For any request, bui_backend will first check if it can respond
/// directly and, if not, pass the request on to this handler.
///
/// The handler receives the response builder prepared so far together with
/// the original request and returns the finished response.
pub type RawReqHandler = Arc<
    Box<
        dyn (Fn(
                http::response::Builder,
                http::Request<hyper::body::Incoming>,
            ) -> Result<http::Response<MyBody>, http::Error>)
            + Send
            + Sync,
    >,
>;
92
/// Implement this trait to handle callbacks.
pub trait CallbackHandler: Send + dyn_clone::DynClone {
    /// The type of the callback-provided data.
    type Data;

    /// Called when an HTTP request to "/callback" has been made.
    ///
    /// The request payload has been deserialized into `Self::Data` and is
    /// passed, together with the session data, in a
    /// [CallbackDataAndSession].
    ///
    /// Returning an error causes the request to be answered with an
    /// internal server error.
    fn call<'a>(
        &'a self,
        data_sess: CallbackDataAndSession<Self::Data>,
    ) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send>>> + Send + 'a>>;
}

// Allows boxed `CallbackHandler` trait objects to be cloned; `BuiService`
// clones its boxed handler for every callback request.
dyn_clone::clone_trait_object!(<CB> CallbackHandler<Data = CB>);
108
109/// A dummy callback handler that does nothing.
110///
111/// This is provided as an example.
112#[derive(Clone)]
113pub struct NoopCallbackHandler {}
114
115impl CallbackHandler for NoopCallbackHandler {
116    type Data = ();
117
118    fn call<'a>(
119        &'a self,
120        _: CallbackDataAndSession<Self::Data>,
121    ) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send>>> + Send + 'a>> {
122        Box::pin(async { Ok(()) })
123    }
124}
125
126/// A callback handler that always returns an error.
127///
128/// This is provided for cases in which no callbacks are desired and also
129/// demonstrates error handling.
130#[derive(Clone)]
131pub struct ErrorCallbackHandler {}
132
133impl CallbackHandler for ErrorCallbackHandler {
134    type Data = ();
135
136    fn call<'a>(
137        &'a self,
138        _: CallbackDataAndSession<Self::Data>,
139    ) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send>>> + Send + 'a>> {
140        let e = Box::new(ErrorCallbackError {});
141        let e = e as Box<dyn StdError + Send>;
142        Box::pin(async { Err(e) })
143    }
144}
145
/// Handle HTTP requests and coordinate responses to data updates.
///
/// Implements `hyper::service::Service` to act as HTTP server and handle requests.
#[derive(Clone)]
pub struct BuiService<CB> {
    /// File serving, channel size and cookie settings.
    config: Config,
    /// Invoked with the deserialized payload of each `POST /callback`.
    callback_handler: Box<dyn Send + CallbackHandler<Data = CB>>,
    /// Key handed to the next event stream connection. Shared between all
    /// clones of the service.
    next_connection_key: Arc<Mutex<ConnectionKey>>,
    /// Secret from which the JWT decoding key is built for each request.
    jwt_secret: Vec<u8>,
    /// Key used to sign the JWT stored in the session cookie.
    encoding_key: jsonwebtoken::EncodingKey,
    /// Access token against which the `token` query parameter is checked.
    valid_token: AccessToken,
    /// Announces each newly accepted event stream connection.
    tx_new_connection: NewConnectionSender,
    /// Path prefix identifying requests for an event stream.
    events_prefix: String,
    /// Optional fallback for requests the service does not answer itself.
    raw_req_handler: Option<RawReqHandler>,
}
161
162impl<CB> BuiService<CB> {
163    fn fullpath(&self, path: &str) -> String {
164        assert!(path.starts_with('/')); // security check
165        let path = std::path::PathBuf::from(path)
166            .strip_prefix("/")
167            .unwrap()
168            .to_path_buf();
169        assert!(!path.starts_with("..")); // security check
170
171        let base = std::path::PathBuf::from(self.config.serve_filepath);
172        let result = base.join(path);
173        result.into_os_string().into_string().unwrap()
174    }
175
176    #[cfg(feature = "bundle_files")]
177    fn get_file_content(&self, file_path: &str) -> Option<Vec<u8>> {
178        let fullpath = self.fullpath(file_path);
179        let r = self.config.bundled_files.get(&fullpath);
180        match r {
181            Ok(s) => Some(s.into_owned()),
182            Err(_) => None,
183        }
184    }
185
186    #[cfg(feature = "serve_files")]
187    fn get_file_content(&self, file_path: &str) -> Option<Vec<u8>> {
188        let fullpath = self.fullpath(file_path);
189        let mut file = match std::fs::File::open(&fullpath) {
190            Ok(f) => f,
191            Err(e) => {
192                warn!("requested path {:?}, but got error {:?}", file_path, e);
193                return None;
194            }
195        };
196        let mut contents = Vec::new();
197        match file.read_to_end(&mut contents) {
198            Ok(_) => {}
199            Err(e) => {
200                warn!("when reading path {:?}, got error {:?}", file_path, e);
201                return None;
202            }
203        }
204        Some(contents)
205    }
206
207    /// Get the event stream path prefix.
208    pub fn events_prefix(&self) -> &str {
209        &self.events_prefix
210    }
211
212    fn get_next_connection_key(&self) -> ConnectionKey {
213        let mut nk = self.next_connection_key.lock();
214        let result = *nk;
215        nk.0 += 1;
216        result
217    }
218
219    fn do_set_cookie_x(
220        &self,
221        resp: http::response::Builder,
222    ) -> (http::response::Builder, SessionKey) {
223        // There was no valid client key in the HTTP header, so generate a
224        // new one and set it on client.
225        let session_key = SessionKey::new();
226        let claims = JwtClaims {
227            key: session_key,
228            exp: 10000000000,
229        };
230
231        let token = {
232            jsonwebtoken::encode(
233                &jsonwebtoken::Header::default(),
234                &claims,
235                &self.encoding_key,
236            )
237            .unwrap()
238        };
239        let mut c = cookie::Cookie::new(self.config.cookie_name.clone(), token);
240        c.set_same_site(cookie::SameSite::Strict);
241        c.set_http_only(true);
242        let resp = resp.header(
243            hyper::header::SET_COOKIE,
244            hyper::header::HeaderValue::from_str(&c.to_string()).unwrap(),
245        );
246        (resp, session_key)
247    }
248}
249
250fn body_from_buf(body_buf: &[u8]) -> MyBody {
251    let body = http_body_util::Full::new(bytes::Bytes::from(body_buf.to_vec()));
252    use http_body_util::BodyExt;
253    MyBody::new(body.map_err(|_: std::convert::Infallible| unreachable!()))
254}
255
256async fn handle_req<CB>(
257    self_: BuiService<CB>,
258    req: http::Request<hyper::body::Incoming>,
259    mut resp: http::response::Builder,
260    login_info: ValidLogin,
261    raw_req_handler: Option<RawReqHandler>,
262) -> Result<http::Response<MyBody>, http::Error> {
263    // TODO: convert this to be async yield when blocking on IO operations.
264    let session_key = match login_info {
265        ValidLogin::NeedsSessionKey => {
266            let (resp2, session_key) = self_.do_set_cookie_x(resp);
267            resp = resp2;
268            session_key
269        }
270        ValidLogin::ExistingSession(k) => k,
271    };
272
273    let resp_final = match (req.method(), req.uri().path()) {
274        (&Method::GET, path) => {
275            let path = if path == "/" { "/index.html" } else { path };
276
277            if path.starts_with(&self_.events_prefix) {
278                // Quality value parsing disabled with the following hack
279                // until this is addressed:
280                // https://github.com/hyperium/http/issues/213
281
282                let mut accepts_event_stream = false;
283                for value in req.headers().get_all(ACCEPT).iter() {
284                    if value
285                        .to_str()
286                        .expect("to_str()")
287                        .contains("text/event-stream")
288                    {
289                        accepts_event_stream = true;
290                    }
291                }
292
293                if accepts_event_stream {
294                    let connection_key = self_.get_next_connection_key();
295                    let (tx_event_stream, rx_event_stream) =
296                        mpsc::channel(self_.config.channel_size);
297
298                    let rx_event_stream =
299                        tokio_stream::wrappers::ReceiverStream::new(rx_event_stream);
300
301                    {
302                        let conn_info = NewEventStreamConnection {
303                            chunk_sender: tx_event_stream,
304                            session_key,
305                            connection_key,
306                            path: path.to_string(),
307                        };
308
309                        let send_future = self_.tx_new_connection.send(conn_info);
310                        match send_future.await {
311                            Ok(()) => {}
312                            Err(e) => {
313                                error!("failed to send new connection info: {:?}", e);
314                                // should we panic here?
315                            }
316                        };
317                    }
318
319                    resp = resp.header(
320                        hyper::header::CONTENT_TYPE,
321                        hyper::header::HeaderValue::from_str("text/event-stream")
322                            .expect("from_str"),
323                    );
324
325                    let rx_event_stream2 = rx_event_stream.map(|data: bytes::Bytes| {
326                        Ok::<_, hyper::Error>(hyper::body::Frame::data(data))
327                    });
328
329                    resp.body(MyBody::new(http_body_util::StreamBody::new(
330                        rx_event_stream2,
331                    )))?
332                } else {
333                    let estr = "Event request does not specify \
334                        'Accept' or does not accept the required \
335                        'text/event-stream'"
336                        .to_string();
337                    warn!("{}", estr);
338                    let e = ErrorsBackToBrowser { errors: vec![estr] };
339                    let body_buf = serde_json::to_vec(&e).unwrap();
340                    resp = resp.status(StatusCode::BAD_REQUEST);
341                    resp.body(body_from_buf(&body_buf))?
342                }
343            } else {
344                // TODO read file asynchronously
345                match self_.get_file_content(path) {
346                    Some(buf) => {
347                        let path = std::path::Path::new(path);
348                        let mime_type = match path.extension().map(|x| x.to_str()).unwrap_or(None) {
349                            Some(ext) => conduit_mime_types::get_mime_type(ext),
350                            None => None,
351                        };
352
353                        if let Some(mime_type) = mime_type {
354                            resp = resp.header(
355                                hyper::header::CONTENT_TYPE,
356                                hyper::header::HeaderValue::from_str(mime_type).expect("from_str"),
357                            );
358                        }
359                        resp.body(body_from_buf(&buf))?
360                    }
361                    None => {
362                        if let Some(raw_req_handler) = raw_req_handler {
363                            raw_req_handler(resp, req)?
364                        } else {
365                            resp = resp.status(StatusCode::NOT_FOUND);
366                            resp.body(body_from_buf(&[]))?
367                        }
368                    }
369                }
370            }
371        }
372        _ => {
373            if let Some(raw_req_handler) = raw_req_handler {
374                raw_req_handler(resp, req)?
375            } else {
376                resp = resp.status(StatusCode::NOT_FOUND);
377                resp.body(body_from_buf(&[]))?
378            }
379        }
380    };
381    Ok(resp_final)
382}
383
384fn handle_callback<CB>(
385    handler: Box<dyn CallbackHandler<Data = CB> + Send>,
386    session_key: bui_backend_types::SessionKey,
387    resp0: http::response::Builder,
388    req: http::Request<hyper::body::Incoming>,
389) -> Pin<Box<dyn Future<Output = Result<http::Response<MyBody>, hyper::Error>> + Send>>
390where
391    CB: 'static + serde::de::DeserializeOwned + Send,
392{
393    let result = async move {
394        let body = req.into_body();
395        let chunks: Result<http_body_util::Collected<bytes::Bytes>, hyper::Error> = {
396            use http_body_util::BodyExt;
397            body.collect().await
398        };
399        let data = chunks?.to_bytes();
400
401        // parse data
402
403        // Here we convert from a Vec<u8> JSON buf to our
404        // generic type `CB` whose definition can be shared
405        // between backend and frontend if using a Rust frontend.
406        // (If not using a rust frontend, the payload should be
407        // constructed such that this conversion succeeds.
408        match serde_json::from_slice::<CB>(&data) {
409            Ok(payload) => {
410                let args2 = CallbackDataAndSession {
411                    payload,
412                    session_key,
413                };
414
415                let x = {
416                    let fut = handler.call(args2);
417                    fut.await
418                };
419
420                // Send the payload to callback.
421                let r0 = match x {
422                    Ok(()) => resp0
423                        .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
424                        .body(body_from_buf(JSON_NULL))
425                        .expect("response"),
426                    Err(e) => {
427                        error!("internal server error: {:?}", e);
428                        resp0
429                            .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
430                            .status(StatusCode::INTERNAL_SERVER_ERROR)
431                            .body(body_from_buf(JSON_NULL))
432                            .expect("response")
433                    }
434                };
435                Ok(r0)
436            }
437            Err(e) => Ok(on_json_parse_err(e)),
438        }
439    };
440    Box::pin(result)
441}
442
443fn on_json_parse_err(e: serde_json::Error) -> http::Response<MyBody> {
444    let estr = format!("Failed parsing JSON: {}", e);
445    warn!("{}", estr);
446    let e = ErrorsBackToBrowser { errors: vec![estr] };
447    let body_buf = serde_json::to_vec(&e).unwrap();
448    http::Response::builder()
449        .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
450        .status(StatusCode::BAD_REQUEST)
451        .body(body_from_buf(&body_buf))
452        .expect("response")
453}
454
/// Error messages which are serialized to JSON and sent as the body of an
/// error response.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct ErrorsBackToBrowser {
    /// One human readable message per problem encountered.
    errors: Vec<String>,
}
459
/// The error returned by `ErrorCallbackHandler` for every callback.
#[derive(Debug)]
struct ErrorCallbackError {}

impl std::fmt::Display for ErrorCallbackError {
    /// Write the fixed text `ErrorCallbackError`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ErrorCallbackError")
    }
}

impl StdError for ErrorCallbackError {}
469
/// The result of a successful login check.
///
/// User can login either with URL query param (no session key provided) or
/// with cookie which includes the session key.
#[derive(Debug)]
enum ValidLogin {
    /// The request carried a cookie with a valid token for this session.
    ExistingSession(SessionKey),
    /// The request is authorized but has no session yet; a new session key
    /// is generated and sent to the client as a cookie.
    NeedsSessionKey,
}
477
478fn get_session_key<'a>(
479    map: &hyper::HeaderMap<hyper::header::HeaderValue>,
480    query_pairs: url::form_urlencoded::Parse,
481    cookie_name: &str,
482    decoding_key: &jsonwebtoken::DecodingKey,
483    valid_token: &AccessToken,
484) -> Result<ValidLogin, ErrorsBackToBrowser> {
485    use std::borrow::Cow;
486
487    let mut errors = Vec::new();
488
489    // first check for token in URI
490    for (key, value) in query_pairs {
491        debug!("got query pair {}, {}", key, value);
492        if key == Cow::Borrowed("token") {
493            if valid_token.does_match(&value) {
494                return Ok(ValidLogin::NeedsSessionKey);
495            } else {
496                warn!("incorrect token in URI: {}", value);
497                errors.push("incorrect token in URI".to_string());
498            }
499        }
500    }
501
502    // if no token there, check cookie.
503
504    for cookie in map.get_all(hyper::header::COOKIE).iter() {
505        match cookie.to_str() {
506            Ok(cookie_str) => {
507                let res_c = cookie::Cookie::parse(cookie_str.to_string());
508                match res_c {
509                    Ok(c) => {
510                        if c.name() == cookie_name {
511                            let encoded = c.value();
512                            debug!("jwt_encoded = {}", encoded);
513                            let validation = jsonwebtoken::Validation::new(Default::default());
514                            match jsonwebtoken::decode::<JwtClaims>(
515                                encoded,
516                                decoding_key,
517                                &validation,
518                            )
519                            .map(|token| token.claims.key)
520                            {
521                                Ok(k) => return Ok(ValidLogin::ExistingSession(k)),
522                                Err(e) => {
523                                    warn!("client passed token in cookie {:?}, resulting in error: {:?}", c, e);
524                                    let estr = format!("{}: {:?}", e, e);
525                                    errors.push(estr);
526                                }
527                            }
528                        }
529                    }
530                    Err(e) => {
531                        let estr = format!("cookie not parsed: {:?}", e);
532                        warn!("{}", estr);
533                        errors.push(estr);
534                    }
535                }
536            }
537            Err(e) => {
538                let estr = format!("cookie not converted to str: {:?}", e);
539                warn!("{}", estr);
540                errors.push(estr);
541            }
542        }
543    }
544
545    // If we are here, we got no (valid) session key.
546    debug!("no (valid) session key found");
547    match valid_token {
548        AccessToken::NoToken => {
549            debug!("no token needed, will give new session key");
550            Ok(ValidLogin::NeedsSessionKey)
551        }
552        _ => {
553            errors.push("no valid session key".to_string());
554            Err(ErrorsBackToBrowser { errors })
555        }
556    }
557}
558
559impl<CB> hyper::service::Service<hyper::Request<hyper::body::Incoming>> for BuiService<CB>
560where
561    CB: 'static + serde::de::DeserializeOwned + Clone + Send,
562{
563    type Response = hyper::Response<MyBody>;
564    type Error = hyper::Error;
565    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
566
567    fn call(&self, req: http::Request<hyper::body::Incoming>) -> Self::Future {
568        let decoding_key = jsonwebtoken::DecodingKey::from_secret(&self.jwt_secret);
569        // Parse cookies.
570        let res_session_key = {
571            let query = req.uri().query();
572            debug!("parsing query {:?}", query);
573
574            let pairs = url::form_urlencoded::parse(query.unwrap_or("").as_bytes());
575
576            get_session_key(
577                req.headers(),
578                pairs,
579                &self.config.cookie_name,
580                &decoding_key,
581                &self.valid_token,
582            )
583        };
584
585        debug!(
586            "got request from session key {:?}: {:?}",
587            res_session_key, req
588        );
589
590        if req.method() == Method::POST && req.uri().path() == "/callback" {
591            let login_info = match res_session_key {
592                Ok(login_info) => login_info,
593                Err(errors) => {
594                    warn!("no (valid) session key in callback");
595                    let body_buf = serde_json::to_vec(&errors).unwrap();
596                    let resp = http::Response::builder()
597                        .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
598                        .status(StatusCode::BAD_REQUEST)
599                        .body(body_from_buf(&body_buf))
600                        .expect("response");
601                    return Box::pin(std::future::ready(Ok(resp)));
602                }
603            };
604
605            let mut resp0 = http::Response::builder();
606            let session_key = match login_info {
607                ValidLogin::NeedsSessionKey => {
608                    let (resp2, session_key) = self.do_set_cookie_x(resp0);
609                    resp0 = resp2;
610                    session_key
611                }
612                ValidLogin::ExistingSession(k) => k,
613            };
614
615            return Box::pin(handle_callback(
616                self.callback_handler.clone(),
617                session_key,
618                resp0,
619                req,
620            ));
621        }
622
623        let resp = http::Response::builder();
624
625        let login_info = match res_session_key {
626            Ok(login_info) => login_info,
627            Err(_errors) => {
628                let estr = "No (valid) token in request.".to_string();
629                let errors = ErrorsBackToBrowser { errors: vec![estr] };
630
631                let body_buf = serde_json::to_vec(&errors).unwrap();
632                let resp = http::Response::builder()
633                    .header(hyper::header::CONTENT_TYPE, JSON_TYPE)
634                    .status(StatusCode::BAD_REQUEST)
635                    .body(body_from_buf(&body_buf))
636                    .expect("response");
637                return Box::pin(std::future::ready(Ok(resp)));
638            }
639        };
640
641        use futures::future::FutureExt;
642        let resp_final = handle_req(
643            self.clone(),
644            req,
645            resp,
646            login_info,
647            self.raw_req_handler.clone(),
648        )
649        .map(|r| match r {
650            Ok(x) => Ok(x),
651            Err(_e) => unimplemented!(),
652        });
653
654        Box::pin(resp_final)
655    }
656}
657
658/// Create a stream of connection events and a `BuiService`.
659pub fn launcher<CB>(
660    config: Config,
661    auth: &access_control::AccessControl,
662    channel_size: usize,
663    events_prefix: &str,
664    raw_req_handler: Option<RawReqHandler>,
665    callback_handler: Box<dyn Send + CallbackHandler<Data = CB>>,
666) -> (mpsc::Receiver<NewEventStreamConnection>, BuiService<CB>) {
667    let next_connection_key = Arc::new(Mutex::new(ConnectionKey(0)));
668
669    let (tx_new_connection, rx_new_connection) = mpsc::channel(channel_size);
670
671    let service = BuiService {
672        config,
673        callback_handler,
674        next_connection_key,
675        jwt_secret: auth.jwt_secret().to_vec(),
676        encoding_key: jsonwebtoken::EncodingKey::from_secret(auth.jwt_secret()),
677        valid_token: auth.token(),
678        tx_new_connection,
679        events_prefix: events_prefix.to_string(),
680        raw_req_handler,
681    };
682
683    (rx_new_connection, service)
684}