1use crate::auth::{AuthError, AuthenticationProvider};
4use crate::session::{ClientRole, SessionData, SessionError, SessionManager};
5use futures_util::StreamExt;
6use serde_json::Value;
7use smcp::*;
8use socketioxide::{
9 extract::{AckSender, Data, SocketRef},
10 SocketIo,
11};
12use std::sync::Arc;
13use thiserror::Error;
14use tracing::{error, info, warn};
15
16#[derive(Error, Debug)]
18pub enum HandlerError {
19 #[error("Authentication error: {0}")]
20 Auth(#[from] AuthError),
21 #[error("Session error: {0}")]
22 Session(#[from] SessionError),
23 #[error("JSON error: {0}")]
24 Json(#[from] serde_json::Error),
25 #[error("Timeout error: {0}")]
26 Timeout(String),
27 #[error("Invalid request: {0}")]
28 InvalidRequest(String),
29}
30
31impl HandlerError {
32 pub fn error_code(&self) -> i32 {
34 match self {
35 HandlerError::Auth(_) => smcp::error_codes::UNAUTHORIZED,
36 HandlerError::Session(e) => e.error_code(),
37 HandlerError::Json(_) => smcp::error_codes::BAD_REQUEST,
38 HandlerError::Timeout(_) => smcp::error_codes::TIMEOUT,
39 HandlerError::InvalidRequest(_) => smcp::error_codes::BAD_REQUEST,
40 }
41 }
42
43 pub fn to_error_response(&self) -> smcp::ErrorResponse {
45 smcp::ErrorResponse::new(self.error_code(), self.to_string())
46 }
47}
48
49impl serde::Serialize for HandlerError {
50 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
51 where
52 S: serde::Serializer,
53 {
54 self.to_error_response().serialize(serializer)
56 }
57}
58
59#[derive(Clone, Debug)]
61pub struct ServerState {
62 pub session_manager: Arc<SessionManager>,
64 pub auth_provider: Arc<dyn AuthenticationProvider>,
66 pub io: Arc<SocketIo>,
68}
69
70pub struct SmcpHandler;
72
73impl SmcpHandler {
74 pub fn register_handlers(io: &SocketIo, state: ServerState) {
76 io.ns(SMCP_NAMESPACE, move |socket: SocketRef| {
78 let state = state.clone();
79 async move {
80 if let Err(e) = Self::on_connect(socket.clone(), &state).await {
81 error!("on_connect failed: {}", e);
82 return;
83 }
84
85 Self::handle_connection(socket, state)
87 }
88 });
89 }
90
91 fn handle_connection(socket: SocketRef, state: ServerState) {
93 socket.on_disconnect({
95 let state = state.clone();
96 move |socket: SocketRef| {
97 let state = state.clone();
98 async move { Self::on_disconnect(socket, state).await }
99 }
100 });
101
102 let state_join = state.clone();
103 socket.on(
104 smcp::events::SERVER_JOIN_OFFICE,
105 move |socket: SocketRef, Data::<EnterOfficeReq>(data), ack: AckSender| async move {
106 let result = Self::on_server_join_office(socket, data, state_join.clone()).await;
107 let _ = ack.send(&result);
108 },
109 );
110
111 let state_leave = state.clone();
112 socket.on(
113 smcp::events::SERVER_LEAVE_OFFICE,
114 move |socket: SocketRef, Data::<LeaveOfficeReq>(data), ack: AckSender| async move {
115 let result = Self::on_server_leave_office(socket, data, state_leave.clone()).await;
116 let _ = ack.send(&result);
117 },
118 );
119
120 let state_tool_call_cancel = state.clone();
121 socket.on(
122 smcp::events::SERVER_TOOL_CALL_CANCEL,
123 move |socket: SocketRef, Data::<AgentCallData>(data)| async move {
124 Self::on_server_tool_call_cancel(socket, data, state_tool_call_cancel.clone()).await
125 },
126 );
127
128 let state_update_config = state.clone();
129 socket.on(
130 smcp::events::SERVER_UPDATE_CONFIG,
131 move |socket: SocketRef, Data::<UpdateComputerConfigReq>(data)| async move {
132 Self::on_server_update_config(socket, data, state_update_config.clone()).await
133 },
134 );
135
136 let state_update_tool_list = state.clone();
137 socket.on(
138 smcp::events::SERVER_UPDATE_TOOL_LIST,
139 move |socket: SocketRef, Data::<UpdateComputerConfigReq>(data)| async move {
140 Self::on_server_update_tool_list(socket, data, state_update_tool_list.clone()).await
141 },
142 );
143
144 let state_tool_call = state.clone();
145 socket.on(
146 smcp::events::CLIENT_TOOL_CALL,
147 move |socket: SocketRef, Data::<ToolCallReq>(data), ack: AckSender| async move {
148 let result = Self::on_client_tool_call(socket, data, state_tool_call.clone()).await;
149 let _ = ack.send(&result);
150 },
151 );
152
153 let state_get_tools = state.clone();
154 socket.on(
155 smcp::events::CLIENT_GET_TOOLS,
156 move |socket: SocketRef, Data::<GetToolsReq>(data), ack: AckSender| async move {
157 let result = Self::on_client_get_tools(socket, data, state_get_tools.clone()).await;
158 let _ = ack.send(&result);
159 },
160 );
161
162 let state_get_desktop = state.clone();
163 socket.on(
164 smcp::events::CLIENT_GET_DESKTOP,
165 move |socket: SocketRef, Data::<GetDesktopReq>(data), ack: AckSender| async move {
166 let result =
167 Self::on_client_get_desktop(socket, data, state_get_desktop.clone()).await;
168 let _ = ack.send(&result);
169 },
170 );
171
172 let state_get_config = state.clone();
173 socket.on(
174 smcp::events::CLIENT_GET_CONFIG,
175 move |socket: SocketRef, Data::<GetComputerConfigReq>(data), ack: AckSender| async move {
176 let result = Self::on_client_get_config(socket, data, state_get_config.clone()).await;
177 let _ = ack.send(&result);
178 },
179 );
180
181 let state_update_desktop = state.clone();
182 socket.on(
183 smcp::events::SERVER_UPDATE_DESKTOP,
184 move |socket: SocketRef, Data::<UpdateComputerConfigReq>(data)| async move {
185 Self::on_server_update_desktop(socket, data, state_update_desktop.clone()).await
186 },
187 );
188
189 let state_list_room = state.clone();
190 socket.on(
191 smcp::events::SERVER_LIST_ROOM,
192 move |socket: SocketRef, Data::<ListRoomReq>(data), ack: AckSender| async move {
193 let result = Self::on_server_list_room(socket, data, state_list_room.clone()).await;
194 let _ = ack.send(&result);
195 },
196 );
197 }
198
199 async fn on_connect(socket: SocketRef, state: &ServerState) -> Result<(), HandlerError> {
201 info!(
202 "SocketIO Client {} connecting to {}...",
203 socket.id, SMCP_NAMESPACE
204 );
205
206 let headers = socket.req_parts().headers.clone();
208 let auth_data = socket.req_parts().extensions.get::<Value>();
209
210 state
212 .auth_provider
213 .authenticate(&headers, auth_data)
214 .await?;
215
216 info!(
217 "SocketIO Client {} connected successfully to {}",
218 socket.id, SMCP_NAMESPACE
219 );
220 Ok(())
221 }
222
223 async fn on_disconnect(socket: SocketRef, state: ServerState) {
225 info!(
226 "SocketIO Client {} disconnecting from {}...",
227 socket.id, SMCP_NAMESPACE
228 );
229
230 let sid = socket.id.to_string();
232 if let Some(session) = state.session_manager.unregister_session(&sid) {
233 if let Some(office_id) = session.office_id {
235 let notification = if session.role == ClientRole::Computer {
236 LeaveOfficeNotification {
237 office_id: office_id.clone(),
238 computer: Some(session.name),
239 agent: None,
240 }
241 } else {
242 LeaveOfficeNotification {
243 office_id: office_id.clone(),
244 computer: None,
245 agent: Some(session.name),
246 }
247 };
248
249 let _ = socket
250 .within(office_id)
251 .emit(smcp::events::NOTIFY_LEAVE_OFFICE, ¬ification)
252 .await;
253 }
254 }
255
256 info!(
257 "SocketIO Client {} disconnected from {}",
258 socket.id, SMCP_NAMESPACE
259 );
260 }
261
262 async fn on_server_join_office(
264 socket: SocketRef,
265 data: EnterOfficeReq,
266 state: ServerState,
267 ) -> (bool, Option<String>) {
268 info!("on_server_join_office called with data: {:?}", data);
269
270 let sid = socket.id.to_string();
271 let requested_role = ClientRole::from(data.role.clone());
272 let requested_name = data.name.clone();
273
274 let session = match state.session_manager.get_session(&sid) {
276 Some(s) => {
277 if s.role != requested_role {
279 return (
280 false,
281 Some(format!(
282 "Role mismatch: existing session has role {:?}, but requested {:?}",
283 s.role, requested_role
284 )),
285 );
286 }
287
288 if s.name != requested_name {
289 return (
290 false,
291 Some(format!(
292 "Name mismatch: existing session has name '{}', but requested '{}'",
293 s.name, requested_name
294 )),
295 );
296 }
297
298 s
299 }
300 None => {
301 let new_session = SessionData::new(sid.clone(), requested_name, requested_role);
303
304 if let Err(e) = state.session_manager.register_session(new_session.clone()) {
305 return (false, Some(format!("Failed to register session: {}", e)));
306 }
307 new_session
308 }
309 };
310
311 if let Err(e) =
313 Self::handle_join_room(socket.clone(), &session, &data.office_id, &state).await
314 {
315 error!("handle_join_room failed: {}", e);
316 return (false, Some(format!("Failed to join room: {}", e)));
317 }
318
319 if let Err(e) = state
321 .session_manager
322 .update_office_id(&sid, Some(data.office_id.clone()))
323 {
324 return (false, Some(format!("Failed to update office_id: {}", e)));
325 }
326
327 let session_name = session.name.clone();
329 let notification_data = if session.role == ClientRole::Computer {
330 EnterOfficeNotification {
331 office_id: data.office_id.clone(),
332 computer: Some(session_name.clone()),
333 agent: None,
334 }
335 } else {
336 EnterOfficeNotification {
337 office_id: data.office_id.clone(),
338 computer: None,
339 agent: Some(session_name.clone()),
340 }
341 };
342
343 let result = socket
344 .to(data.office_id.clone())
345 .emit(smcp::events::NOTIFY_ENTER_OFFICE, ¬ification_data)
346 .await;
347
348 if let Err(e) = result {
349 warn!("Failed to broadcast NOTIFY_ENTER_OFFICE: {}", e);
350 }
351
352 (true, None)
353 }
354
355 async fn on_server_leave_office(
357 socket: SocketRef,
358 data: LeaveOfficeReq,
359 state: ServerState,
360 ) -> (bool, Option<String>) {
361 let sid = socket.id.to_string();
362
363 let session = match state.session_manager.get_session(&sid) {
365 Some(s) => s,
366 None => return (false, Some(format!("Session not found: {}", sid))),
367 };
368
369 let notification = if session.role == ClientRole::Computer {
371 LeaveOfficeNotification {
372 office_id: data.office_id.clone(),
373 computer: Some(session.name),
374 agent: None,
375 }
376 } else {
377 LeaveOfficeNotification {
378 office_id: data.office_id.clone(),
379 computer: None,
380 agent: Some(session.name),
381 }
382 };
383
384 let _ = socket
386 .within(data.office_id.clone())
387 .emit(smcp::events::NOTIFY_LEAVE_OFFICE, ¬ification)
388 .await;
389
390 if let Err(e) = state.session_manager.update_office_id(&sid, None) {
392 return (false, Some(format!("Failed to update office_id: {}", e)));
393 }
394 socket.leave(data.office_id.clone());
395
396 (true, None)
397 }
398
399 async fn on_server_tool_call_cancel(
401 socket: SocketRef,
402 data: AgentCallData,
403 state: ServerState,
404 ) {
405 let sid = socket.id.to_string();
406 let session = match state.session_manager.get_session(&sid) {
407 Some(s) => s,
408 None => {
409 warn!("SERVER_TOOL_CALL_CANCEL from unknown session sid={}", sid);
410 return;
411 }
412 };
413
414 if session.role != ClientRole::Agent {
419 warn!(
420 "SERVER_TOOL_CALL_CANCEL role mismatch: expected Agent, got {:?}, sid={}",
421 session.role, sid
422 );
423 return;
424 }
425
426 let office_id = match session.office_id {
427 Some(ref office_id) => office_id.clone(),
428 None => {
429 warn!(
430 "SERVER_TOOL_CALL_CANCEL but session not in office, sid={}",
431 sid
432 );
433 return;
434 }
435 };
436
437 if let Err(e) = socket
438 .to(office_id)
439 .emit(smcp::events::NOTIFY_TOOL_CALL_CANCEL, &data)
440 .await
441 {
442 warn!("Failed to broadcast NOTIFY_TOOL_CALL_CANCEL: {}", e);
443 }
444 }
445
446 async fn on_server_update_config(
448 socket: SocketRef,
449 data: UpdateComputerConfigReq,
450 state: ServerState,
451 ) {
452 let sid = socket.id.to_string();
453 let session = match state.session_manager.get_session(&sid) {
454 Some(s) => s,
455 None => {
456 warn!("SERVER_UPDATE_CONFIG from unknown session sid={}", sid);
457 return;
458 }
459 };
460
461 if session.role != ClientRole::Computer {
463 warn!(
464 "SERVER_UPDATE_CONFIG role mismatch: expected Computer, got {:?}, sid={}",
465 session.role, sid
466 );
467 return;
468 }
469
470 let office_id = match session.office_id {
471 Some(ref office_id) => office_id.clone(),
472 None => {
473 warn!(
474 "SERVER_UPDATE_CONFIG but session not in office, sid={}",
475 sid
476 );
477 return;
478 }
479 };
480
481 let notification = UpdateMCPConfigNotification {
483 computer: data.computer.clone(),
484 };
485
486 let office_id_clone = office_id.clone();
487 let computer_clone = data.computer.clone();
488 info!(
489 "Broadcasting NOTIFY_UPDATE_CONFIG to room '{}' from computer '{}' (sid: {})",
490 office_id_clone, computer_clone, sid
491 );
492
493 if let Err(e) = socket
494 .to(office_id.clone())
495 .emit(smcp::events::NOTIFY_UPDATE_CONFIG, ¬ification)
496 .await
497 {
498 warn!("Failed to broadcast NOTIFY_UPDATE_CONFIG: {}", e);
499 } else {
500 info!(
501 "Successfully broadcasted NOTIFY_UPDATE_CONFIG to room '{}'",
502 office_id
503 );
504 }
505 }
506
507 async fn on_server_update_tool_list(
509 socket: SocketRef,
510 data: UpdateComputerConfigReq,
511 state: ServerState,
512 ) {
513 let sid = socket.id.to_string();
514 let session = match state.session_manager.get_session(&sid) {
515 Some(s) => s,
516 None => {
517 warn!("SERVER_UPDATE_TOOL_LIST from unknown session sid={}", sid);
518 return;
519 }
520 };
521
522 if session.role != ClientRole::Computer {
524 warn!(
525 "SERVER_UPDATE_TOOL_LIST role mismatch: expected Computer, got {:?}, sid={}",
526 session.role, sid
527 );
528 return;
529 }
530
531 let office_id = match session.office_id {
532 Some(ref office_id) => office_id.clone(),
533 None => {
534 warn!(
535 "SERVER_UPDATE_TOOL_LIST but session not in office, sid={}",
536 sid
537 );
538 return;
539 }
540 };
541
542 let notification = UpdateToolListNotification {
544 computer: data.computer,
545 };
546
547 if let Err(e) = socket
548 .to(office_id)
549 .emit(smcp::events::NOTIFY_UPDATE_TOOL_LIST, ¬ification)
550 .await
551 {
552 warn!("Failed to broadcast NOTIFY_UPDATE_TOOL_LIST: {}", e);
553 }
554 }
555
556 async fn on_client_tool_call(
558 socket: SocketRef,
559 data: ToolCallReq,
560 state: ServerState,
561 ) -> Result<Value, HandlerError> {
562 let sid = socket.id.to_string();
564 let session = state
565 .session_manager
566 .get_session(&sid)
567 .ok_or_else(|| HandlerError::Session(SessionError::NotFound(sid.clone())))?;
568
569 if session.role != ClientRole::Agent {
571 return Err(HandlerError::InvalidRequest(
572 "Only agents can make tool calls".to_string(),
573 ));
574 }
575
576 let office_id = session.office_id.ok_or_else(|| {
578 HandlerError::InvalidRequest(
579 "Agent must be in an office to make tool calls".to_string(),
580 )
581 })?;
582
583 let computer_sid = state
585 .session_manager
586 .get_computer_sid_in_office(&office_id, &data.computer)
587 .ok_or_else(|| {
588 HandlerError::InvalidRequest(format!(
589 "Computer '{}' not found in office",
590 data.computer
591 ))
592 })?;
593
594 let target_socket = state
596 .io
597 .of(SMCP_NAMESPACE)
598 .and_then(|op| op.get_socket(computer_sid.parse().unwrap()))
599 .ok_or_else(|| {
600 HandlerError::InvalidRequest("Target computer socket not found".to_string())
601 })?;
602
603 let timeout = tokio::time::Duration::from_secs(30);
605 let ack_result = target_socket.emit_with_ack(smcp::events::CLIENT_TOOL_CALL, &data);
606
607 match tokio::time::timeout(timeout, async move {
608 match ack_result {
609 Ok(stream) => {
610 let mut pinned = Box::pin(stream);
611 match pinned.next().await {
612 Some((_, response)) => response,
613 None => Ok(serde_json::Value::Null),
614 }
615 }
616 Err(_) => Ok(serde_json::Value::Null),
617 }
618 })
619 .await
620 {
621 Ok(Ok(response)) => {
622 match response {
624 serde_json::Value::Object(mut map) => {
625 let result = map.remove("result").unwrap_or(serde_json::Value::Null);
627 Ok(result)
628 }
629 _ => Ok(response),
630 }
631 }
632 Ok(Err(e)) => Err(HandlerError::Timeout(format!(
633 "Failed to get response from computer: {}",
634 e
635 ))),
636 Err(_) => Err(HandlerError::Timeout(
637 "Tool call timed out after 30 seconds".to_string(),
638 )),
639 }
640 }
641
642 async fn on_client_get_tools(
644 socket: SocketRef,
645 data: GetToolsReq,
646 state: ServerState,
647 ) -> Result<GetToolsRet, HandlerError> {
648 let sid = socket.id.to_string();
650 let session = state
651 .session_manager
652 .get_session(&sid)
653 .ok_or_else(|| HandlerError::Session(SessionError::NotFound(sid.clone())))?;
654
655 if session.role != ClientRole::Agent {
657 return Err(HandlerError::InvalidRequest(
658 "Only agents can get tools".to_string(),
659 ));
660 }
661
662 let office_id = session.office_id.ok_or_else(|| {
664 HandlerError::InvalidRequest("Agent must be in an office to get tools".to_string())
665 })?;
666
667 let computer_sid = state
669 .session_manager
670 .get_computer_sid_in_office(&office_id, &data.computer)
671 .ok_or_else(|| {
672 HandlerError::InvalidRequest(format!(
673 "Computer '{}' not found in office",
674 data.computer
675 ))
676 })?;
677
678 let target_socket = state
680 .io
681 .of(SMCP_NAMESPACE)
682 .and_then(|op| op.get_socket(computer_sid.parse().unwrap()))
683 .ok_or_else(|| {
684 HandlerError::InvalidRequest("Target computer socket not found".to_string())
685 })?;
686
687 let timeout = tokio::time::Duration::from_secs(30);
689 let ack_result = target_socket.emit_with_ack(smcp::events::CLIENT_GET_TOOLS, &data);
690
691 match tokio::time::timeout(timeout, async move {
692 match ack_result {
693 Ok(stream) => {
694 let mut pinned = Box::pin(stream);
695 match pinned.next().await {
696 Some((_, response)) => response,
697 None => Ok(serde_json::Value::Null),
698 }
699 }
700 Err(_) => Ok(serde_json::Value::Null),
701 }
702 })
703 .await
704 {
705 Ok(Ok(response)) => {
706 serde_json::from_value(response).map_err(|e| {
708 HandlerError::InvalidRequest(format!("Failed to parse response: {}", e))
709 })
710 }
711 Ok(Err(e)) => Err(HandlerError::Timeout(format!(
712 "Failed to get response from computer: {}",
713 e
714 ))),
715 Err(_) => Err(HandlerError::Timeout(
716 "Get tools timed out after 30 seconds".to_string(),
717 )),
718 }
719 }
720
721 async fn on_client_get_desktop(
723 socket: SocketRef,
724 data: GetDesktopReq,
725 state: ServerState,
726 ) -> Result<GetDesktopRet, HandlerError> {
727 let sid = socket.id.to_string();
729 let session = state
730 .session_manager
731 .get_session(&sid)
732 .ok_or_else(|| HandlerError::Session(SessionError::NotFound(sid.clone())))?;
733
734 if session.role != ClientRole::Agent {
736 return Err(HandlerError::InvalidRequest(
737 "Only agents can get desktop".to_string(),
738 ));
739 }
740
741 let office_id = session.office_id.ok_or_else(|| {
743 HandlerError::InvalidRequest("Agent must be in an office to get desktop".to_string())
744 })?;
745
746 let computer_sid = state
748 .session_manager
749 .get_computer_sid_in_office(&office_id, &data.computer)
750 .ok_or_else(|| {
751 HandlerError::InvalidRequest(format!(
752 "Computer '{}' not found in office",
753 data.computer
754 ))
755 })?;
756
757 let target_socket = state
759 .io
760 .of(SMCP_NAMESPACE)
761 .and_then(|op| op.get_socket(computer_sid.parse().unwrap()))
762 .ok_or_else(|| {
763 HandlerError::InvalidRequest("Target computer socket not found".to_string())
764 })?;
765
766 let timeout = tokio::time::Duration::from_secs(30);
768 let ack_result = target_socket.emit_with_ack(smcp::events::CLIENT_GET_DESKTOP, &data);
769
770 match tokio::time::timeout(timeout, async move {
771 match ack_result {
772 Ok(stream) => {
773 let mut pinned = Box::pin(stream);
774 match pinned.next().await {
775 Some((_, response)) => response,
776 None => Ok(serde_json::Value::Null),
777 }
778 }
779 Err(_) => Ok(serde_json::Value::Null),
780 }
781 })
782 .await
783 {
784 Ok(Ok(response)) => {
785 serde_json::from_value(response).map_err(|e| {
787 HandlerError::InvalidRequest(format!("Failed to parse response: {}", e))
788 })
789 }
790 Ok(Err(e)) => Err(HandlerError::Timeout(format!(
791 "Failed to get response from computer: {}",
792 e
793 ))),
794 Err(_) => Err(HandlerError::Timeout(
795 "Get desktop timed out after 30 seconds".to_string(),
796 )),
797 }
798 }
799
800 async fn on_client_get_config(
802 socket: SocketRef,
803 data: GetComputerConfigReq,
804 state: ServerState,
805 ) -> Result<GetComputerConfigRet, HandlerError> {
806 let sid = socket.id.to_string();
808 let session = state
809 .session_manager
810 .get_session(&sid)
811 .ok_or_else(|| HandlerError::Session(SessionError::NotFound(sid.clone())))?;
812
813 if session.role != ClientRole::Agent {
815 return Err(HandlerError::InvalidRequest(
816 "Only agents can get config".to_string(),
817 ));
818 }
819
820 let office_id = session.office_id.ok_or_else(|| {
822 HandlerError::InvalidRequest("Agent must be in an office to get config".to_string())
823 })?;
824
825 let computer_sid = state
827 .session_manager
828 .get_computer_sid_in_office(&office_id, &data.computer)
829 .ok_or_else(|| {
830 HandlerError::InvalidRequest(format!(
831 "Computer '{}' not found in office",
832 data.computer
833 ))
834 })?;
835
836 let target_socket = state
838 .io
839 .of(SMCP_NAMESPACE)
840 .and_then(|op| op.get_socket(computer_sid.parse().unwrap()))
841 .ok_or_else(|| {
842 HandlerError::InvalidRequest("Target computer socket not found".to_string())
843 })?;
844
845 let timeout = tokio::time::Duration::from_secs(30);
847 let ack_result = target_socket.emit_with_ack(smcp::events::CLIENT_GET_CONFIG, &data);
848
849 match tokio::time::timeout(timeout, async move {
850 match ack_result {
851 Ok(stream) => {
852 let mut pinned = Box::pin(stream);
853 match pinned.next().await {
854 Some((_, response)) => response,
855 None => Ok(serde_json::Value::Null),
856 }
857 }
858 Err(_) => Ok(serde_json::Value::Null),
859 }
860 })
861 .await
862 {
863 Ok(Ok(response)) => {
864 serde_json::from_value(response).map_err(|e| {
866 HandlerError::InvalidRequest(format!("Failed to parse response: {}", e))
867 })
868 }
869 Ok(Err(e)) => Err(HandlerError::Timeout(format!(
870 "Failed to get response from computer: {}",
871 e
872 ))),
873 Err(_) => Err(HandlerError::Timeout(
874 "Get config timed out after 30 seconds".to_string(),
875 )),
876 }
877 }
878
879 async fn on_server_update_desktop(
881 socket: SocketRef,
882 data: UpdateComputerConfigReq,
883 state: ServerState,
884 ) {
885 let sid = socket.id.to_string();
886 let session = match state.session_manager.get_session(&sid) {
887 Some(s) => s,
888 None => {
889 warn!("SERVER_UPDATE_DESKTOP from unknown session sid={}", sid);
890 return;
891 }
892 };
893
894 if session.role != ClientRole::Computer {
896 warn!(
897 "SERVER_UPDATE_DESKTOP role mismatch: expected Computer, got {:?}, sid={}",
898 session.role, sid
899 );
900 return;
901 }
902
903 let office_id = match session.office_id {
904 Some(ref office_id) => office_id.clone(),
905 None => {
906 warn!(
907 "SERVER_UPDATE_DESKTOP but session not in office, sid={}",
908 sid
909 );
910 return;
911 }
912 };
913
914 let notification = UpdateMCPConfigNotification {
916 computer: data.computer,
917 };
918
919 if let Err(e) = socket
920 .to(office_id)
921 .emit(smcp::events::NOTIFY_UPDATE_DESKTOP, ¬ification)
922 .await
923 {
924 warn!("Failed to broadcast NOTIFY_UPDATE_DESKTOP: {}", e);
925 }
926 }
927
928 async fn on_server_list_room(
930 socket: SocketRef,
931 data: ListRoomReq,
932 state: ServerState,
933 ) -> ListRoomRet {
934 let sid = socket.id.to_string();
936 let session = match state.session_manager.get_session(&sid) {
937 Some(s) => s,
938 None => {
939 warn!("List room from unknown session sid={}", sid);
940 return ListRoomRet {
941 sessions: vec![],
942 req_id: data.base.req_id,
943 };
944 }
945 };
946
947 let session_office_id = match session.office_id {
949 Some(id) => id,
950 None => {
951 warn!("Session {} not in any office", sid);
952 return ListRoomRet {
953 sessions: vec![],
954 req_id: data.base.req_id,
955 };
956 }
957 };
958
959 if session_office_id != data.office_id {
960 warn!(
961 "Session {} trying to list room {} but in office {}",
962 sid, data.office_id, session_office_id
963 );
964 return ListRoomRet {
965 sessions: vec![],
966 req_id: data.base.req_id,
967 };
968 }
969
970 let sessions = state
972 .session_manager
973 .get_sessions_in_office(&data.office_id);
974
975 let session_infos: Vec<SessionInfo> = sessions
977 .into_iter()
978 .map(|s| SessionInfo {
979 sid: s.sid,
980 name: s.name,
981 role: s.role.into(),
982 office_id: s.office_id.unwrap_or_default(),
983 })
984 .collect();
985
986 ListRoomRet {
987 sessions: session_infos,
988 req_id: data.base.req_id,
989 }
990 }
991
992 async fn handle_join_room(
994 socket: SocketRef,
995 session: &SessionData,
996 office_id: &str,
997 state: &ServerState,
998 ) -> Result<(), HandlerError> {
999 info!(
1000 "handle_join_room called: sid={}, office_id={}, role={:?}",
1001 socket.id, office_id, session.role
1002 );
1003
1004 match Self::validate_join_room(session, office_id, state)? {
1005 JoinRoomDecision::Noop => {
1006 info!("Noop decision for sid={}", socket.id);
1007 Ok(())
1008 }
1009 JoinRoomDecision::Join => {
1010 info!("Joining room '{}' for sid={}", office_id, socket.id);
1011 socket.join(office_id.to_string());
1012 Ok(())
1013 }
1014 JoinRoomDecision::LeaveAndJoin { leave_office } => {
1015 info!(
1016 "Leaving room '{}' and joining '{}' for sid={}",
1017 leave_office, office_id, socket.id
1018 );
1019
1020 let leave_notification = if session.role == ClientRole::Computer {
1022 LeaveOfficeNotification {
1023 office_id: leave_office.clone(),
1024 computer: Some(session.name.clone()),
1025 agent: None,
1026 }
1027 } else {
1028 LeaveOfficeNotification {
1029 office_id: leave_office.clone(),
1030 computer: None,
1031 agent: Some(session.name.clone()),
1032 }
1033 };
1034
1035 let _ = socket
1037 .within(leave_office.clone())
1038 .emit(smcp::events::NOTIFY_LEAVE_OFFICE, &leave_notification)
1039 .await;
1040
1041 socket.leave(leave_office);
1042 socket.join(office_id.to_string());
1043 Ok(())
1044 }
1045 }
1046 }
1047
1048 fn validate_join_room(
1049 session: &SessionData,
1050 office_id: &str,
1051 state: &ServerState,
1052 ) -> Result<JoinRoomDecision, HandlerError> {
1053 match session.role {
1054 ClientRole::Agent => {
1055 if let Some(current_office) = &session.office_id {
1056 if current_office != office_id {
1057 return Err(HandlerError::Session(SessionError::AgentAlreadyInRoom(
1058 current_office.clone(),
1059 )));
1060 }
1061 warn!(
1062 "Agent sid: {} already in room: {}. 正在重复加入房间",
1063 session.sid, current_office
1064 );
1065 return Ok(JoinRoomDecision::Noop);
1066 }
1067
1068 if state
1069 .session_manager
1070 .has_agent_in_office(&office_id.to_string())
1071 {
1072 return Err(HandlerError::Session(SessionError::AgentAlreadyExists));
1073 }
1074 }
1075 ClientRole::Computer => {
1076 if let Some(current_office) = &session.office_id {
1077 if current_office != office_id {
1078 if state
1079 .session_manager
1080 .has_computer_in_office(&office_id.to_string(), &session.name)
1081 {
1082 return Err(HandlerError::Session(
1083 SessionError::ComputerAlreadyExists(
1084 session.name.clone(),
1085 office_id.to_string(),
1086 ),
1087 ));
1088 }
1089 return Ok(JoinRoomDecision::LeaveAndJoin {
1090 leave_office: current_office.clone(),
1091 });
1092 }
1093 warn!(
1094 "Computer sid: {} already in room: {}. 正在重复加入房间",
1095 session.sid, current_office
1096 );
1097 return Ok(JoinRoomDecision::Noop);
1098 }
1099
1100 if state
1101 .session_manager
1102 .has_computer_in_office(&office_id.to_string(), &session.name)
1103 {
1104 return Err(HandlerError::Session(SessionError::ComputerAlreadyExists(
1105 session.name.clone(),
1106 office_id.to_string(),
1107 )));
1108 }
1109 }
1110 }
1111
1112 Ok(JoinRoomDecision::Join)
1113 }
1114}
1115
1116#[derive(Debug, Clone, PartialEq, Eq)]
1117enum JoinRoomDecision {
1118 Noop,
1119 Join,
1120 LeaveAndJoin { leave_office: String },
1121}
1122
1123#[cfg(test)]
1124mod tests {
1125 use super::*;
1126 use crate::auth::DefaultAuthenticationProvider;
1127 use serde_json;
1128
1129 fn create_test_state() -> ServerState {
1130 let (_layer, io) = SocketIo::builder().build_layer();
1131 ServerState {
1132 session_manager: Arc::new(SessionManager::new()),
1133 auth_provider: Arc::new(DefaultAuthenticationProvider::new(
1134 Some("test_secret".to_string()),
1135 None,
1136 )),
1137 io: Arc::new(io),
1138 }
1139 }
1140
1141 #[tokio::test]
1142 async fn test_agent_join_office() {
1143 let (_layer, io) = SocketIo::builder().build_layer();
1144 let state = ServerState {
1145 session_manager: Arc::new(SessionManager::new()),
1146 auth_provider: Arc::new(DefaultAuthenticationProvider::new(
1147 Some("test_secret".to_string()),
1148 None,
1149 )),
1150 io: Arc::new(io.clone()),
1151 };
1152
1153 SmcpHandler::register_handlers(&io, state.clone());
1155
1156 assert_eq!(state.session_manager.get_stats().total, 0);
1159 }
1160
1161 #[test]
1162 fn test_handler_error_serialize() {
1163 let err = HandlerError::InvalidRequest("bad".to_string());
1164 let json = serde_json::to_string(&err).unwrap();
1165 assert!(json.contains("Invalid request"));
1166 assert!(json.contains("bad"));
1167 }
1168
1169 #[test]
1170 fn test_validate_join_room_agent_already_in_other_room() {
1171 let state = create_test_state();
1172 let session = SessionData::new("sid1".to_string(), "a".to_string(), ClientRole::Agent)
1173 .with_office_id("office1".to_string());
1174
1175 let err = SmcpHandler::validate_join_room(&session, "office2", &state).unwrap_err();
1176 assert!(matches!(
1177 err,
1178 HandlerError::Session(SessionError::AgentAlreadyInRoom(o)) if o == "office1"
1179 ));
1180 }
1181
1182 #[test]
1183 fn test_validate_join_room_agent_already_exists() {
1184 let state = create_test_state();
1185 let office_id = "office1".to_string();
1186 let existing_agent = SessionData::new(
1187 "sid_agent".to_string(),
1188 "agent1".to_string(),
1189 ClientRole::Agent,
1190 )
1191 .with_office_id(office_id.clone());
1192 state
1193 .session_manager
1194 .register_session(existing_agent)
1195 .unwrap();
1196
1197 let new_agent = SessionData::new(
1198 "sid_new".to_string(),
1199 "agent2".to_string(),
1200 ClientRole::Agent,
1201 );
1202 let err = SmcpHandler::validate_join_room(&new_agent, &office_id, &state).unwrap_err();
1203 assert!(matches!(
1204 err,
1205 HandlerError::Session(SessionError::AgentAlreadyExists)
1206 ));
1207 }
1208
1209 #[test]
1210 fn test_validate_join_room_computer_duplicate_name_in_office() {
1211 let state = create_test_state();
1212 let office_id = "office1".to_string();
1213 let existing = SessionData::new(
1214 "sid_c1".to_string(),
1215 "computer1".to_string(),
1216 ClientRole::Computer,
1217 )
1218 .with_office_id(office_id.clone());
1219 state.session_manager.register_session(existing).unwrap();
1220
1221 let new_same_name = SessionData::new(
1222 "sid_c2".to_string(),
1223 "computer1".to_string(),
1224 ClientRole::Computer,
1225 );
1226 let err = SmcpHandler::validate_join_room(&new_same_name, &office_id, &state).unwrap_err();
1227 assert!(matches!(
1228 err,
1229 HandlerError::Session(SessionError::ComputerAlreadyExists(name, office))
1230 if name == "computer1" && office == "office1"
1231 ));
1232 }
1233
1234 #[test]
1235 fn test_validate_join_room_computer_switch_room() {
1236 let state = create_test_state();
1237 let session = SessionData::new(
1238 "sid_c".to_string(),
1239 "computer1".to_string(),
1240 ClientRole::Computer,
1241 )
1242 .with_office_id("office_old".to_string());
1243
1244 let decision = SmcpHandler::validate_join_room(&session, "office_new", &state).unwrap();
1245 assert_eq!(
1246 decision,
1247 JoinRoomDecision::LeaveAndJoin {
1248 leave_office: "office_old".to_string()
1249 }
1250 );
1251 }
1252
1253 #[test]
1254 fn test_validate_join_room_same_room_noop() {
1255 let state = create_test_state();
1256 let session = SessionData::new("sid".to_string(), "c".to_string(), ClientRole::Computer)
1257 .with_office_id("office1".to_string());
1258
1259 let decision = SmcpHandler::validate_join_room(&session, "office1", &state).unwrap();
1260 assert_eq!(decision, JoinRoomDecision::Noop);
1261 }
1262
1263 #[test]
1264 fn test_validate_join_room_agent_first_time_join() {
1265 let state = create_test_state();
1266 let session = SessionData::new("sid_agent".to_string(), "a".to_string(), ClientRole::Agent);
1267
1268 let decision = SmcpHandler::validate_join_room(&session, "office1", &state).unwrap();
1269 assert_eq!(decision, JoinRoomDecision::Join);
1270 }
1271
1272 #[test]
1273 fn test_validate_join_room_computer_first_time_join() {
1274 let state = create_test_state();
1275 let session = SessionData::new(
1276 "sid_computer".to_string(),
1277 "computer1".to_string(),
1278 ClientRole::Computer,
1279 );
1280
1281 let decision = SmcpHandler::validate_join_room(&session, "office1", &state).unwrap();
1282 assert_eq!(decision, JoinRoomDecision::Join);
1283 }
1284
1285 #[test]
1286 fn test_enter_office_notification_computer() {
1287 let computer_name = "computer1".to_string();
1288 let office_id = "office1".to_string();
1289
1290 let notification = EnterOfficeNotification {
1291 office_id: office_id.clone(),
1292 computer: Some(computer_name.clone()),
1293 agent: None,
1294 };
1295
1296 assert_eq!(notification.office_id, office_id);
1297 assert_eq!(notification.computer, Some(computer_name));
1298 assert_eq!(notification.agent, None);
1299 }
1300
1301 #[test]
1302 fn test_enter_office_notification_agent() {
1303 let agent_name = "agent1".to_string();
1304 let office_id = "office1".to_string();
1305
1306 let notification = EnterOfficeNotification {
1307 office_id: office_id.clone(),
1308 computer: None,
1309 agent: Some(agent_name.clone()),
1310 };
1311
1312 assert_eq!(notification.office_id, office_id);
1313 assert_eq!(notification.computer, None);
1314 assert_eq!(notification.agent, Some(agent_name));
1315 }
1316
1317 #[test]
1318 fn test_leave_office_notification_computer() {
1319 let computer_name = "computer1".to_string();
1320 let office_id = "office1".to_string();
1321
1322 let notification = LeaveOfficeNotification {
1323 office_id: office_id.clone(),
1324 computer: Some(computer_name.clone()),
1325 agent: None,
1326 };
1327
1328 assert_eq!(notification.office_id, office_id);
1329 assert_eq!(notification.computer, Some(computer_name));
1330 assert_eq!(notification.agent, None);
1331 }
1332
1333 #[test]
1334 fn test_update_tool_list_notification() {
1335 let computer_name = "computer1".to_string();
1336
1337 let notification = UpdateToolListNotification {
1338 computer: computer_name.clone(),
1339 };
1340
1341 assert_eq!(notification.computer, computer_name);
1342 }
1343
1344 #[test]
1345 fn test_update_mcp_config_notification() {
1346 let computer_name = "computer1".to_string();
1347
1348 let notification = UpdateMCPConfigNotification {
1349 computer: computer_name.clone(),
1350 };
1351
1352 assert_eq!(notification.computer, computer_name);
1353 }
1354
1355 #[test]
1356 fn test_notification_serialization() {
1357 let tool_list_notification = UpdateToolListNotification {
1359 computer: "computer1".to_string(),
1360 };
1361
1362 let json = serde_json::to_string(&tool_list_notification).unwrap();
1363 assert!(json.contains("\"computer\":\"computer1\""));
1364
1365 let mcp_config_notification = UpdateMCPConfigNotification {
1366 computer: "computer1".to_string(),
1367 };
1368
1369 let json = serde_json::to_string(&mcp_config_notification).unwrap();
1370 assert!(json.contains("\"computer\":\"computer1\""));
1371 }
1372}