bui_backend/
highlevel.rs

1//! Helpers for writing browser user interfaces (BUIs).
2use crate::lowlevel::{BuiService, EventChunkSender};
3use bui_backend_types::{ConnectionKey, SessionKey};
4
5use async_change_tracker::ChangeTracker;
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use tokio::sync::mpsc;
11
12use parking_lot::RwLock;
13use uuid::Uuid;
14
15use serde::Serialize;
16
17use bui_backend_types::AccessToken;
18
19use crate::access_control;
20use crate::lowlevel::NewEventStreamConnection;
21use crate::Error;
22
23// ------
24
25/// The type of possible connect event, either connect or disconnect.
26#[derive(Debug)]
27pub enum ConnectionEventType {
28    /// A connection event with sink for event stream messages to the connected client.
29    Connect(EventChunkSender),
30    /// A disconnection event.
31    Disconnect,
32}
33
34/// State associated with connection or disconnection.
35#[derive(Debug)]
36pub struct ConnectionEvent {
37    /// The type of connection for this event.
38    pub typ: ConnectionEventType,
39    /// Identifier for the connecting session (one ber browser).
40    pub session_key: SessionKey,
41    /// Identifier for the connection (one ber tab).
42    pub connection_key: ConnectionKey,
43    /// The path being requested (starts with `BuiService::events_prefix`).
44    pub path: String,
45}
46
47// ------
48
49/// Maintain state within a BUI application.
50pub struct BuiAppInner<T, CB> {
51    i_shared_arc: Arc<RwLock<ChangeTracker<T>>>,
52    i_txers: Arc<RwLock<HashMap<ConnectionKey, (SessionKey, EventChunkSender, String)>>>,
53    i_bui_server: BuiService<CB>,
54    auth: access_control::AccessControl,
55    local_addr: std::net::SocketAddr,
56}
57
58impl<'a, T, CB> BuiAppInner<T, CB> {
59    /// Get reference counted reference to the underlying data store.
60    pub fn shared_arc(&self) -> &Arc<RwLock<ChangeTracker<T>>> {
61        &self.i_shared_arc
62    }
63
64    /// Get reference to to the underlying `BuiService`.
65    pub fn bui_service(&self) -> &BuiService<CB> {
66        &self.i_bui_server
67    }
68
69    /// Get our local IP address.
70    pub fn local_addr(&self) -> &std::net::SocketAddr {
71        &self.local_addr
72    }
73
74    /// Get our access token.
75    pub fn token(&self) -> AccessToken {
76        self.auth.token()
77    }
78
79    /// Attempt to get our URL.
80    ///
81    /// This may fail if, for example, the locally known IP address is
82    /// not the IP address that users will connect to.
83    pub fn guess_url_with_token(&self) -> String {
84        match self.auth.token() {
85            AccessToken::NoToken => format!("http://{}", self.local_addr),
86            AccessToken::PreSharedToken(ref tok) => {
87                format!("http://{}/?token={}", self.local_addr, tok)
88            }
89        }
90    }
91}
92
93/// Generate a random token
94pub fn generate_valid_token() -> String {
95    let my_uuid = Uuid::new_v4();
96    format!("{}", my_uuid)
97}
98
99/// Generate a random token and return access control information. Requires JWT secret.
100pub fn generate_random_auth(
101    addr: std::net::SocketAddr,
102    secret: Vec<u8>,
103) -> Result<access_control::AccessControl, Error> {
104    generate_auth_with_token(addr, secret, generate_valid_token())
105}
106
107/// Return access control information given a token and a JWT secret.
108pub fn generate_auth_with_token(
109    addr: std::net::SocketAddr,
110    secret: Vec<u8>,
111    token: String,
112) -> Result<access_control::AccessControl, Error> {
113    let access_token = AccessToken::PreSharedToken(token);
114    let info = access_control::AccessInfo::new(addr, access_token, secret)?;
115    Ok(access_control::AccessControl::WithToken(info))
116}
117
118/// Factory function to create a new BUI application.
119pub async fn create_bui_app_inner<'a, T, CB>(
120    handle: tokio::runtime::Handle,
121    mut shutdown_rx: Option<tokio::sync::oneshot::Receiver<()>>,
122    auth: &access_control::AccessControl,
123    shared_arc: Arc<RwLock<ChangeTracker<T>>>,
124    event_name: Option<String>,
125    rx_conn: mpsc::Receiver<NewEventStreamConnection>,
126    bui_server: BuiService<CB>,
127) -> Result<(mpsc::Receiver<ConnectionEvent>, BuiAppInner<T, CB>), Error>
128where
129    T: Clone + Serialize + 'static + Send + Sync,
130    CB: serde::de::DeserializeOwned + Clone + Send + 'static,
131{
132    let (quit_trigger, valve) = stream_cancel::Valve::new();
133    let rx_conn = tokio_stream::wrappers::ReceiverStream::new(rx_conn);
134
135    let mut rx_conn_valve = valve.wrap(rx_conn);
136
137    if let Some(shutdown_rx) = shutdown_rx.take() {
138        handle.spawn(async move {
139            shutdown_rx.await.unwrap();
140            // Cancel the stream. (The receiver will receive end-of-stream.)
141            quit_trigger.cancel();
142        });
143    } else {
144        // Allow dropping `quit_trigger` without canceling stream.
145        quit_trigger.disable();
146    }
147
148    let bui_server2 = bui_server.clone();
149
150    let addr = auth.bind_addr();
151    let listener = tokio::net::TcpListener::bind(addr).await?;
152
153    let local_addr = listener.local_addr()?;
154    let handle2 = handle.clone();
155
156    handle.spawn(async move {
157        loop {
158            let (socket, _remote_addr) = listener.accept().await.unwrap();
159            let bui_server = bui_server2.clone();
160
161            // Spawn a task to handle the connection. That way we can multiple connections
162            // concurrently.
163            handle2.spawn(async move {
164                // Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use tokio.
165                // `TokioIo` converts between them.
166                let socket = hyper_util::rt::TokioIo::new(socket);
167                let bui_server = bui_server.clone();
168
169                let hyper_service = hyper::service::service_fn(
170                    move |request: hyper::Request<hyper::body::Incoming>| {
171                        use hyper::service::Service;
172                        // Do we need to call `poll_ready`????
173                        bui_server.call(request)
174                    },
175                );
176
177                // `server::conn::auto::Builder` supports both http1 and http2.
178                //
179                // `TokioExecutor` tells hyper to use `tokio::spawn` to spawn tasks.
180                if let Err(err) = hyper_util::server::conn::auto::Builder::new(
181                    hyper_util::rt::TokioExecutor::new(),
182                )
183                // `serve_connection_with_upgrades` is required for websockets. If you don't need
184                // that you can use `serve_connection` instead.
185                .serve_connection_with_upgrades(socket, hyper_service)
186                .await
187                {
188                    eprintln!("failed to serve connection: {err:#}");
189                }
190            });
191        }
192    });
193
194    let inner = BuiAppInner {
195        i_shared_arc: shared_arc,
196        i_txers: Arc::new(RwLock::new(HashMap::new())),
197        i_bui_server: bui_server,
198        auth: auth.clone(),
199        local_addr,
200    };
201
202    // --- handle connections
203    let (new_conn_tx, new_conn_rx) = mpsc::channel(5); // TODO chan_size
204
205    let shared_arc = inner.i_shared_arc.clone();
206    let txers2 = inner.i_txers.clone();
207    let new_conn_tx2 = new_conn_tx.clone();
208    let event_name2: Option<String> = event_name.clone();
209
210    let handle_connections_fut = async move {
211        while let Some(conn_info) = futures::StreamExt::next(&mut rx_conn_valve).await {
212            let chunk_sender = conn_info.chunk_sender;
213            let chunk_sender: EventChunkSender = chunk_sender; // type annotation only
214            let ckey = conn_info.session_key;
215            let connection_key = conn_info.connection_key;
216
217            // send current value on initial connect
218            let hc: hyper::body::Bytes = {
219                let shared = shared_arc.write();
220                create_event_source_msg(&shared.as_ref(), event_name2.as_deref()).into()
221            };
222
223            let typ = ConnectionEventType::Connect(chunk_sender.clone());
224            let session_key = ckey;
225            let path = conn_info.path.clone();
226            let path2 = conn_info.path.clone();
227
228            match new_conn_tx2
229                .send(ConnectionEvent {
230                    typ,
231                    session_key,
232                    connection_key,
233                    path,
234                })
235                .await
236            {
237                Ok(()) => {}
238                Err(e) => {
239                    info!(
240                        "failed sending ConnectionEvent. probably no listener. {:?}",
241                        e
242                    );
243                }
244            };
245
246            match chunk_sender.send(hc).await {
247                Ok(()) => {
248                    let mut txer_guard = txers2.write();
249                    txer_guard.insert(connection_key, (ckey, chunk_sender, path2));
250                }
251                Err(e) => {
252                    error!("failed to send value on initial connect: {:?}", e);
253                }
254            }
255        }
256    };
257
258    handle.spawn(Box::pin(handle_connections_fut));
259
260    // --- push changes
261
262    let shared_store2 = inner.i_shared_arc.clone();
263    let txers = inner.i_txers.clone();
264    // Create a Stream to handle updates to our shared store.
265    let change_listener = {
266        let mut rx = {
267            let shared = shared_store2.write();
268            shared.get_changes(10) // capacity of channel is 10 changes
269        };
270        async move {
271            while let Some((_old, new_value)) = futures::StreamExt::next(&mut rx).await {
272                // We need to hold the loc on txers only briefly, so we do this.
273                let sources_drain = {
274                    let mut sources = txers.write();
275                    sources.drain().collect::<Vec<_>>()
276                };
277
278                let mut restore = vec![];
279
280                let event_source_msg = create_event_source_msg(&new_value, event_name.as_deref());
281
282                for (connection_key, (session_key, tx, path)) in sources_drain {
283                    let chunk = event_source_msg.clone().into();
284                    match tx.send(chunk).await {
285                        Ok(()) => {
286                            restore.push((connection_key, (session_key, tx, path)));
287                        }
288                        Err(e) => {
289                            info!(
290                                "Failed to send data to event stream, client \
291                                    probably disconnected. {:?}",
292                                e
293                            );
294                            let nct = new_conn_tx.clone();
295                            let typ = ConnectionEventType::Disconnect;
296                            let ce = ConnectionEvent {
297                                typ,
298                                session_key,
299                                connection_key,
300                                path,
301                            };
302                            match nct.send(ce).await {
303                                Ok(()) => {}
304                                Err(e) => {
305                                    info!(
306                                        "Failed to send ConnectionEvent, \
307                                    probably no listener. {:?}",
308                                        e
309                                    );
310                                }
311                            };
312                        }
313                    };
314                }
315                for (connection_key, element) in restore.into_iter() {
316                    let mut sources = txers.write();
317                    sources.insert(connection_key, element);
318                }
319            }
320        }
321    };
322    handle.spawn(Box::pin(change_listener));
323
324    Ok((new_conn_rx, inner))
325}
326
327fn create_event_source_msg<T: serde::Serialize>(value: &T, event_name: Option<&str>) -> String {
328    let buf = serde_json::to_string(&value).expect("encode");
329    if let Some(event_name) = event_name {
330        format!("event: {}\ndata: {}\n\n", event_name, buf)
331    } else {
332        format!("data: {}\n\n", buf)
333    }
334}