use crate::server::config::ServerConfig;
use crate::common::config_types::TransportProtocol;
use crate::common::error::Result;
use crate::common::protocol::Frame;
use crate::server::handle::ServerHandle;
use super::{Server, ConnectionHandler};
use super::server_core::ServerCore;
use async_trait::async_trait;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Mutex;
use tracing::error;
use super::websocket::WebSocketServer;
use super::quic::QUICServer;
/// A server that runs several transport-specific servers side by side, all
/// sharing a single [`ServerCore`].
pub struct HybridServer {
/// One underlying server per configured protocol, each behind an async
/// mutex so it can be locked while being started or stopped.
servers: Vec<Arc<Mutex<Box<dyn Server>>>>,
/// The protocols reported by the configuration at construction time.
protocols: Vec<TransportProtocol>,
/// Set to `true` once at least one underlying server has started and reset
/// to `false` by `stop` (or when every server fails to start).
is_running: Arc<AtomicBool>,
/// Core shared with every underlying server. The constructors in this file
/// always populate it with `Some`.
core: Option<Arc<ServerCore>>,
/// The configuration this server was built from; passed to the core when
/// the heartbeat is started.
config: ServerConfig,
}
impl HybridServer {
    /// Creates a hybrid server without any pre-built collaborators.
    ///
    /// Equivalent to calling [`HybridServer::with_connection_manager`] with
    /// `None` for the connection manager, device manager, event handler and
    /// authenticator.
    ///
    /// # Errors
    ///
    /// Propagates every error of [`HybridServer::with_connection_manager`].
    pub fn new(config: ServerConfig, handler: Arc<dyn ConnectionHandler>) -> Result<Self> {
        Self::with_connection_manager(config, handler, None, None, None, None)
    }

    /// Creates a hybrid server, optionally reusing externally owned
    /// collaborators.
    ///
    /// A single [`ServerCore`] is built from `config` and shared by one
    /// underlying server per protocol returned by `config.get_protocols()`.
    /// Each underlying server receives a clone of `config` narrowed to its own
    /// protocol and bind address.
    ///
    /// When `device_manager` is `None`, a new device manager is created unless
    /// the configured conflict strategy is `AllowAll`, in which case the core
    /// gets no device manager.
    ///
    /// # Errors
    ///
    /// * Returns a protocol error if one of the configured protocols is TCP,
    ///   which is not implemented yet.
    /// * Propagates any error returned while constructing the QUIC server.
    pub fn with_connection_manager(
        config: ServerConfig,
        handler: Arc<dyn ConnectionHandler>,
        connection_manager: Option<Arc<crate::server::connection::ConnectionManager>>,
        device_manager: Option<Arc<crate::server::device::DeviceManager>>,
        event_handler: Option<Arc<dyn crate::server::events::handler::ServerEventHandler>>,
        authenticator: Option<Arc<dyn crate::server::auth::Authenticator>>,
    ) -> Result<Self> {
        // `connection_manager` is not needed afterwards, so it is moved into
        // the core instead of being cloned.
        let core = ServerCore::new(&config, connection_manager);

        // Prefer the caller's device manager; otherwise only create one when
        // the strategy actually restricts devices.
        let resolved_device_manager = device_manager.or_else(|| {
            (config.device_conflict_strategy
                != crate::common::device::DeviceConflictStrategy::AllowAll)
                .then(|| {
                    Arc::new(crate::server::device::DeviceManager::new(
                        config.device_conflict_strategy.clone(),
                    ))
                })
        });

        let shared_core = Arc::new(
            core.with_device_manager(resolved_device_manager)
                .with_event_handler(event_handler)
                .with_authenticator(authenticator),
        );

        let protocols = config.get_protocols();
        let mut servers = Vec::with_capacity(protocols.len());
        for protocol in &protocols {
            // Narrow the shared configuration down to this single transport.
            let mut server_config = config.clone();
            server_config.transport = *protocol;
            server_config.transports = None;
            server_config.bind_address = config.get_protocol_address(protocol);

            let server: Box<dyn Server> = match protocol {
                TransportProtocol::WebSocket => Box::new(WebSocketServer::with_shared_core(
                    server_config,
                    Arc::clone(&handler),
                    Arc::clone(&shared_core),
                )),
                TransportProtocol::QUIC => Box::new(QUICServer::with_shared_core(
                    server_config,
                    Arc::clone(&handler),
                    Arc::clone(&shared_core),
                )?),
                TransportProtocol::TCP => {
                    return Err(crate::common::error::FlareError::protocol_error(
                        "TCP transport not yet implemented".to_string(),
                    ));
                }
            };
            servers.push(Arc::new(Mutex::new(server)));
        }

        Ok(Self {
            servers,
            protocols,
            is_running: Arc::new(AtomicBool::new(false)),
            core: Some(shared_core),
            config,
        })
    }

    /// Returns the protocols this server was configured with.
    pub fn protocols(&self) -> &[TransportProtocol] {
        &self.protocols
    }

    /// Returns the shared core, if one is set.
    pub fn core(&self) -> Option<&Arc<ServerCore>> {
        self.core.as_ref()
    }

    /// Returns a mutable reference to the shared core handle, if one is set.
    pub fn core_mut(&mut self) -> Option<&mut Arc<ServerCore>> {
        self.core.as_mut()
    }
}
#[async_trait::async_trait]
impl Server for HybridServer {
    /// Starts the core heartbeat and then every underlying server, in order.
    ///
    /// Individual start failures are logged. The call succeeds as long as at
    /// least one underlying server started, in which case the running flag is
    /// set. If there are no underlying servers at all, the call returns
    /// `Ok(())` without touching the running flag.
    ///
    /// # Errors
    ///
    /// If every underlying server fails to start, the running flag is cleared,
    /// the heartbeat started at the top of this method is stopped again, and
    /// the first failure is returned.
    async fn start(&mut self) -> Result<()> {
        if let Some(ref mut core) = self.core {
            core.start_heartbeat(&self.config);
        }

        let mut started_count = 0usize;
        let mut errors = Vec::new();
        for server in &self.servers {
            let mut guard = server.lock().await;
            match guard.start().await {
                Ok(_) => started_count += 1,
                Err(e) => {
                    error!("Failed to start server: {:?}", e);
                    errors.push(e);
                }
            }
        }

        if started_count > 0 {
            self.is_running.store(true, Ordering::SeqCst);
            return Ok(());
        }

        match errors.into_iter().next() {
            Some(first_error) => {
                self.is_running.store(false, Ordering::SeqCst);
                // Nothing is listening, so do not leave the heartbeat that was
                // started above running behind a failed `start`.
                if let Some(ref mut core) = self.core {
                    core.stop_heartbeat();
                }
                Err(first_error)
            }
            // No servers were configured: nothing started and nothing failed.
            None => Ok(()),
        }
    }

    /// Clears the running flag, stops the core heartbeat and stops every
    /// underlying server.
    ///
    /// Failures of individual servers are logged and otherwise ignored, so
    /// this method always returns `Ok(())`.
    async fn stop(&mut self) -> Result<()> {
        self.is_running.store(false, Ordering::SeqCst);
        if let Some(ref mut core) = self.core {
            core.stop_heartbeat();
        }
        for server in &self.servers {
            let mut guard = server.lock().await;
            if let Err(e) = guard.stop().await {
                error!("Failed to stop server: {:?}", e);
            }
        }
        Ok(())
    }

    /// Reports whether the running flag is currently set.
    fn is_running(&self) -> bool {
        self.is_running.load(Ordering::SeqCst)
    }
}
#[async_trait]
impl ServerHandle for HybridServer {
async fn send_to(&self, connection_id: &str, frame: &Frame) -> Result<()> {
if let Some(ref core) = self.core {
return ServerHandle::send_to(&**core, connection_id, frame).await;
}
Err(crate::common::error::FlareError::protocol_error(
"ServerCore not initialized".to_string()
))
}
async fn send_to_user(&self, user_id: &str, frame: &Frame) -> Result<()> {
if let Some(ref core) = self.core {
return ServerHandle::send_to_user(&**core, user_id, frame).await;
}
Err(crate::common::error::FlareError::protocol_error(
"ServerCore not initialized".to_string()
))
}
async fn broadcast(&self, frame: &Frame) -> Result<()> {
if let Some(ref core) = self.core {
return ServerHandle::broadcast(&**core, frame).await;
}
Err(crate::common::error::FlareError::protocol_error(
"ServerCore not initialized".to_string()
))
}
async fn broadcast_except(&self, frame: &Frame, exclude_connection_id: &str) -> Result<()> {
if let Some(ref core) = self.core {
return ServerHandle::broadcast_except(&**core, frame, exclude_connection_id).await;
}
Err(crate::common::error::FlareError::protocol_error(
"ServerCore not initialized".to_string()
))
}
async fn disconnect(&self, connection_id: &str) -> Result<()> {
if let Some(ref core) = self.core {
return ServerHandle::disconnect(&**core, connection_id).await;
}
Err(crate::common::error::FlareError::protocol_error(
"ServerCore not initialized".to_string()
))
}
fn connection_count(&self) -> usize {
if let Some(ref core) = self.core {
return ServerHandle::connection_count(&**core);
}
0
}
fn user_count(&self) -> usize {
if let Some(ref core) = self.core {
return ServerHandle::user_count(&**core);
}
0
}
}