use crate::server::connection::{ConnectionManager, ConnectionManagerTrait};
use crate::server::transports::server_core::ServerCore;
use crate::server::transports::ConnectionHandler;
use crate::common::MessageParser;
use crate::common::protocol::{Frame, pong, frame_with_system_command, Reliability};
use crate::transport::events::{ConnectionEvent, ConnectionObserver};
use crate::server::events::handler::ServerEventHandler;
use crate::common::error::Result;
use std::sync::Arc;
use tracing::{debug, error, info};
use std::convert::TryFrom;
pub struct DefaultServerMessageObserver {
handler: Arc<dyn ConnectionHandler>,
manager: Arc<ConnectionManager>,
parser: MessageParser,
connection_id: String,
core: Arc<ServerCore>,
device_manager: Option<Arc<crate::server::device::DeviceManager>>,
event_handler: Option<Arc<dyn ServerEventHandler>>,
}
impl Clone for DefaultServerMessageObserver {
fn clone(&self) -> Self {
Self {
handler: Arc::clone(&self.handler),
manager: Arc::clone(&self.manager),
parser: self.parser.clone(),
connection_id: self.connection_id.clone(),
core: Arc::clone(&self.core),
device_manager: self.device_manager.clone(),
event_handler: self.event_handler.clone(),
}
}
}
impl DefaultServerMessageObserver {
pub fn new(
handler: Arc<dyn ConnectionHandler>,
manager: Arc<ConnectionManager>,
parser: MessageParser,
connection_id: String,
core: Arc<ServerCore>,
device_manager: Option<Arc<crate::server::device::DeviceManager>>,
event_handler: Option<Arc<dyn ServerEventHandler>>,
) -> Self {
Self {
handler,
manager,
parser,
connection_id,
core,
device_manager,
event_handler,
}
}
pub async fn handle_system_command(
&self,
frame: &Frame,
sys_type: i32,
connection_id: &str,
) -> Result<()> {
use crate::common::protocol::flare::core::commands::system_command::Type as SysType;
match SysType::try_from(sys_type) {
Ok(SysType::Connect) => {
let manager_trait = Arc::clone(&self.manager) as Arc<dyn ConnectionManagerTrait>;
if let Some((conn, _)) = manager_trait.get_connection(connection_id).await {
if let Err(e) = self.core.handle_connect_complete(
frame,
connection_id,
conn,
Arc::clone(&self.handler),
).await {
error!("[DefaultObserver] 处理 CONNECT 消息失败: {}", e);
}
} else {
error!("[DefaultObserver] 连接不存在: {}", connection_id);
}
}
Ok(SysType::Ping) => {
let manager = Arc::clone(&self.manager) as Arc<dyn ConnectionManagerTrait>;
let conn_id = connection_id.to_string();
let manager_update = Arc::clone(&manager);
let conn_id_update = conn_id.clone();
tokio::spawn(async move {
let _ = manager_update.update_connection_active(&conn_id_update).await;
});
let parser_clone = self.parser.clone();
if let Some(ref event_handler) = self.event_handler {
if let Ok(Some(custom_response)) = event_handler.handle_ping(frame, connection_id).await {
let manager_get = Arc::clone(&manager);
let parser = parser_clone.clone();
tokio::spawn(async move {
if let Some((conn, _)) = manager_get.get_connection(&conn_id).await {
if let Ok(data) = parser.serialize(&custom_response) {
let conn_clone = Arc::clone(&conn);
let mut c = conn_clone.lock().await;
let _ = c.send(&data).await;
}
}
});
return Ok(());
}
}
let pong_cmd = pong();
let pong_frame = frame_with_system_command(pong_cmd, Reliability::AtLeastOnce);
if let Ok(pong_data) = parser_clone.serialize(&pong_frame) {
let manager_get = Arc::clone(&manager);
tokio::spawn(async move {
if let Some((conn, _)) = manager_get.get_connection(&conn_id).await {
let conn_clone = Arc::clone(&conn);
let mut c = conn_clone.lock().await;
let _ = c.send(&pong_data).await;
}
});
}
}
Ok(SysType::Pong) => {
let manager = Arc::clone(&self.manager) as Arc<dyn ConnectionManagerTrait>;
let conn_id = connection_id.to_string();
if let Some(ref event_handler) = self.event_handler {
let _ = event_handler.handle_pong(frame, connection_id).await;
}
tokio::spawn(async move {
let _ = manager.update_connection_active(&conn_id).await;
});
}
_ => {
debug!("[DefaultObserver] 未处理的系统命令类型: {}", sys_type);
}
}
Ok(())
}
pub async fn handle_message_command(
&self,
frame: &Frame,
command: &crate::common::protocol::MessageCommand,
connection_id: &str,
) -> Result<()> {
if let Some(ref event_handler) = self.event_handler {
use crate::common::protocol::flare::core::commands::message_command::Type as MsgType;
if let Ok(msg_type) = MsgType::try_from(command.r#type) {
if let Ok(Some(response)) = event_handler
.handle_message_command_by_type(command, msg_type, connection_id)
.await
{
let manager_trait = Arc::clone(&self.manager) as Arc<dyn ConnectionManagerTrait>;
let conn_id = connection_id.to_string();
let parser = self.parser.clone();
tokio::spawn(async move {
if let Some((conn, _)) = manager_trait.get_connection(&conn_id).await {
if let Ok(data) = parser.serialize(&response) {
let conn_clone = Arc::clone(&conn);
let mut c = conn_clone.lock().await;
let _ = c.send(&data).await;
}
}
});
return Ok(());
}
}
}
let handler = Arc::clone(&self.handler);
let manager = Arc::clone(&self.manager);
let parser = self.parser.clone();
let conn_id = connection_id.to_string();
let frame_clone = frame.clone();
let manager_update = Arc::clone(&manager) as Arc<dyn ConnectionManagerTrait>;
let conn_id_update = conn_id.clone();
tokio::spawn(async move {
let _ = manager_update.update_connection_active(&conn_id_update).await;
});
tokio::spawn(async move {
if let Ok(Some(response)) = handler.handle_frame(&frame_clone, &conn_id).await {
let manager_trait = Arc::clone(&manager) as Arc<dyn ConnectionManagerTrait>;
if let Some((conn, _)) = manager_trait.get_connection(&conn_id).await {
if let Ok(data) = parser.serialize(&response) {
let conn_clone = Arc::clone(&conn);
let mut c = conn_clone.lock().await;
let _ = c.send(&data).await;
}
}
}
});
Ok(())
}
pub async fn handle_notification_command(
&self,
frame: &Frame,
command: &crate::common::protocol::NotificationCommand,
connection_id: &str,
) -> Result<()> {
if let Some(ref event_handler) = self.event_handler {
use crate::common::protocol::flare::core::commands::notification_command::Type as NotifType;
if let Ok(notif_type) = NotifType::try_from(command.r#type) {
if let Ok(Some(response)) = event_handler
.handle_notification_command_by_type(command, notif_type, connection_id)
.await
{
let manager_trait = Arc::clone(&self.manager) as Arc<dyn ConnectionManagerTrait>;
let conn_id = connection_id.to_string();
let parser = self.parser.clone();
tokio::spawn(async move {
if let Some((conn, _)) = manager_trait.get_connection(&conn_id).await {
if let Ok(data) = parser.serialize(&response) {
let conn_clone = Arc::clone(&conn);
let mut c = conn_clone.lock().await;
let _ = c.send(&data).await;
}
}
});
return Ok(());
}
}
}
let handler = Arc::clone(&self.handler);
let manager = Arc::clone(&self.manager);
let parser = self.parser.clone();
let conn_id = connection_id.to_string();
let frame_clone = frame.clone();
let manager_update = Arc::clone(&manager) as Arc<dyn ConnectionManagerTrait>;
let conn_id_update = conn_id.clone();
tokio::spawn(async move {
let _ = manager_update.update_connection_active(&conn_id_update).await;
});
tokio::spawn(async move {
if let Ok(Some(response)) = handler.handle_frame(&frame_clone, &conn_id).await {
let manager_trait = Arc::clone(&manager) as Arc<dyn ConnectionManagerTrait>;
if let Some((conn, _)) = manager_trait.get_connection(&conn_id).await {
if let Ok(data) = parser.serialize(&response) {
let conn_clone = Arc::clone(&conn);
let mut c = conn_clone.lock().await;
let _ = c.send(&data).await;
}
}
}
});
Ok(())
}
}
impl ConnectionObserver for DefaultServerMessageObserver {
fn on_event(&self, event: &ConnectionEvent) {
match event {
ConnectionEvent::Message(data) => {
if let Ok(frame) = self.parser.parse(data) {
if let Some(cmd) = &frame.command {
match &cmd.r#type {
Some(crate::common::protocol::flare::core::commands::command::Type::System(sys_cmd)) => {
let sys_type = sys_cmd.r#type;
let conn_id = self.connection_id.clone();
let frame_clone = frame.clone();
let observer = self.clone();
tokio::spawn(async move {
if let Err(e) = observer.handle_system_command(&frame_clone, sys_type, &conn_id).await {
error!("[DefaultObserver] 处理系统命令失败: {}", e);
}
});
}
Some(crate::common::protocol::flare::core::commands::command::Type::Message(msg_cmd)) => {
let conn_id = self.connection_id.clone();
let frame_clone = frame.clone();
let msg_cmd_clone = msg_cmd.clone();
let observer = self.clone();
tokio::spawn(async move {
if let Err(e) = observer.handle_message_command(&frame_clone, &msg_cmd_clone, &conn_id).await {
error!("[DefaultObserver] 处理消息命令失败: {}", e);
}
});
}
Some(crate::common::protocol::flare::core::commands::command::Type::Notification(notif_cmd)) => {
let conn_id = self.connection_id.clone();
let frame_clone = frame.clone();
let notif_cmd_clone = notif_cmd.clone();
let observer = self.clone();
tokio::spawn(async move {
if let Err(e) = observer.handle_notification_command(&frame_clone, ¬if_cmd_clone, &conn_id).await {
error!("[DefaultObserver] 处理通知命令失败: {}", e);
}
});
}
_ => {
debug!("[DefaultObserver] 未处理的命令类型");
}
}
}
}
}
ConnectionEvent::Disconnected(reason) => {
let handler = Arc::clone(&self.handler);
let manager = Arc::clone(&self.manager) as Arc<dyn ConnectionManagerTrait>;
let conn_id = self.connection_id.clone();
let device_manager = self.device_manager.clone();
let event_handler = self.event_handler.clone();
let reason_str = reason.clone();
debug!("[DefaultObserver] Connection disconnected: {}", conn_id);
tokio::spawn(async move {
let user_id = if let Some((_, conn_info)) = manager.get_connection(&conn_id).await {
conn_info.user_id
} else {
None
};
if let Some(ref event_handler) = event_handler {
let _ = event_handler.on_disconnect(&conn_id, Some(reason_str.as_str())).await;
}
let _ = handler.on_disconnect(&conn_id).await;
match manager.remove_connection(&conn_id).await {
Ok(_) => {
debug!("[DefaultObserver] Successfully removed connection: {}", conn_id);
}
Err(e) => {
debug!("[DefaultObserver] Connection {} already removed or not found: {}", conn_id, e);
}
}
if let (Some(device_mgr), Some(user_id)) = (device_manager, user_id) {
if let Err(e) = device_mgr.remove_device(&user_id, &conn_id).await {
debug!("[DefaultObserver] Failed to remove device from DeviceManager: {}", e);
} else {
info!("[DefaultObserver] Successfully removed device from DeviceManager: user_id={}, connection_id={}", user_id, conn_id);
}
}
});
}
ConnectionEvent::Connected => {
}
ConnectionEvent::Error(e) => {
error!("[DefaultObserver] Connection error for {}: {:?}", self.connection_id, e);
let handler = Arc::clone(&self.handler);
let manager = Arc::clone(&self.manager) as Arc<dyn ConnectionManagerTrait>;
let conn_id = self.connection_id.clone();
let device_manager = self.device_manager.clone();
let event_handler = self.event_handler.clone();
let error_msg = format!("{:?}", e);
debug!("[DefaultObserver] Connection error detected, removing connection: {}", conn_id);
tokio::spawn(async move {
let user_id = if let Some((_, conn_info)) = manager.get_connection(&conn_id).await {
conn_info.user_id
} else {
None
};
if let Some(ref event_handler) = event_handler {
let _ = event_handler.on_error(&conn_id, &error_msg).await;
}
let _ = handler.on_disconnect(&conn_id).await;
match manager.remove_connection(&conn_id).await {
Ok(_) => {
debug!("[DefaultObserver] Successfully removed connection after error: {}", conn_id);
}
Err(e) => {
debug!("[DefaultObserver] Connection {} already removed or not found after error: {}", conn_id, e);
}
}
if let (Some(device_mgr), Some(user_id)) = (device_manager, user_id) {
if let Err(e) = device_mgr.remove_device(&user_id, &conn_id).await {
debug!("[DefaultObserver] Failed to remove device from DeviceManager: {}", e);
} else {
info!("[DefaultObserver] Successfully removed device from DeviceManager: user_id={}, connection_id={}", user_id, conn_id);
}
}
});
}
}
}
}