use crate::lowlevel::{BuiService, EventChunkSender};
use bui_backend_types::{ConnectionKey, SessionKey};
use async_change_tracker::ChangeTracker;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use parking_lot::RwLock;
use uuid::Uuid;
use serde::Serialize;
use bui_backend_types::AccessToken;
use crate::access_control;
use crate::lowlevel::NewEventStreamConnection;
use crate::Error;
#[derive(Debug)]
pub enum ConnectionEventType {
Connect(EventChunkSender),
Disconnect,
}
#[derive(Debug)]
pub struct ConnectionEvent {
pub typ: ConnectionEventType,
pub session_key: SessionKey,
pub connection_key: ConnectionKey,
pub path: String,
}
pub struct BuiAppInner<T, CB> {
i_shared_arc: Arc<RwLock<ChangeTracker<T>>>,
i_txers: Arc<RwLock<HashMap<ConnectionKey, (SessionKey, EventChunkSender, String)>>>,
i_bui_server: BuiService<CB>,
auth: access_control::AccessControl,
local_addr: std::net::SocketAddr,
}
impl<'a, T, CB> BuiAppInner<T, CB> {
pub fn shared_arc(&self) -> &Arc<RwLock<ChangeTracker<T>>> {
&self.i_shared_arc
}
pub fn bui_service(&self) -> &BuiService<CB> {
&self.i_bui_server
}
pub fn local_addr(&self) -> &std::net::SocketAddr {
&self.local_addr
}
pub fn token(&self) -> AccessToken {
self.auth.token()
}
pub fn guess_url_with_token(&self) -> String {
match self.auth.token() {
AccessToken::NoToken => format!("http://{}", self.local_addr),
AccessToken::PreSharedToken(ref tok) => {
format!("http://{}/?token={}", self.local_addr, tok)
}
}
}
}
pub fn generate_valid_token() -> String {
let my_uuid = Uuid::new_v4();
format!("{}", my_uuid)
}
pub fn generate_random_auth(
addr: std::net::SocketAddr,
secret: Vec<u8>,
) -> Result<access_control::AccessControl, Error> {
generate_auth_with_token(addr, secret, generate_valid_token())
}
pub fn generate_auth_with_token(
addr: std::net::SocketAddr,
secret: Vec<u8>,
token: String,
) -> Result<access_control::AccessControl, Error> {
let access_token = AccessToken::PreSharedToken(token);
let info = access_control::AccessInfo::new(addr, access_token, secret)?;
Ok(access_control::AccessControl::WithToken(info))
}
pub async fn create_bui_app_inner<'a, T, CB>(
handle: tokio::runtime::Handle,
mut shutdown_rx: Option<tokio::sync::oneshot::Receiver<()>>,
auth: &access_control::AccessControl,
shared_arc: Arc<RwLock<ChangeTracker<T>>>,
event_name: Option<String>,
rx_conn: mpsc::Receiver<NewEventStreamConnection>,
bui_server: BuiService<CB>,
) -> Result<(mpsc::Receiver<ConnectionEvent>, BuiAppInner<T, CB>), Error>
where
T: Clone + Serialize + 'static + Send + Sync,
CB: serde::de::DeserializeOwned + Clone + Send + 'static,
{
let (quit_trigger, valve) = stream_cancel::Valve::new();
let rx_conn = tokio_stream::wrappers::ReceiverStream::new(rx_conn);
let mut rx_conn_valve = valve.wrap(rx_conn);
if let Some(shutdown_rx) = shutdown_rx.take() {
handle.spawn(async move {
shutdown_rx.await.unwrap();
quit_trigger.cancel();
});
} else {
quit_trigger.disable();
}
let bui_server2 = bui_server.clone();
let addr = auth.bind_addr();
let listener = tokio::net::TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
let handle2 = handle.clone();
handle.spawn(async move {
loop {
let (socket, _remote_addr) = listener.accept().await.unwrap();
let bui_server = bui_server2.clone();
handle2.spawn(async move {
let socket = hyper_util::rt::TokioIo::new(socket);
let bui_server = bui_server.clone();
let hyper_service = hyper::service::service_fn(
move |request: hyper::Request<hyper::body::Incoming>| {
use hyper::service::Service;
bui_server.call(request)
},
);
if let Err(err) = hyper_util::server::conn::auto::Builder::new(
hyper_util::rt::TokioExecutor::new(),
)
.serve_connection_with_upgrades(socket, hyper_service)
.await
{
eprintln!("failed to serve connection: {err:#}");
}
});
}
});
let inner = BuiAppInner {
i_shared_arc: shared_arc,
i_txers: Arc::new(RwLock::new(HashMap::new())),
i_bui_server: bui_server,
auth: auth.clone(),
local_addr,
};
let (new_conn_tx, new_conn_rx) = mpsc::channel(5);
let shared_arc = inner.i_shared_arc.clone();
let txers2 = inner.i_txers.clone();
let new_conn_tx2 = new_conn_tx.clone();
let event_name2: Option<String> = event_name.clone();
let handle_connections_fut = async move {
while let Some(conn_info) = futures::StreamExt::next(&mut rx_conn_valve).await {
let chunk_sender = conn_info.chunk_sender;
let chunk_sender: EventChunkSender = chunk_sender; let ckey = conn_info.session_key;
let connection_key = conn_info.connection_key;
let hc: hyper::body::Bytes = {
let shared = shared_arc.write();
create_event_source_msg(&shared.as_ref(), event_name2.as_deref()).into()
};
let typ = ConnectionEventType::Connect(chunk_sender.clone());
let session_key = ckey;
let path = conn_info.path.clone();
let path2 = conn_info.path.clone();
match new_conn_tx2
.send(ConnectionEvent {
typ,
session_key,
connection_key,
path,
})
.await
{
Ok(()) => {}
Err(e) => {
info!(
"failed sending ConnectionEvent. probably no listener. {:?}",
e
);
}
};
match chunk_sender.send(hc).await {
Ok(()) => {
let mut txer_guard = txers2.write();
txer_guard.insert(connection_key, (ckey, chunk_sender, path2));
}
Err(e) => {
error!("failed to send value on initial connect: {:?}", e);
}
}
}
};
handle.spawn(Box::pin(handle_connections_fut));
let shared_store2 = inner.i_shared_arc.clone();
let txers = inner.i_txers.clone();
let change_listener = {
let mut rx = {
let shared = shared_store2.write();
shared.get_changes(10) };
async move {
while let Some((_old, new_value)) = futures::StreamExt::next(&mut rx).await {
let sources_drain = {
let mut sources = txers.write();
sources.drain().collect::<Vec<_>>()
};
let mut restore = vec![];
let event_source_msg = create_event_source_msg(&new_value, event_name.as_deref());
for (connection_key, (session_key, tx, path)) in sources_drain {
let chunk = event_source_msg.clone().into();
match tx.send(chunk).await {
Ok(()) => {
restore.push((connection_key, (session_key, tx, path)));
}
Err(e) => {
info!(
"Failed to send data to event stream, client \
probably disconnected. {:?}",
e
);
let nct = new_conn_tx.clone();
let typ = ConnectionEventType::Disconnect;
let ce = ConnectionEvent {
typ,
session_key,
connection_key,
path,
};
match nct.send(ce).await {
Ok(()) => {}
Err(e) => {
info!(
"Failed to send ConnectionEvent, \
probably no listener. {:?}",
e
);
}
};
}
};
}
for (connection_key, element) in restore.into_iter() {
let mut sources = txers.write();
sources.insert(connection_key, element);
}
}
}
};
handle.spawn(Box::pin(change_listener));
Ok((new_conn_rx, inner))
}
fn create_event_source_msg<T: serde::Serialize>(value: &T, event_name: Option<&str>) -> String {
let buf = serde_json::to_string(&value).expect("encode");
if let Some(event_name) = event_name {
format!("event: {}\ndata: {}\n\n", event_name, buf)
} else {
format!("data: {}\n\n", buf)
}
}