1use crate::lowlevel::{BuiService, EventChunkSender};
3use bui_backend_types::{ConnectionKey, SessionKey};
4
5use async_change_tracker::ChangeTracker;
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use tokio::sync::mpsc;
11
12use parking_lot::RwLock;
13use uuid::Uuid;
14
15use serde::Serialize;
16
17use bui_backend_types::AccessToken;
18
19use crate::access_control;
20use crate::lowlevel::NewEventStreamConnection;
21use crate::Error;
22
23#[derive(Debug)]
27pub enum ConnectionEventType {
28 Connect(EventChunkSender),
30 Disconnect,
32}
33
34#[derive(Debug)]
36pub struct ConnectionEvent {
37 pub typ: ConnectionEventType,
39 pub session_key: SessionKey,
41 pub connection_key: ConnectionKey,
43 pub path: String,
45}
46
47pub struct BuiAppInner<T, CB> {
51 i_shared_arc: Arc<RwLock<ChangeTracker<T>>>,
52 i_txers: Arc<RwLock<HashMap<ConnectionKey, (SessionKey, EventChunkSender, String)>>>,
53 i_bui_server: BuiService<CB>,
54 auth: access_control::AccessControl,
55 local_addr: std::net::SocketAddr,
56}
57
58impl<'a, T, CB> BuiAppInner<T, CB> {
59 pub fn shared_arc(&self) -> &Arc<RwLock<ChangeTracker<T>>> {
61 &self.i_shared_arc
62 }
63
64 pub fn bui_service(&self) -> &BuiService<CB> {
66 &self.i_bui_server
67 }
68
69 pub fn local_addr(&self) -> &std::net::SocketAddr {
71 &self.local_addr
72 }
73
74 pub fn token(&self) -> AccessToken {
76 self.auth.token()
77 }
78
79 pub fn guess_url_with_token(&self) -> String {
84 match self.auth.token() {
85 AccessToken::NoToken => format!("http://{}", self.local_addr),
86 AccessToken::PreSharedToken(ref tok) => {
87 format!("http://{}/?token={}", self.local_addr, tok)
88 }
89 }
90 }
91}
92
93pub fn generate_valid_token() -> String {
95 let my_uuid = Uuid::new_v4();
96 format!("{}", my_uuid)
97}
98
99pub fn generate_random_auth(
101 addr: std::net::SocketAddr,
102 secret: Vec<u8>,
103) -> Result<access_control::AccessControl, Error> {
104 generate_auth_with_token(addr, secret, generate_valid_token())
105}
106
107pub fn generate_auth_with_token(
109 addr: std::net::SocketAddr,
110 secret: Vec<u8>,
111 token: String,
112) -> Result<access_control::AccessControl, Error> {
113 let access_token = AccessToken::PreSharedToken(token);
114 let info = access_control::AccessInfo::new(addr, access_token, secret)?;
115 Ok(access_control::AccessControl::WithToken(info))
116}
117
118pub async fn create_bui_app_inner<'a, T, CB>(
120 handle: tokio::runtime::Handle,
121 mut shutdown_rx: Option<tokio::sync::oneshot::Receiver<()>>,
122 auth: &access_control::AccessControl,
123 shared_arc: Arc<RwLock<ChangeTracker<T>>>,
124 event_name: Option<String>,
125 rx_conn: mpsc::Receiver<NewEventStreamConnection>,
126 bui_server: BuiService<CB>,
127) -> Result<(mpsc::Receiver<ConnectionEvent>, BuiAppInner<T, CB>), Error>
128where
129 T: Clone + Serialize + 'static + Send + Sync,
130 CB: serde::de::DeserializeOwned + Clone + Send + 'static,
131{
132 let (quit_trigger, valve) = stream_cancel::Valve::new();
133 let rx_conn = tokio_stream::wrappers::ReceiverStream::new(rx_conn);
134
135 let mut rx_conn_valve = valve.wrap(rx_conn);
136
137 if let Some(shutdown_rx) = shutdown_rx.take() {
138 handle.spawn(async move {
139 shutdown_rx.await.unwrap();
140 quit_trigger.cancel();
142 });
143 } else {
144 quit_trigger.disable();
146 }
147
148 let bui_server2 = bui_server.clone();
149
150 let addr = auth.bind_addr();
151 let listener = tokio::net::TcpListener::bind(addr).await?;
152
153 let local_addr = listener.local_addr()?;
154 let handle2 = handle.clone();
155
156 handle.spawn(async move {
157 loop {
158 let (socket, _remote_addr) = listener.accept().await.unwrap();
159 let bui_server = bui_server2.clone();
160
161 handle2.spawn(async move {
164 let socket = hyper_util::rt::TokioIo::new(socket);
167 let bui_server = bui_server.clone();
168
169 let hyper_service = hyper::service::service_fn(
170 move |request: hyper::Request<hyper::body::Incoming>| {
171 use hyper::service::Service;
172 bui_server.call(request)
174 },
175 );
176
177 if let Err(err) = hyper_util::server::conn::auto::Builder::new(
181 hyper_util::rt::TokioExecutor::new(),
182 )
183 .serve_connection_with_upgrades(socket, hyper_service)
186 .await
187 {
188 eprintln!("failed to serve connection: {err:#}");
189 }
190 });
191 }
192 });
193
194 let inner = BuiAppInner {
195 i_shared_arc: shared_arc,
196 i_txers: Arc::new(RwLock::new(HashMap::new())),
197 i_bui_server: bui_server,
198 auth: auth.clone(),
199 local_addr,
200 };
201
202 let (new_conn_tx, new_conn_rx) = mpsc::channel(5); let shared_arc = inner.i_shared_arc.clone();
206 let txers2 = inner.i_txers.clone();
207 let new_conn_tx2 = new_conn_tx.clone();
208 let event_name2: Option<String> = event_name.clone();
209
210 let handle_connections_fut = async move {
211 while let Some(conn_info) = futures::StreamExt::next(&mut rx_conn_valve).await {
212 let chunk_sender = conn_info.chunk_sender;
213 let chunk_sender: EventChunkSender = chunk_sender; let ckey = conn_info.session_key;
215 let connection_key = conn_info.connection_key;
216
217 let hc: hyper::body::Bytes = {
219 let shared = shared_arc.write();
220 create_event_source_msg(&shared.as_ref(), event_name2.as_deref()).into()
221 };
222
223 let typ = ConnectionEventType::Connect(chunk_sender.clone());
224 let session_key = ckey;
225 let path = conn_info.path.clone();
226 let path2 = conn_info.path.clone();
227
228 match new_conn_tx2
229 .send(ConnectionEvent {
230 typ,
231 session_key,
232 connection_key,
233 path,
234 })
235 .await
236 {
237 Ok(()) => {}
238 Err(e) => {
239 info!(
240 "failed sending ConnectionEvent. probably no listener. {:?}",
241 e
242 );
243 }
244 };
245
246 match chunk_sender.send(hc).await {
247 Ok(()) => {
248 let mut txer_guard = txers2.write();
249 txer_guard.insert(connection_key, (ckey, chunk_sender, path2));
250 }
251 Err(e) => {
252 error!("failed to send value on initial connect: {:?}", e);
253 }
254 }
255 }
256 };
257
258 handle.spawn(Box::pin(handle_connections_fut));
259
260 let shared_store2 = inner.i_shared_arc.clone();
263 let txers = inner.i_txers.clone();
264 let change_listener = {
266 let mut rx = {
267 let shared = shared_store2.write();
268 shared.get_changes(10) };
270 async move {
271 while let Some((_old, new_value)) = futures::StreamExt::next(&mut rx).await {
272 let sources_drain = {
274 let mut sources = txers.write();
275 sources.drain().collect::<Vec<_>>()
276 };
277
278 let mut restore = vec![];
279
280 let event_source_msg = create_event_source_msg(&new_value, event_name.as_deref());
281
282 for (connection_key, (session_key, tx, path)) in sources_drain {
283 let chunk = event_source_msg.clone().into();
284 match tx.send(chunk).await {
285 Ok(()) => {
286 restore.push((connection_key, (session_key, tx, path)));
287 }
288 Err(e) => {
289 info!(
290 "Failed to send data to event stream, client \
291 probably disconnected. {:?}",
292 e
293 );
294 let nct = new_conn_tx.clone();
295 let typ = ConnectionEventType::Disconnect;
296 let ce = ConnectionEvent {
297 typ,
298 session_key,
299 connection_key,
300 path,
301 };
302 match nct.send(ce).await {
303 Ok(()) => {}
304 Err(e) => {
305 info!(
306 "Failed to send ConnectionEvent, \
307 probably no listener. {:?}",
308 e
309 );
310 }
311 };
312 }
313 };
314 }
315 for (connection_key, element) in restore.into_iter() {
316 let mut sources = txers.write();
317 sources.insert(connection_key, element);
318 }
319 }
320 }
321 };
322 handle.spawn(Box::pin(change_listener));
323
324 Ok((new_conn_rx, inner))
325}
326
327fn create_event_source_msg<T: serde::Serialize>(value: &T, event_name: Option<&str>) -> String {
328 let buf = serde_json::to_string(&value).expect("encode");
329 if let Some(event_name) = event_name {
330 format!("event: {}\ndata: {}\n\n", event_name, buf)
331 } else {
332 format!("data: {}\n\n", buf)
333 }
334}