use crate::common::error::Result;
use crate::common::protocol::Frame;
use crate::server::{ServerConfig, ConnectionHandler, HybridServer, Server};
use crate::server::connection::ConnectionManager;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Builder that accumulates a [`ServerConfig`] and optional collaborators,
/// then produces an [`ObserverServer`] through `build`.
pub struct ObserverServerBuilder {
/// Server configuration; created from the bind address in `new` and
/// updated by the `with_*` / `enable_auth` methods.
config: ServerConfig,
/// Connection handler; `build` returns an error when this is still `None`.
handler: Option<Arc<dyn ConnectionHandler>>,
/// Optional connection manager forwarded to the server in `build`.
connection_manager: Option<Arc<ConnectionManager>>,
/// Optional device manager forwarded to the server in `build`.
device_manager: Option<Arc<crate::server::device::DeviceManager>>,
/// Optional server event handler forwarded to the server in `build`.
event_handler: Option<Arc<dyn crate::server::events::handler::ServerEventHandler>>,
/// Optional authenticator forwarded to the server in `build`.
authenticator: Option<Arc<dyn crate::server::auth::Authenticator>>,
}
impl ObserverServerBuilder {
    /// Creates a builder whose configuration is initialised from `bind_address`.
    ///
    /// Every optional collaborator starts out unset. A handler must be supplied
    /// with [`Self::with_handler`] before [`Self::build`] can succeed.
    pub fn new(bind_address: impl Into<String>) -> Self {
        Self {
            config: ServerConfig::new(bind_address.into()),
            handler: None,
            connection_manager: None,
            device_manager: None,
            event_handler: None,
            authenticator: None,
        }
    }

    /// Stores the authenticator that is handed to the server in [`Self::build`].
    pub fn with_authenticator(
        mut self,
        authenticator: Arc<dyn crate::server::auth::Authenticator>,
    ) -> Self {
        self.authenticator = Some(authenticator);
        self
    }

    /// Applies `ServerConfig::enable_auth` to the configuration.
    pub fn enable_auth(mut self) -> Self {
        self.config = self.config.enable_auth();
        self
    }

    /// Applies `ServerConfig::with_auth_timeout` with the given `timeout`.
    pub fn with_auth_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.config = self.config.with_auth_timeout(timeout);
        self
    }

    /// Stores the device manager that is handed to the server in [`Self::build`].
    pub fn with_device_manager(
        mut self,
        device_manager: Arc<crate::server::device::DeviceManager>,
    ) -> Self {
        self.device_manager = Some(device_manager);
        self
    }

    /// Stores the event handler that is handed to the server in [`Self::build`].
    pub fn with_event_handler(
        mut self,
        event_handler: Arc<dyn crate::server::events::handler::ServerEventHandler>,
    ) -> Self {
        self.event_handler = Some(event_handler);
        self
    }

    /// Stores the connection handler. This is the only mandatory component.
    pub fn with_handler(mut self, handler: Arc<dyn ConnectionHandler>) -> Self {
        self.handler = Some(handler);
        self
    }

    /// Stores the connection manager that is handed to the server in [`Self::build`].
    pub fn with_connection_manager(mut self, manager: Arc<ConnectionManager>) -> Self {
        self.connection_manager = Some(manager);
        self
    }

    /// Sets the configuration's `transport` field to `protocol`.
    pub fn with_protocol(
        mut self,
        protocol: crate::common::config_types::TransportProtocol,
    ) -> Self {
        self.config.transport = protocol;
        self
    }

    /// Applies `ServerConfig::with_protocols` with the given list.
    pub fn with_protocols(
        mut self,
        protocols: Vec<crate::common::config_types::TransportProtocol>,
    ) -> Self {
        self.config = self.config.with_protocols(protocols);
        self
    }

    /// Applies `ServerConfig::with_protocol_address` for `protocol` and `address`.
    pub fn with_protocol_address(
        mut self,
        protocol: crate::common::config_types::TransportProtocol,
        address: String,
    ) -> Self {
        self.config = self.config.with_protocol_address(protocol, address);
        self
    }

    /// Applies `ServerConfig::with_max_connections` with `max`.
    pub fn with_max_connections(mut self, max: usize) -> Self {
        self.config = self.config.with_max_connections(max);
        self
    }

    /// Applies `ServerConfig::with_heartbeat` with the given heartbeat settings.
    pub fn with_heartbeat(
        mut self,
        heartbeat: crate::common::config_types::HeartbeatConfig,
    ) -> Self {
        self.config = self.config.with_heartbeat(heartbeat);
        self
    }

    /// Applies `ServerConfig::with_tls` with the given TLS settings.
    pub fn with_tls(mut self, tls: crate::common::config_types::TlsConfig) -> Self {
        self.config = self.config.with_tls(tls);
        self
    }

    /// Applies `ServerConfig::with_format` with the given serialization format.
    pub fn with_default_format(
        mut self,
        format: crate::common::protocol::SerializationFormat,
    ) -> Self {
        self.config = self.config.with_format(format);
        self
    }

    /// Applies `ServerConfig::with_compression` with the given algorithm.
    pub fn with_default_compression(
        mut self,
        compression: crate::common::compression::CompressionAlgorithm,
    ) -> Self {
        self.config = self.config.with_compression(compression);
        self
    }

    /// Consumes the builder and constructs the [`ObserverServer`].
    ///
    /// # Errors
    ///
    /// Returns a general error carrying `"Handler is required"` when no handler
    /// was set, and propagates any error returned by
    /// `HybridServer::with_connection_manager`.
    pub fn build(self) -> Result<ObserverServer> {
        let handler = self.handler.ok_or_else(|| {
            crate::common::error::FlareError::general_error("Handler is required")
        })?;

        // The connection manager is optional on both sides, so the `Option` is
        // forwarded as-is instead of branching into two identical calls.
        let server = HybridServer::with_connection_manager(
            self.config,
            handler,
            self.connection_manager,
            self.device_manager,
            self.event_handler,
            self.authenticator,
        )?;

        Ok(ObserverServer {
            server: Arc::new(Mutex::new(server)),
        })
    }
}
/// Wrapper around a [`HybridServer`] guarded by a Tokio async mutex; created by
/// `ObserverServerBuilder::build`.
pub struct ObserverServer {
/// The wrapped server; every method locks this mutex before touching it.
server: Arc<Mutex<HybridServer>>,
}
impl ObserverServer {
pub async fn start(&mut self) -> Result<()> {
let mut s = self.server.lock().await;
s.start().await
}
pub async fn stop(&mut self) -> Result<()> {
let mut s = self.server.lock().await;
s.stop().await
}
pub fn is_running(&self) -> bool {
tokio::task::block_in_place(|| {
let s = self.server.blocking_lock();
s.is_running()
})
}
pub fn connection_count(&self) -> usize {
tokio::task::block_in_place(|| {
let s = self.server.blocking_lock();
crate::server::handle::ServerHandle::connection_count(&*s)
})
}
pub fn user_count(&self) -> usize {
tokio::task::block_in_place(|| {
let s = self.server.blocking_lock();
crate::server::handle::ServerHandle::user_count(&*s)
})
}
pub async fn send_to(&self, connection_id: &str, frame: &Frame) -> Result<()> {
let s = self.server.lock().await;
crate::server::handle::ServerHandle::send_to(&*s, connection_id, frame).await
}
pub async fn send_to_user(&self, user_id: &str, frame: &Frame) -> Result<()> {
let s = self.server.lock().await;
crate::server::handle::ServerHandle::send_to_user(&*s, user_id, frame).await
}
pub async fn broadcast(&self, frame: &Frame) -> Result<()> {
let s = self.server.lock().await;
crate::server::handle::ServerHandle::broadcast(&*s, frame).await
}
pub async fn broadcast_except(&self, frame: &Frame, exclude_connection_id: &str) -> Result<()> {
let s = self.server.lock().await;
crate::server::handle::ServerHandle::broadcast_except(&*s, frame, exclude_connection_id).await
}
pub async fn disconnect(&self, connection_id: &str) -> Result<()> {
let s = self.server.lock().await;
crate::server::handle::ServerHandle::disconnect(&*s, connection_id).await
}
pub fn protocols(&self) -> Vec<crate::common::config_types::TransportProtocol> {
tokio::task::block_in_place(|| {
let s = self.server.blocking_lock();
s.protocols().to_vec()
})
}
pub fn get_server_handle_components(&self) -> Option<Arc<dyn crate::server::connection::ConnectionManagerTrait>> {
tokio::task::block_in_place(|| {
let s = self.server.blocking_lock();
if let Some(core) = s.core() {
Some(core.connection_manager_trait())
} else {
None
}
})
}
}