use std::sync::Arc;
use epics_base_rs::server::database::PvDatabase;
use epics_ca_rs::server::{CaServer, ServerConnectionEvent};
use tokio::sync::Mutex;
use tokio::sync::broadcast;
use crate::error::BridgeResult;
pub struct DownstreamServer {
server: Mutex<Option<CaServer>>,
shadow_db: Arc<PvDatabase>,
}
impl DownstreamServer {
pub fn new(shadow_db: Arc<PvDatabase>, port: u16) -> Self {
let server = CaServer::from_parts(shadow_db.clone(), port, None, None, None);
Self {
server: Mutex::new(Some(server)),
shadow_db,
}
}
#[cfg(feature = "ca-gateway-tls")]
pub fn new_with_tls(
shadow_db: Arc<PvDatabase>,
port: u16,
tls: std::sync::Arc<epics_ca_rs::tls::ServerConfig>,
) -> Self {
let mut server = CaServer::from_parts(shadow_db.clone(), port, None, None, None);
server.set_tls(tls);
Self {
server: Mutex::new(Some(server)),
shadow_db,
}
}
pub fn database(&self) -> &Arc<PvDatabase> {
&self.shadow_db
}
pub async fn connection_events(&self) -> Option<broadcast::Receiver<ServerConnectionEvent>> {
let mut guard = self.server.lock().await;
guard.as_mut().map(|s| s.connection_events())
}
pub async fn beacon_anomaly_handle(&self) -> Option<Arc<tokio::sync::Notify>> {
let guard = self.server.lock().await;
guard.as_ref().map(|s| s.beacon_anomaly_handle())
}
pub async fn run(&self) -> BridgeResult<()> {
let server = {
let mut guard = self.server.lock().await;
match guard.take() {
Some(s) => s,
None => {
return Err(crate::error::BridgeError::PutRejected(
"DownstreamServer already running or consumed".into(),
));
}
}
};
server
.run()
.await
.map_err(|e| crate::error::BridgeError::PutRejected(format!("CaServer run: {e}")))
}
pub async fn reinstall(&self, server: CaServer) -> Option<CaServer> {
let mut guard = self.server.lock().await;
let prev = guard.take();
*guard = Some(server);
prev
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn construct_downstream() {
let db = Arc::new(PvDatabase::new());
let downstream = DownstreamServer::new(db.clone(), 0);
assert!(Arc::ptr_eq(downstream.database(), &db));
}
#[tokio::test]
async fn connection_events_subscribe() {
let db = Arc::new(PvDatabase::new());
let downstream = DownstreamServer::new(db, 0);
let rx = downstream.connection_events().await;
assert!(rx.is_some(), "expected receiver");
}
}