use crate::server::config::ServerConfig;
use tracing::debug;
use crate::server::connection::ConnectionManager;
use crate::common::error::Result;
use crate::server::transports::{Server, ConnectionHandler};
use crate::server::transports::server_core::ServerCore;
use crate::server::handle::ServerHandle;
use crate::common::{generate_id};
use crate::transport::connection::Connection;
use crate::transport::websocket::WebSocketTransport;
use async_trait::async_trait;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tokio_tungstenite::accept_async;
/// WebSocket implementation of the [`Server`] transport.
///
/// Accepts TCP connections on `config.bind_address`, upgrades them to
/// WebSocket and registers them with the connection manager held by `core`.
pub struct WebSocketServer {
    /// Server settings; this transport reads `bind_address` and `max_connections`.
    config: ServerConfig,
    /// Shared server state (connection manager, message parser, heartbeat control).
    core: Arc<ServerCore>,
    /// Application callbacks; `on_connect` is invoked for each registered connection.
    handler: Arc<dyn ConnectionHandler>,
    /// Set to `true` by `start` and to `false` by `stop`; shared with the accept task.
    is_running: Arc<Mutex<bool>>,
}
impl WebSocketServer {
    /// Creates a server that builds its own [`ServerCore`] from `config`,
    /// passing `None` as the connection manager.
    pub fn new(config: ServerConfig, handler: Arc<dyn ConnectionHandler>) -> Self {
        Self::with_server_core(config, handler, None)
    }

    /// Creates a server whose newly built [`ServerCore`] receives the given
    /// optional connection manager.
    pub fn with_connection_manager(
        config: ServerConfig,
        handler: Arc<dyn ConnectionHandler>,
        connection_manager: Option<Arc<ConnectionManager>>,
    ) -> Self {
        Self::with_server_core(config, handler, connection_manager)
    }

    /// Builds a fresh [`ServerCore`] from `config` and `connection_manager`
    /// and wraps it in a stopped server.
    fn with_server_core(
        config: ServerConfig,
        handler: Arc<dyn ConnectionHandler>,
        connection_manager: Option<Arc<ConnectionManager>>,
    ) -> Self {
        let fresh_core = ServerCore::new(&config, connection_manager);
        Self::with_shared_core(config, handler, Arc::new(fresh_core))
    }

    /// Creates a stopped server on top of an already existing [`ServerCore`],
    /// so the core can be shared with other owners of the same `Arc`.
    pub fn with_shared_core(
        config: ServerConfig,
        handler: Arc<dyn ConnectionHandler>,
        core: Arc<ServerCore>,
    ) -> Self {
        // The server starts out stopped; `start` flips the flag.
        let is_running = Arc::new(Mutex::new(false));
        Self {
            config,
            core,
            handler,
            is_running,
        }
    }
}
#[async_trait]
impl Server for WebSocketServer {
    /// Binds the listener, starts the heartbeat and spawns the accept loop.
    ///
    /// The accept loop runs in a detached task and hands every accepted TCP
    /// stream to `handle_websocket_connection` in its own task.
    ///
    /// # Errors
    /// Returns a protocol error when `config.bind_address` does not parse as a
    /// socket address, and a connection-failed error when binding fails.
    async fn start(&mut self) -> Result<()> {
        // How often the accept loop wakes up to re-check `is_running` while no
        // client is connecting. Without this, a loop parked in `accept()` would
        // not observe `stop()` (and would keep the port bound) until the next
        // client arrived.
        const ACCEPT_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(200);

        let addr = self.config.bind_address.parse::<std::net::SocketAddr>()
            .map_err(|e| crate::common::error::FlareError::protocol_error(format!("Invalid address: {}", e)))?;
        let listener = TcpListener::bind(addr).await
            .map_err(|e| crate::common::error::FlareError::connection_failed(format!("Failed to bind: {}", e)))?;
        *self.is_running.lock().await = true;
        self.core.start_heartbeat(&self.config);

        let handler = Arc::clone(&self.handler);
        let manager = Arc::clone(&self.core.connection_manager);
        let parser = self.core.parser.clone();
        let config = self.config.clone();
        let is_running = Arc::clone(&self.is_running);
        let core = Arc::clone(&self.core);

        tokio::spawn(async move {
            debug!("[DEBUG WebSocketServer] 开始监听连接");
            while *is_running.lock().await {
                // `TcpListener::accept` is cancel safe, so timing it out never
                // loses a connection; it only gives the loop a chance to see
                // that `stop()` was called.
                // NOTE(review): `tokio::time::timeout` needs the runtime's time
                // driver (enabled by `#[tokio::main]` / `enable_all`) — confirm.
                match tokio::time::timeout(ACCEPT_POLL_INTERVAL, listener.accept()).await {
                    Err(_elapsed) => continue,
                    Ok(Err(e)) => {
                        debug!("Failed to accept connection: {}", e);
                    }
                    Ok(Ok((stream, _addr))) => {
                        // `stop()` may have run while we were waiting in
                        // `accept`; do not register connections after that.
                        if !*is_running.lock().await {
                            debug!("Dropping connection accepted after stop");
                            break;
                        }
                        debug!("[DEBUG WebSocketServer] 收到新连接");
                        let task_handler = Arc::clone(&handler);
                        let task_manager = Arc::clone(&manager);
                        let task_parser = parser.clone();
                        let task_config = config.clone();
                        let task_core = Arc::clone(&core);
                        tokio::spawn(async move {
                            debug!("[DEBUG WebSocketServer] 连接处理任务开始");
                            handle_websocket_connection(
                                stream,
                                task_handler,
                                task_manager,
                                task_parser,
                                task_config,
                                task_core,
                            ).await;
                            debug!("[DEBUG WebSocketServer] 连接处理任务结束");
                        });
                    }
                }
            }
            // Leaving the loop drops `listener`, which releases the port.
            debug!("[DEBUG WebSocketServer] 停止监听连接");
        });
        Ok(())
    }

    /// Clears the running flag, stops the heartbeat and closes every
    /// connection currently listed by the core.
    ///
    /// Closing and disconnecting are best effort: their errors are ignored so
    /// that one failing connection does not prevent the others from closing.
    async fn stop(&mut self) -> Result<()> {
        *self.is_running.lock().await = false;
        self.core.stop_heartbeat();
        let connection_ids = self.core.list_connections().await;
        for conn_id in connection_ids {
            let manager_trait = self.core.connection_manager_trait();
            if let Some((conn, _)) = manager_trait.get_connection(&conn_id).await {
                let mut c = conn.lock().await;
                let _ = c.close().await;
            }
            let _ = ServerHandle::disconnect(&*self.core, &conn_id).await;
        }
        Ok(())
    }

    /// Returns whether `start` has been called without a subsequent `stop`.
    ///
    /// The flag is read with a non-blocking `try_lock` first. The lock is only
    /// ever held for a single read or write, so this succeeds unless another
    /// thread is touching the flag at this very moment. Only in that case does
    /// the method fall back to `block_in_place` + `blocking_lock`.
    ///
    /// # Panics
    /// The contended fallback uses `tokio::task::block_in_place`, which panics
    /// when called from a `current_thread` runtime.
    fn is_running(&self) -> bool {
        debug!("[DEBUG WebSocketServer] is_running 开始");
        let result = match self.is_running.try_lock() {
            // Fast path: no runtime hand-off and no panic on single-threaded runtimes.
            Ok(guard) => *guard,
            Err(_) => tokio::task::block_in_place(|| {
                debug!("[DEBUG WebSocketServer] is_running: block_in_place 内部,获取 blocking_lock");
                let guard = self.is_running.blocking_lock();
                debug!("[DEBUG WebSocketServer] is_running: blocking_lock 已获取");
                let value = *guard;
                debug!("[DEBUG WebSocketServer] is_running: 值 = {}", value);
                value
            }),
        };
        debug!("[DEBUG WebSocketServer] is_running 返回: {}", result);
        result
    }
}
/// Upgrades an accepted TCP stream to a WebSocket connection and registers it.
///
/// Steps, in order:
/// 1. perform the WebSocket handshake via `accept_async`;
/// 2. give up when `manager.connection_count()` has reached
///    `config.max_connections`;
/// 3. add the connection to `manager` under a freshly generated id, with the
///    auth requirement taken from `core.auth_enabled()`;
/// 4. call `handler.on_connect` — a failure is only logged and the connection
///    stays registered;
/// 5. attach a `DefaultServerMessageObserver` to the stored connection.
///
/// Every failure is logged at debug level; nothing is reported to the caller.
/// On the early-return paths the stream/connection is simply dropped.
async fn handle_websocket_connection(
    stream: TcpStream,
    handler: Arc<dyn ConnectionHandler>,
    manager: Arc<ConnectionManager>,
    parser: crate::common::MessageParser,
    config: ServerConfig,
    core: Arc<ServerCore>,
) {
    let ws_stream = match accept_async(stream).await {
        Ok(ws) => ws,
        Err(e) => {
            debug!("WebSocket handshake failed: {}", e);
            return;
        }
    };
    let connection: Box<dyn Connection> =
        Box::new(WebSocketTransport::from_tcp_stream(ws_stream));
    let connection_id = generate_id();

    debug!("[DEBUG WebSocketServer] handle_websocket_connection: 检查连接数限制, connection_id={}", connection_id);
    if manager.connection_count() >= config.max_connections {
        debug!("Connection limit exceeded: {}", config.max_connections);
        return;
    }
    debug!("[DEBUG WebSocketServer] handle_websocket_connection: 连接数检查通过");

    debug!("[DEBUG WebSocketServer] handle_websocket_connection: 准备添加连接");
    let requires_auth = core.auth_enabled();
    if let Err(e) = manager.add_connection(connection_id.clone(), connection, None, requires_auth) {
        debug!("Failed to add connection: {}", e);
        return;
    }
    debug!("[DEBUG WebSocketServer] handle_websocket_connection: 连接已添加到管理器");

    debug!("[DEBUG WebSocketServer] 准备调用 handler.on_connect, connection_id={}", connection_id);
    if let Err(e) = handler.on_connect(&connection_id).await {
        debug!("Handler on_connect error: {}", e);
    } else {
        debug!("[DEBUG WebSocketServer] handler.on_connect 成功返回");
    }

    // `handler` and `parser` are not needed after this point, so they are
    // moved into the observer instead of being cloned. `manager` and `core`
    // are still used below / may be borrowed by the accessors, so those two
    // are shared via cheap `Arc` clones.
    let device_manager = core.device_manager();
    let event_handler = core.event_handler();
    let observer = Arc::new(crate::server::events::DefaultServerMessageObserver::new(
        handler,
        Arc::clone(&manager),
        parser,
        connection_id.clone(),
        Arc::clone(&core),
        device_manager,
        event_handler,
    ));

    debug!("[DEBUG WebSocketServer] handle_websocket_connection: 准备获取连接并添加观察者");
    if let Some((conn, _)) = manager.get_connection(&connection_id) {
        debug!("[DEBUG WebSocketServer] handle_websocket_connection: 连接已获取");
        {
            // Inner scope so the connection lock is released before the
            // "lock released" log line below.
            debug!("[DEBUG WebSocketServer] handle_websocket_connection: 获取连接锁");
            let mut c = conn.lock().await;
            debug!("[DEBUG WebSocketServer] handle_websocket_connection: 连接锁已获取,添加观察者");
            c.add_observer(observer);
            debug!("[DEBUG WebSocketServer] handle_websocket_connection: 观察者已添加");
        }
        debug!("[DEBUG WebSocketServer] handle_websocket_connection: 连接锁已释放");
    } else {
        debug!("[DEBUG WebSocketServer] handle_websocket_connection: 警告:无法获取连接,connection_id={}", connection_id);
    }
    debug!("[DEBUG WebSocketServer] handle_websocket_connection: 连接处理完成, connection_id={}", connection_id);
    debug!("[DEBUG WebSocketServer] handle_websocket_connection: 函数即将返回, connection_id={}", connection_id);
}