Skip to main content

smcp_server_core/
handler.rs

1//! SMCP 协议处理器 / SMCP protocol handler
2
3use crate::auth::{AuthError, AuthenticationProvider};
4use crate::session::{ClientRole, SessionData, SessionError, SessionManager};
5use futures_util::StreamExt;
6use serde_json::Value;
7use smcp::*;
8use socketioxide::{
9    extract::{AckSender, Data, SocketRef},
10    SocketIo,
11};
12use std::sync::Arc;
13use thiserror::Error;
14use tracing::{error, info, warn};
15
16/// 处理器错误类型
17#[derive(Error, Debug)]
18pub enum HandlerError {
19    #[error("Authentication error: {0}")]
20    Auth(#[from] AuthError),
21    #[error("Session error: {0}")]
22    Session(#[from] SessionError),
23    #[error("JSON error: {0}")]
24    Json(#[from] serde_json::Error),
25    #[error("Timeout error: {0}")]
26    Timeout(String),
27    #[error("Invalid request: {0}")]
28    InvalidRequest(String),
29}
30
31impl HandlerError {
32    /// 获取错误码 / Get error code
33    pub fn error_code(&self) -> i32 {
34        match self {
35            HandlerError::Auth(_) => smcp::error_codes::UNAUTHORIZED,
36            HandlerError::Session(e) => e.error_code(),
37            HandlerError::Json(_) => smcp::error_codes::BAD_REQUEST,
38            HandlerError::Timeout(_) => smcp::error_codes::TIMEOUT,
39            HandlerError::InvalidRequest(_) => smcp::error_codes::BAD_REQUEST,
40        }
41    }
42
43    /// 转换为标准错误响应 / Convert to standard error response
44    pub fn to_error_response(&self) -> smcp::ErrorResponse {
45        smcp::ErrorResponse::new(self.error_code(), self.to_string())
46    }
47}
48
49impl serde::Serialize for HandlerError {
50    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
51    where
52        S: serde::Serializer,
53    {
54        // 使用标准错误响应格式 / Use standard error response format
55        self.to_error_response().serialize(serializer)
56    }
57}
58
59/// 服务器状态
60#[derive(Clone, Debug)]
61pub struct ServerState {
62    /// 会话管理器
63    pub session_manager: Arc<SessionManager>,
64    /// 认证提供者
65    pub auth_provider: Arc<dyn AuthenticationProvider>,
66    /// SocketIo 实例引用,用于跨 socket 通信
67    pub io: Arc<SocketIo>,
68}
69
70/// SMCP 事件处理器
71pub struct SmcpHandler;
72
73impl SmcpHandler {
74    /// 注册所有事件处理器
75    pub fn register_handlers(io: &SocketIo, state: ServerState) {
76        // 注册命名空间和连接处理器
77        io.ns(SMCP_NAMESPACE, move |socket: SocketRef| {
78            let state = state.clone();
79            async move {
80                if let Err(e) = Self::on_connect(socket.clone(), &state).await {
81                    error!("on_connect failed: {}", e);
82                    return;
83                }
84
85                // 连接时注册所有事件处理器
86                Self::handle_connection(socket, state)
87            }
88        });
89    }
90
91    /// 处理连接并注册事件处理器
92    fn handle_connection(socket: SocketRef, state: ServerState) {
93        // 注册各种事件处理器
94        socket.on_disconnect({
95            let state = state.clone();
96            move |socket: SocketRef| {
97                let state = state.clone();
98                async move { Self::on_disconnect(socket, state).await }
99            }
100        });
101
102        let state_join = state.clone();
103        socket.on(
104            smcp::events::SERVER_JOIN_OFFICE,
105            move |socket: SocketRef, Data::<EnterOfficeReq>(data), ack: AckSender| async move {
106                let result = Self::on_server_join_office(socket, data, state_join.clone()).await;
107                let _ = ack.send(&result);
108            },
109        );
110
111        let state_leave = state.clone();
112        socket.on(
113            smcp::events::SERVER_LEAVE_OFFICE,
114            move |socket: SocketRef, Data::<LeaveOfficeReq>(data), ack: AckSender| async move {
115                let result = Self::on_server_leave_office(socket, data, state_leave.clone()).await;
116                let _ = ack.send(&result);
117            },
118        );
119
120        let state_tool_call_cancel = state.clone();
121        socket.on(
122            smcp::events::SERVER_TOOL_CALL_CANCEL,
123            move |socket: SocketRef, Data::<AgentCallData>(data)| async move {
124                Self::on_server_tool_call_cancel(socket, data, state_tool_call_cancel.clone()).await
125            },
126        );
127
128        let state_update_config = state.clone();
129        socket.on(
130            smcp::events::SERVER_UPDATE_CONFIG,
131            move |socket: SocketRef, Data::<UpdateComputerConfigReq>(data)| async move {
132                Self::on_server_update_config(socket, data, state_update_config.clone()).await
133            },
134        );
135
136        let state_update_tool_list = state.clone();
137        socket.on(
138            smcp::events::SERVER_UPDATE_TOOL_LIST,
139            move |socket: SocketRef, Data::<UpdateComputerConfigReq>(data)| async move {
140                Self::on_server_update_tool_list(socket, data, state_update_tool_list.clone()).await
141            },
142        );
143
144        let state_tool_call = state.clone();
145        socket.on(
146            smcp::events::CLIENT_TOOL_CALL,
147            move |socket: SocketRef, Data::<ToolCallReq>(data), ack: AckSender| async move {
148                let result = Self::on_client_tool_call(socket, data, state_tool_call.clone()).await;
149                let _ = ack.send(&result);
150            },
151        );
152
153        let state_get_tools = state.clone();
154        socket.on(
155            smcp::events::CLIENT_GET_TOOLS,
156            move |socket: SocketRef, Data::<GetToolsReq>(data), ack: AckSender| async move {
157                let result = Self::on_client_get_tools(socket, data, state_get_tools.clone()).await;
158                let _ = ack.send(&result);
159            },
160        );
161
162        let state_get_desktop = state.clone();
163        socket.on(
164            smcp::events::CLIENT_GET_DESKTOP,
165            move |socket: SocketRef, Data::<GetDesktopReq>(data), ack: AckSender| async move {
166                let result =
167                    Self::on_client_get_desktop(socket, data, state_get_desktop.clone()).await;
168                let _ = ack.send(&result);
169            },
170        );
171
172        let state_get_config = state.clone();
173        socket.on(
174            smcp::events::CLIENT_GET_CONFIG,
175            move |socket: SocketRef, Data::<GetComputerConfigReq>(data), ack: AckSender| async move {
176                let result = Self::on_client_get_config(socket, data, state_get_config.clone()).await;
177                let _ = ack.send(&result);
178            },
179        );
180
181        let state_update_desktop = state.clone();
182        socket.on(
183            smcp::events::SERVER_UPDATE_DESKTOP,
184            move |socket: SocketRef, Data::<UpdateComputerConfigReq>(data)| async move {
185                Self::on_server_update_desktop(socket, data, state_update_desktop.clone()).await
186            },
187        );
188
189        let state_list_room = state.clone();
190        socket.on(
191            smcp::events::SERVER_LIST_ROOM,
192            move |socket: SocketRef, Data::<ListRoomReq>(data), ack: AckSender| async move {
193                let result = Self::on_server_list_room(socket, data, state_list_room.clone()).await;
194                let _ = ack.send(&result);
195            },
196        );
197    }
198
199    /// 处理连接事件
200    async fn on_connect(socket: SocketRef, state: &ServerState) -> Result<(), HandlerError> {
201        info!(
202            "SocketIO Client {} connecting to {}...",
203            socket.id, SMCP_NAMESPACE
204        );
205
206        // 获取请求头进行认证
207        let headers = socket.req_parts().headers.clone();
208        let auth_data = socket.req_parts().extensions.get::<Value>();
209
210        // 认证
211        state
212            .auth_provider
213            .authenticate(&headers, auth_data)
214            .await?;
215
216        info!(
217            "SocketIO Client {} connected successfully to {}",
218            socket.id, SMCP_NAMESPACE
219        );
220        Ok(())
221    }
222
223    /// 处理断开连接事件
224    async fn on_disconnect(socket: SocketRef, state: ServerState) {
225        info!(
226            "SocketIO Client {} disconnecting from {}...",
227            socket.id, SMCP_NAMESPACE
228        );
229
230        // 清理会话
231        let sid = socket.id.to_string();
232        if let Some(session) = state.session_manager.unregister_session(&sid) {
233            // 如果在房间内,广播离开消息
234            if let Some(office_id) = session.office_id {
235                let notification = if session.role == ClientRole::Computer {
236                    LeaveOfficeNotification {
237                        office_id: office_id.clone(),
238                        computer: Some(session.name),
239                        agent: None,
240                    }
241                } else {
242                    LeaveOfficeNotification {
243                        office_id: office_id.clone(),
244                        computer: None,
245                        agent: Some(session.name),
246                    }
247                };
248
249                let _ = socket
250                    .within(office_id)
251                    .emit(smcp::events::NOTIFY_LEAVE_OFFICE, &notification)
252                    .await;
253            }
254        }
255
256        info!(
257            "SocketIO Client {} disconnected from {}",
258            socket.id, SMCP_NAMESPACE
259        );
260    }
261
262    /// 处理加入办公室事件
263    async fn on_server_join_office(
264        socket: SocketRef,
265        data: EnterOfficeReq,
266        state: ServerState,
267    ) -> (bool, Option<String>) {
268        info!("on_server_join_office called with data: {:?}", data);
269
270        let sid = socket.id.to_string();
271        let requested_role = ClientRole::from(data.role.clone());
272        let requested_name = data.name.clone();
273
274        // 获取或创建会话
275        let session = match state.session_manager.get_session(&sid) {
276            Some(s) => {
277                // 检查角色/状态一致性
278                if s.role != requested_role {
279                    return (
280                        false,
281                        Some(format!(
282                            "Role mismatch: existing session has role {:?}, but requested {:?}",
283                            s.role, requested_role
284                        )),
285                    );
286                }
287
288                if s.name != requested_name {
289                    return (
290                        false,
291                        Some(format!(
292                            "Name mismatch: existing session has name '{}', but requested '{}'",
293                            s.name, requested_name
294                        )),
295                    );
296                }
297
298                s
299            }
300            None => {
301                // 创建新会话
302                let new_session = SessionData::new(sid.clone(), requested_name, requested_role);
303
304                if let Err(e) = state.session_manager.register_session(new_session.clone()) {
305                    return (false, Some(format!("Failed to register session: {}", e)));
306                }
307                new_session
308            }
309        };
310
311        // 检查并加入房间
312        if let Err(e) =
313            Self::handle_join_room(socket.clone(), &session, &data.office_id, &state).await
314        {
315            error!("handle_join_room failed: {}", e);
316            return (false, Some(format!("Failed to join room: {}", e)));
317        }
318
319        // 更新会话的办公室 ID(在成功加入房间后)
320        if let Err(e) = state
321            .session_manager
322            .update_office_id(&sid, Some(data.office_id.clone()))
323        {
324            return (false, Some(format!("Failed to update office_id: {}", e)));
325        }
326
327        // 构建通知数据
328        let session_name = session.name.clone();
329        let notification_data = if session.role == ClientRole::Computer {
330            EnterOfficeNotification {
331                office_id: data.office_id.clone(),
332                computer: Some(session_name.clone()),
333                agent: None,
334            }
335        } else {
336            EnterOfficeNotification {
337                office_id: data.office_id.clone(),
338                computer: None,
339                agent: Some(session_name.clone()),
340            }
341        };
342
343        let result = socket
344            .to(data.office_id.clone())
345            .emit(smcp::events::NOTIFY_ENTER_OFFICE, &notification_data)
346            .await;
347
348        if let Err(e) = result {
349            warn!("Failed to broadcast NOTIFY_ENTER_OFFICE: {}", e);
350        }
351
352        (true, None)
353    }
354
355    /// 处理离开办公室事件
356    async fn on_server_leave_office(
357        socket: SocketRef,
358        data: LeaveOfficeReq,
359        state: ServerState,
360    ) -> (bool, Option<String>) {
361        let sid = socket.id.to_string();
362
363        // 获取会话
364        let session = match state.session_manager.get_session(&sid) {
365            Some(s) => s,
366            None => return (false, Some(format!("Session not found: {}", sid))),
367        };
368
369        // 构建离开通知
370        let notification = if session.role == ClientRole::Computer {
371            LeaveOfficeNotification {
372                office_id: data.office_id.clone(),
373                computer: Some(session.name),
374                agent: None,
375            }
376        } else {
377            LeaveOfficeNotification {
378                office_id: data.office_id.clone(),
379                computer: None,
380                agent: Some(session.name),
381            }
382        };
383
384        // 广播离开消息
385        let _ = socket
386            .within(data.office_id.clone())
387            .emit(smcp::events::NOTIFY_LEAVE_OFFICE, &notification)
388            .await;
389
390        // 更新会话
391        if let Err(e) = state.session_manager.update_office_id(&sid, None) {
392            return (false, Some(format!("Failed to update office_id: {}", e)));
393        }
394        socket.leave(data.office_id.clone());
395
396        (true, None)
397    }
398
399    /// 处理工具调用取消事件
400    async fn on_server_tool_call_cancel(
401        socket: SocketRef,
402        data: AgentCallData,
403        state: ServerState,
404    ) {
405        let sid = socket.id.to_string();
406        let session = match state.session_manager.get_session(&sid) {
407            Some(s) => s,
408            None => {
409                warn!("SERVER_TOOL_CALL_CANCEL from unknown session sid={}", sid);
410                return;
411            }
412        };
413
414        // Python 侧语义:向 office(room) 广播并跳过自己
415        // 这里沿用 socketioxide 的 to(room) 语义:从当前 socket 触发时,会自动排除自身。
416
417        // 角色断言:取消工具调用通常由 Agent 发起
418        if session.role != ClientRole::Agent {
419            warn!(
420                "SERVER_TOOL_CALL_CANCEL role mismatch: expected Agent, got {:?}, sid={}",
421                session.role, sid
422            );
423            return;
424        }
425
426        let office_id = match session.office_id {
427            Some(ref office_id) => office_id.clone(),
428            None => {
429                warn!(
430                    "SERVER_TOOL_CALL_CANCEL but session not in office, sid={}",
431                    sid
432                );
433                return;
434            }
435        };
436
437        if let Err(e) = socket
438            .to(office_id)
439            .emit(smcp::events::NOTIFY_TOOL_CALL_CANCEL, &data)
440            .await
441        {
442            warn!("Failed to broadcast NOTIFY_TOOL_CALL_CANCEL: {}", e);
443        }
444    }
445
446    /// 处理配置更新事件
447    async fn on_server_update_config(
448        socket: SocketRef,
449        data: UpdateComputerConfigReq,
450        state: ServerState,
451    ) {
452        let sid = socket.id.to_string();
453        let session = match state.session_manager.get_session(&sid) {
454            Some(s) => s,
455            None => {
456                warn!("SERVER_UPDATE_CONFIG from unknown session sid={}", sid);
457                return;
458            }
459        };
460
461        // 角色断言:配置更新通常由 Computer 发起
462        if session.role != ClientRole::Computer {
463            warn!(
464                "SERVER_UPDATE_CONFIG role mismatch: expected Computer, got {:?}, sid={}",
465                session.role, sid
466            );
467            return;
468        }
469
470        let office_id = match session.office_id {
471            Some(ref office_id) => office_id.clone(),
472            None => {
473                warn!(
474                    "SERVER_UPDATE_CONFIG but session not in office, sid={}",
475                    sid
476                );
477                return;
478            }
479        };
480
481        // 广播配置更新通知(向 office 广播并跳过自己)
482        let notification = UpdateMCPConfigNotification {
483            computer: data.computer.clone(),
484        };
485
486        let office_id_clone = office_id.clone();
487        let computer_clone = data.computer.clone();
488        info!(
489            "Broadcasting NOTIFY_UPDATE_CONFIG to room '{}' from computer '{}' (sid: {})",
490            office_id_clone, computer_clone, sid
491        );
492
493        if let Err(e) = socket
494            .to(office_id.clone())
495            .emit(smcp::events::NOTIFY_UPDATE_CONFIG, &notification)
496            .await
497        {
498            warn!("Failed to broadcast NOTIFY_UPDATE_CONFIG: {}", e);
499        } else {
500            info!(
501                "Successfully broadcasted NOTIFY_UPDATE_CONFIG to room '{}'",
502                office_id
503            );
504        }
505    }
506
507    /// 处理工具列表更新事件
508    async fn on_server_update_tool_list(
509        socket: SocketRef,
510        data: UpdateComputerConfigReq,
511        state: ServerState,
512    ) {
513        let sid = socket.id.to_string();
514        let session = match state.session_manager.get_session(&sid) {
515            Some(s) => s,
516            None => {
517                warn!("SERVER_UPDATE_TOOL_LIST from unknown session sid={}", sid);
518                return;
519            }
520        };
521
522        // 角色断言:工具列表更新通常由 Computer 发起
523        if session.role != ClientRole::Computer {
524            warn!(
525                "SERVER_UPDATE_TOOL_LIST role mismatch: expected Computer, got {:?}, sid={}",
526                session.role, sid
527            );
528            return;
529        }
530
531        let office_id = match session.office_id {
532            Some(ref office_id) => office_id.clone(),
533            None => {
534                warn!(
535                    "SERVER_UPDATE_TOOL_LIST but session not in office, sid={}",
536                    sid
537                );
538                return;
539            }
540        };
541
542        // 广播工具列表更新通知(向 office 广播并跳过自己)
543        let notification = UpdateToolListNotification {
544            computer: data.computer,
545        };
546
547        if let Err(e) = socket
548            .to(office_id)
549            .emit(smcp::events::NOTIFY_UPDATE_TOOL_LIST, &notification)
550            .await
551        {
552            warn!("Failed to broadcast NOTIFY_UPDATE_TOOL_LIST: {}", e);
553        }
554    }
555
556    /// 处理客户端工具调用事件
557    async fn on_client_tool_call(
558        socket: SocketRef,
559        data: ToolCallReq,
560        state: ServerState,
561    ) -> Result<Value, HandlerError> {
562        // 获取 Agent 的会话信息
563        let sid = socket.id.to_string();
564        let session = state
565            .session_manager
566            .get_session(&sid)
567            .ok_or_else(|| HandlerError::Session(SessionError::NotFound(sid.clone())))?;
568
569        // 验证角色必须是 Agent
570        if session.role != ClientRole::Agent {
571            return Err(HandlerError::InvalidRequest(
572                "Only agents can make tool calls".to_string(),
573            ));
574        }
575
576        // 验证 Agent 在某个办公室内
577        let office_id = session.office_id.ok_or_else(|| {
578            HandlerError::InvalidRequest(
579                "Agent must be in an office to make tool calls".to_string(),
580            )
581        })?;
582
583        // 查找目标 Computer 的 sid
584        let computer_sid = state
585            .session_manager
586            .get_computer_sid_in_office(&office_id, &data.computer)
587            .ok_or_else(|| {
588                HandlerError::InvalidRequest(format!(
589                    "Computer '{}' not found in office",
590                    data.computer
591                ))
592            })?;
593
594        // 获取目标 socket
595        let target_socket = state
596            .io
597            .of(SMCP_NAMESPACE)
598            .and_then(|op| op.get_socket(computer_sid.parse().unwrap()))
599            .ok_or_else(|| {
600                HandlerError::InvalidRequest("Target computer socket not found".to_string())
601            })?;
602
603        // 转发请求并等待响应
604        let timeout = tokio::time::Duration::from_secs(30);
605        let ack_result = target_socket.emit_with_ack(smcp::events::CLIENT_TOOL_CALL, &data);
606
607        match tokio::time::timeout(timeout, async move {
608            match ack_result {
609                Ok(stream) => {
610                    let mut pinned = Box::pin(stream);
611                    match pinned.next().await {
612                        Some((_, response)) => response,
613                        None => Ok(serde_json::Value::Null),
614                    }
615                }
616                Err(_) => Ok(serde_json::Value::Null),
617            }
618        })
619        .await
620        {
621            Ok(Ok(response)) => {
622                // 解析响应
623                match response {
624                    serde_json::Value::Object(mut map) => {
625                        // 提取 result 字段
626                        let result = map.remove("result").unwrap_or(serde_json::Value::Null);
627                        Ok(result)
628                    }
629                    _ => Ok(response),
630                }
631            }
632            Ok(Err(e)) => Err(HandlerError::Timeout(format!(
633                "Failed to get response from computer: {}",
634                e
635            ))),
636            Err(_) => Err(HandlerError::Timeout(
637                "Tool call timed out after 30 seconds".to_string(),
638            )),
639        }
640    }
641
642    /// 处理获取工具列表事件
643    async fn on_client_get_tools(
644        socket: SocketRef,
645        data: GetToolsReq,
646        state: ServerState,
647    ) -> Result<GetToolsRet, HandlerError> {
648        // 获取 Agent 的会话信息
649        let sid = socket.id.to_string();
650        let session = state
651            .session_manager
652            .get_session(&sid)
653            .ok_or_else(|| HandlerError::Session(SessionError::NotFound(sid.clone())))?;
654
655        // 验证角色必须是 Agent
656        if session.role != ClientRole::Agent {
657            return Err(HandlerError::InvalidRequest(
658                "Only agents can get tools".to_string(),
659            ));
660        }
661
662        // 验证 Agent 在某个办公室内
663        let office_id = session.office_id.ok_or_else(|| {
664            HandlerError::InvalidRequest("Agent must be in an office to get tools".to_string())
665        })?;
666
667        // 查找目标 Computer 的 sid
668        let computer_sid = state
669            .session_manager
670            .get_computer_sid_in_office(&office_id, &data.computer)
671            .ok_or_else(|| {
672                HandlerError::InvalidRequest(format!(
673                    "Computer '{}' not found in office",
674                    data.computer
675                ))
676            })?;
677
678        // 获取目标 socket
679        let target_socket = state
680            .io
681            .of(SMCP_NAMESPACE)
682            .and_then(|op| op.get_socket(computer_sid.parse().unwrap()))
683            .ok_or_else(|| {
684                HandlerError::InvalidRequest("Target computer socket not found".to_string())
685            })?;
686
687        // 转发请求并等待响应
688        let timeout = tokio::time::Duration::from_secs(30);
689        let ack_result = target_socket.emit_with_ack(smcp::events::CLIENT_GET_TOOLS, &data);
690
691        match tokio::time::timeout(timeout, async move {
692            match ack_result {
693                Ok(stream) => {
694                    let mut pinned = Box::pin(stream);
695                    match pinned.next().await {
696                        Some((_, response)) => response,
697                        None => Ok(serde_json::Value::Null),
698                    }
699                }
700                Err(_) => Ok(serde_json::Value::Null),
701            }
702        })
703        .await
704        {
705            Ok(Ok(response)) => {
706                // 解析响应
707                serde_json::from_value(response).map_err(|e| {
708                    HandlerError::InvalidRequest(format!("Failed to parse response: {}", e))
709                })
710            }
711            Ok(Err(e)) => Err(HandlerError::Timeout(format!(
712                "Failed to get response from computer: {}",
713                e
714            ))),
715            Err(_) => Err(HandlerError::Timeout(
716                "Get tools timed out after 30 seconds".to_string(),
717            )),
718        }
719    }
720
721    /// 处理获取桌面信息事件
722    async fn on_client_get_desktop(
723        socket: SocketRef,
724        data: GetDesktopReq,
725        state: ServerState,
726    ) -> Result<GetDesktopRet, HandlerError> {
727        // 获取 Agent 的会话信息
728        let sid = socket.id.to_string();
729        let session = state
730            .session_manager
731            .get_session(&sid)
732            .ok_or_else(|| HandlerError::Session(SessionError::NotFound(sid.clone())))?;
733
734        // 验证角色必须是 Agent
735        if session.role != ClientRole::Agent {
736            return Err(HandlerError::InvalidRequest(
737                "Only agents can get desktop".to_string(),
738            ));
739        }
740
741        // 验证 Agent 在某个办公室内
742        let office_id = session.office_id.ok_or_else(|| {
743            HandlerError::InvalidRequest("Agent must be in an office to get desktop".to_string())
744        })?;
745
746        // 查找目标 Computer 的 sid
747        let computer_sid = state
748            .session_manager
749            .get_computer_sid_in_office(&office_id, &data.computer)
750            .ok_or_else(|| {
751                HandlerError::InvalidRequest(format!(
752                    "Computer '{}' not found in office",
753                    data.computer
754                ))
755            })?;
756
757        // 获取目标 socket
758        let target_socket = state
759            .io
760            .of(SMCP_NAMESPACE)
761            .and_then(|op| op.get_socket(computer_sid.parse().unwrap()))
762            .ok_or_else(|| {
763                HandlerError::InvalidRequest("Target computer socket not found".to_string())
764            })?;
765
766        // 转发请求并等待响应
767        let timeout = tokio::time::Duration::from_secs(30);
768        let ack_result = target_socket.emit_with_ack(smcp::events::CLIENT_GET_DESKTOP, &data);
769
770        match tokio::time::timeout(timeout, async move {
771            match ack_result {
772                Ok(stream) => {
773                    let mut pinned = Box::pin(stream);
774                    match pinned.next().await {
775                        Some((_, response)) => response,
776                        None => Ok(serde_json::Value::Null),
777                    }
778                }
779                Err(_) => Ok(serde_json::Value::Null),
780            }
781        })
782        .await
783        {
784            Ok(Ok(response)) => {
785                // 解析响应
786                serde_json::from_value(response).map_err(|e| {
787                    HandlerError::InvalidRequest(format!("Failed to parse response: {}", e))
788                })
789            }
790            Ok(Err(e)) => Err(HandlerError::Timeout(format!(
791                "Failed to get response from computer: {}",
792                e
793            ))),
794            Err(_) => Err(HandlerError::Timeout(
795                "Get desktop timed out after 30 seconds".to_string(),
796            )),
797        }
798    }
799
800    /// 处理获取计算机配置事件
801    async fn on_client_get_config(
802        socket: SocketRef,
803        data: GetComputerConfigReq,
804        state: ServerState,
805    ) -> Result<GetComputerConfigRet, HandlerError> {
806        // 获取 Agent 的会话信息
807        let sid = socket.id.to_string();
808        let session = state
809            .session_manager
810            .get_session(&sid)
811            .ok_or_else(|| HandlerError::Session(SessionError::NotFound(sid.clone())))?;
812
813        // 验证角色必须是 Agent
814        if session.role != ClientRole::Agent {
815            return Err(HandlerError::InvalidRequest(
816                "Only agents can get config".to_string(),
817            ));
818        }
819
820        // 验证 Agent 在某个办公室内
821        let office_id = session.office_id.ok_or_else(|| {
822            HandlerError::InvalidRequest("Agent must be in an office to get config".to_string())
823        })?;
824
825        // 查找目标 Computer 的 sid
826        let computer_sid = state
827            .session_manager
828            .get_computer_sid_in_office(&office_id, &data.computer)
829            .ok_or_else(|| {
830                HandlerError::InvalidRequest(format!(
831                    "Computer '{}' not found in office",
832                    data.computer
833                ))
834            })?;
835
836        // 获取目标 socket
837        let target_socket = state
838            .io
839            .of(SMCP_NAMESPACE)
840            .and_then(|op| op.get_socket(computer_sid.parse().unwrap()))
841            .ok_or_else(|| {
842                HandlerError::InvalidRequest("Target computer socket not found".to_string())
843            })?;
844
845        // 转发请求并等待响应
846        let timeout = tokio::time::Duration::from_secs(30);
847        let ack_result = target_socket.emit_with_ack(smcp::events::CLIENT_GET_CONFIG, &data);
848
849        match tokio::time::timeout(timeout, async move {
850            match ack_result {
851                Ok(stream) => {
852                    let mut pinned = Box::pin(stream);
853                    match pinned.next().await {
854                        Some((_, response)) => response,
855                        None => Ok(serde_json::Value::Null),
856                    }
857                }
858                Err(_) => Ok(serde_json::Value::Null),
859            }
860        })
861        .await
862        {
863            Ok(Ok(response)) => {
864                // 解析响应
865                serde_json::from_value(response).map_err(|e| {
866                    HandlerError::InvalidRequest(format!("Failed to parse response: {}", e))
867                })
868            }
869            Ok(Err(e)) => Err(HandlerError::Timeout(format!(
870                "Failed to get response from computer: {}",
871                e
872            ))),
873            Err(_) => Err(HandlerError::Timeout(
874                "Get config timed out after 30 seconds".to_string(),
875            )),
876        }
877    }
878
879    /// 处理桌面更新事件
880    async fn on_server_update_desktop(
881        socket: SocketRef,
882        data: UpdateComputerConfigReq,
883        state: ServerState,
884    ) {
885        let sid = socket.id.to_string();
886        let session = match state.session_manager.get_session(&sid) {
887            Some(s) => s,
888            None => {
889                warn!("SERVER_UPDATE_DESKTOP from unknown session sid={}", sid);
890                return;
891            }
892        };
893
894        // 角色断言:桌面更新通常由 Computer 发起
895        if session.role != ClientRole::Computer {
896            warn!(
897                "SERVER_UPDATE_DESKTOP role mismatch: expected Computer, got {:?}, sid={}",
898                session.role, sid
899            );
900            return;
901        }
902
903        let office_id = match session.office_id {
904            Some(ref office_id) => office_id.clone(),
905            None => {
906                warn!(
907                    "SERVER_UPDATE_DESKTOP but session not in office, sid={}",
908                    sid
909                );
910                return;
911            }
912        };
913
914        // 广播桌面更新通知(向 office 广播并跳过自己)
915        let notification = UpdateMCPConfigNotification {
916            computer: data.computer,
917        };
918
919        if let Err(e) = socket
920            .to(office_id)
921            .emit(smcp::events::NOTIFY_UPDATE_DESKTOP, &notification)
922            .await
923        {
924            warn!("Failed to broadcast NOTIFY_UPDATE_DESKTOP: {}", e);
925        }
926    }
927
928    /// 处理列出房间事件
929    async fn on_server_list_room(
930        socket: SocketRef,
931        data: ListRoomReq,
932        state: ServerState,
933    ) -> ListRoomRet {
934        // 获取发起者会话信息
935        let sid = socket.id.to_string();
936        let session = match state.session_manager.get_session(&sid) {
937            Some(s) => s,
938            None => {
939                warn!("List room from unknown session sid={}", sid);
940                return ListRoomRet {
941                    sessions: vec![],
942                    req_id: data.base.req_id,
943                };
944            }
945        };
946
947        // 权限校验:只能查询自己所在的办公室
948        let session_office_id = match session.office_id {
949            Some(id) => id,
950            None => {
951                warn!("Session {} not in any office", sid);
952                return ListRoomRet {
953                    sessions: vec![],
954                    req_id: data.base.req_id,
955                };
956            }
957        };
958
959        if session_office_id != data.office_id {
960            warn!(
961                "Session {} trying to list room {} but in office {}",
962                sid, data.office_id, session_office_id
963            );
964            return ListRoomRet {
965                sessions: vec![],
966                req_id: data.base.req_id,
967            };
968        }
969
970        // 获取指定办公室的所有会话
971        let sessions = state
972            .session_manager
973            .get_sessions_in_office(&data.office_id);
974
975        // 转换为 SessionInfo 列表
976        let session_infos: Vec<SessionInfo> = sessions
977            .into_iter()
978            .map(|s| SessionInfo {
979                sid: s.sid,
980                name: s.name,
981                role: s.role.into(),
982                office_id: s.office_id.unwrap_or_default(),
983            })
984            .collect();
985
986        ListRoomRet {
987            sessions: session_infos,
988            req_id: data.base.req_id,
989        }
990    }
991
992    /// 处理加入房间的逻辑
993    async fn handle_join_room(
994        socket: SocketRef,
995        session: &SessionData,
996        office_id: &str,
997        state: &ServerState,
998    ) -> Result<(), HandlerError> {
999        info!(
1000            "handle_join_room called: sid={}, office_id={}, role={:?}",
1001            socket.id, office_id, session.role
1002        );
1003
1004        match Self::validate_join_room(session, office_id, state)? {
1005            JoinRoomDecision::Noop => {
1006                info!("Noop decision for sid={}", socket.id);
1007                Ok(())
1008            }
1009            JoinRoomDecision::Join => {
1010                info!("Joining room '{}' for sid={}", office_id, socket.id);
1011                socket.join(office_id.to_string());
1012                Ok(())
1013            }
1014            JoinRoomDecision::LeaveAndJoin { leave_office } => {
1015                info!(
1016                    "Leaving room '{}' and joining '{}' for sid={}",
1017                    leave_office, office_id, socket.id
1018                );
1019
1020                // 构建离开通知(Python语义:切换房间前需要通知旧房间)
1021                let leave_notification = if session.role == ClientRole::Computer {
1022                    LeaveOfficeNotification {
1023                        office_id: leave_office.clone(),
1024                        computer: Some(session.name.clone()),
1025                        agent: None,
1026                    }
1027                } else {
1028                    LeaveOfficeNotification {
1029                        office_id: leave_office.clone(),
1030                        computer: None,
1031                        agent: Some(session.name.clone()),
1032                    }
1033                };
1034
1035                // 向旧房间广播离开消息
1036                let _ = socket
1037                    .within(leave_office.clone())
1038                    .emit(smcp::events::NOTIFY_LEAVE_OFFICE, &leave_notification)
1039                    .await;
1040
1041                socket.leave(leave_office);
1042                socket.join(office_id.to_string());
1043                Ok(())
1044            }
1045        }
1046    }
1047
1048    fn validate_join_room(
1049        session: &SessionData,
1050        office_id: &str,
1051        state: &ServerState,
1052    ) -> Result<JoinRoomDecision, HandlerError> {
1053        match session.role {
1054            ClientRole::Agent => {
1055                if let Some(current_office) = &session.office_id {
1056                    if current_office != office_id {
1057                        return Err(HandlerError::Session(SessionError::AgentAlreadyInRoom(
1058                            current_office.clone(),
1059                        )));
1060                    }
1061                    warn!(
1062                        "Agent sid: {} already in room: {}. 正在重复加入房间",
1063                        session.sid, current_office
1064                    );
1065                    return Ok(JoinRoomDecision::Noop);
1066                }
1067
1068                if state
1069                    .session_manager
1070                    .has_agent_in_office(&office_id.to_string())
1071                {
1072                    return Err(HandlerError::Session(SessionError::AgentAlreadyExists));
1073                }
1074            }
1075            ClientRole::Computer => {
1076                if let Some(current_office) = &session.office_id {
1077                    if current_office != office_id {
1078                        if state
1079                            .session_manager
1080                            .has_computer_in_office(&office_id.to_string(), &session.name)
1081                        {
1082                            return Err(HandlerError::Session(
1083                                SessionError::ComputerAlreadyExists(
1084                                    session.name.clone(),
1085                                    office_id.to_string(),
1086                                ),
1087                            ));
1088                        }
1089                        return Ok(JoinRoomDecision::LeaveAndJoin {
1090                            leave_office: current_office.clone(),
1091                        });
1092                    }
1093                    warn!(
1094                        "Computer sid: {} already in room: {}. 正在重复加入房间",
1095                        session.sid, current_office
1096                    );
1097                    return Ok(JoinRoomDecision::Noop);
1098                }
1099
1100                if state
1101                    .session_manager
1102                    .has_computer_in_office(&office_id.to_string(), &session.name)
1103                {
1104                    return Err(HandlerError::Session(SessionError::ComputerAlreadyExists(
1105                        session.name.clone(),
1106                        office_id.to_string(),
1107                    )));
1108                }
1109            }
1110        }
1111
1112        Ok(JoinRoomDecision::Join)
1113    }
1114}
1115
1116#[derive(Debug, Clone, PartialEq, Eq)]
1117enum JoinRoomDecision {
1118    Noop,
1119    Join,
1120    LeaveAndJoin { leave_office: String },
1121}
1122
1123#[cfg(test)]
1124mod tests {
1125    use super::*;
1126    use crate::auth::DefaultAuthenticationProvider;
1127    use serde_json;
1128
1129    fn create_test_state() -> ServerState {
1130        let (_layer, io) = SocketIo::builder().build_layer();
1131        ServerState {
1132            session_manager: Arc::new(SessionManager::new()),
1133            auth_provider: Arc::new(DefaultAuthenticationProvider::new(
1134                Some("test_secret".to_string()),
1135                None,
1136            )),
1137            io: Arc::new(io),
1138        }
1139    }
1140
1141    #[tokio::test]
1142    async fn test_agent_join_office() {
1143        let (_layer, io) = SocketIo::builder().build_layer();
1144        let state = ServerState {
1145            session_manager: Arc::new(SessionManager::new()),
1146            auth_provider: Arc::new(DefaultAuthenticationProvider::new(
1147                Some("test_secret".to_string()),
1148                None,
1149            )),
1150            io: Arc::new(io.clone()),
1151        };
1152
1153        // 注册处理器
1154        SmcpHandler::register_handlers(&io, state.clone());
1155
1156        // 测试逻辑需要实际的 Socket.IO 客户端连接
1157        // 这里只做基本的单元测试
1158        assert_eq!(state.session_manager.get_stats().total, 0);
1159    }
1160
1161    #[test]
1162    fn test_handler_error_serialize() {
1163        let err = HandlerError::InvalidRequest("bad".to_string());
1164        let json = serde_json::to_string(&err).unwrap();
1165        assert!(json.contains("Invalid request"));
1166        assert!(json.contains("bad"));
1167    }
1168
1169    #[test]
1170    fn test_validate_join_room_agent_already_in_other_room() {
1171        let state = create_test_state();
1172        let session = SessionData::new("sid1".to_string(), "a".to_string(), ClientRole::Agent)
1173            .with_office_id("office1".to_string());
1174
1175        let err = SmcpHandler::validate_join_room(&session, "office2", &state).unwrap_err();
1176        assert!(matches!(
1177            err,
1178            HandlerError::Session(SessionError::AgentAlreadyInRoom(o)) if o == "office1"
1179        ));
1180    }
1181
1182    #[test]
1183    fn test_validate_join_room_agent_already_exists() {
1184        let state = create_test_state();
1185        let office_id = "office1".to_string();
1186        let existing_agent = SessionData::new(
1187            "sid_agent".to_string(),
1188            "agent1".to_string(),
1189            ClientRole::Agent,
1190        )
1191        .with_office_id(office_id.clone());
1192        state
1193            .session_manager
1194            .register_session(existing_agent)
1195            .unwrap();
1196
1197        let new_agent = SessionData::new(
1198            "sid_new".to_string(),
1199            "agent2".to_string(),
1200            ClientRole::Agent,
1201        );
1202        let err = SmcpHandler::validate_join_room(&new_agent, &office_id, &state).unwrap_err();
1203        assert!(matches!(
1204            err,
1205            HandlerError::Session(SessionError::AgentAlreadyExists)
1206        ));
1207    }
1208
1209    #[test]
1210    fn test_validate_join_room_computer_duplicate_name_in_office() {
1211        let state = create_test_state();
1212        let office_id = "office1".to_string();
1213        let existing = SessionData::new(
1214            "sid_c1".to_string(),
1215            "computer1".to_string(),
1216            ClientRole::Computer,
1217        )
1218        .with_office_id(office_id.clone());
1219        state.session_manager.register_session(existing).unwrap();
1220
1221        let new_same_name = SessionData::new(
1222            "sid_c2".to_string(),
1223            "computer1".to_string(),
1224            ClientRole::Computer,
1225        );
1226        let err = SmcpHandler::validate_join_room(&new_same_name, &office_id, &state).unwrap_err();
1227        assert!(matches!(
1228            err,
1229            HandlerError::Session(SessionError::ComputerAlreadyExists(name, office))
1230                if name == "computer1" && office == "office1"
1231        ));
1232    }
1233
1234    #[test]
1235    fn test_validate_join_room_computer_switch_room() {
1236        let state = create_test_state();
1237        let session = SessionData::new(
1238            "sid_c".to_string(),
1239            "computer1".to_string(),
1240            ClientRole::Computer,
1241        )
1242        .with_office_id("office_old".to_string());
1243
1244        let decision = SmcpHandler::validate_join_room(&session, "office_new", &state).unwrap();
1245        assert_eq!(
1246            decision,
1247            JoinRoomDecision::LeaveAndJoin {
1248                leave_office: "office_old".to_string()
1249            }
1250        );
1251    }
1252
1253    #[test]
1254    fn test_validate_join_room_same_room_noop() {
1255        let state = create_test_state();
1256        let session = SessionData::new("sid".to_string(), "c".to_string(), ClientRole::Computer)
1257            .with_office_id("office1".to_string());
1258
1259        let decision = SmcpHandler::validate_join_room(&session, "office1", &state).unwrap();
1260        assert_eq!(decision, JoinRoomDecision::Noop);
1261    }
1262
1263    #[test]
1264    fn test_validate_join_room_agent_first_time_join() {
1265        let state = create_test_state();
1266        let session = SessionData::new("sid_agent".to_string(), "a".to_string(), ClientRole::Agent);
1267
1268        let decision = SmcpHandler::validate_join_room(&session, "office1", &state).unwrap();
1269        assert_eq!(decision, JoinRoomDecision::Join);
1270    }
1271
1272    #[test]
1273    fn test_validate_join_room_computer_first_time_join() {
1274        let state = create_test_state();
1275        let session = SessionData::new(
1276            "sid_computer".to_string(),
1277            "computer1".to_string(),
1278            ClientRole::Computer,
1279        );
1280
1281        let decision = SmcpHandler::validate_join_room(&session, "office1", &state).unwrap();
1282        assert_eq!(decision, JoinRoomDecision::Join);
1283    }
1284
1285    #[test]
1286    fn test_enter_office_notification_computer() {
1287        let computer_name = "computer1".to_string();
1288        let office_id = "office1".to_string();
1289
1290        let notification = EnterOfficeNotification {
1291            office_id: office_id.clone(),
1292            computer: Some(computer_name.clone()),
1293            agent: None,
1294        };
1295
1296        assert_eq!(notification.office_id, office_id);
1297        assert_eq!(notification.computer, Some(computer_name));
1298        assert_eq!(notification.agent, None);
1299    }
1300
1301    #[test]
1302    fn test_enter_office_notification_agent() {
1303        let agent_name = "agent1".to_string();
1304        let office_id = "office1".to_string();
1305
1306        let notification = EnterOfficeNotification {
1307            office_id: office_id.clone(),
1308            computer: None,
1309            agent: Some(agent_name.clone()),
1310        };
1311
1312        assert_eq!(notification.office_id, office_id);
1313        assert_eq!(notification.computer, None);
1314        assert_eq!(notification.agent, Some(agent_name));
1315    }
1316
1317    #[test]
1318    fn test_leave_office_notification_computer() {
1319        let computer_name = "computer1".to_string();
1320        let office_id = "office1".to_string();
1321
1322        let notification = LeaveOfficeNotification {
1323            office_id: office_id.clone(),
1324            computer: Some(computer_name.clone()),
1325            agent: None,
1326        };
1327
1328        assert_eq!(notification.office_id, office_id);
1329        assert_eq!(notification.computer, Some(computer_name));
1330        assert_eq!(notification.agent, None);
1331    }
1332
1333    #[test]
1334    fn test_update_tool_list_notification() {
1335        let computer_name = "computer1".to_string();
1336
1337        let notification = UpdateToolListNotification {
1338            computer: computer_name.clone(),
1339        };
1340
1341        assert_eq!(notification.computer, computer_name);
1342    }
1343
1344    #[test]
1345    fn test_update_mcp_config_notification() {
1346        let computer_name = "computer1".to_string();
1347
1348        let notification = UpdateMCPConfigNotification {
1349            computer: computer_name.clone(),
1350        };
1351
1352        assert_eq!(notification.computer, computer_name);
1353    }
1354
1355    #[test]
1356    fn test_notification_serialization() {
1357        // 验证通知类型序列化正确性
1358        let tool_list_notification = UpdateToolListNotification {
1359            computer: "computer1".to_string(),
1360        };
1361
1362        let json = serde_json::to_string(&tool_list_notification).unwrap();
1363        assert!(json.contains("\"computer\":\"computer1\""));
1364
1365        let mcp_config_notification = UpdateMCPConfigNotification {
1366            computer: "computer1".to_string(),
1367        };
1368
1369        let json = serde_json::to_string(&mcp_config_notification).unwrap();
1370        assert!(json.contains("\"computer\":\"computer1\""));
1371    }
1372}