actix_web_socket_io/
lib.rs

1use actix_web::{
2    web::{Bytes, Payload},
3    HttpRequest, HttpResponse,
4};
5use actix_web_actors::ws::{self};
6use async_trait::async_trait;
7use crossbeam::channel::TryRecvError;
8use serde::Serialize;
9use serde_json::Value;
10use session::{Emiter, Session, SessionStore};
11use socketio::{EventData, MessageType};
12use std::{sync::Arc, time::Duration};
13use tokio::sync::RwLock;
14use uuid::Uuid;
15
16pub mod session;
17pub mod socketio;
18
19pub struct SocketIO {
20    pub socket_server: Arc<SocketServer>,
21    pub socket_config: Arc<SocketConfig>,
22}
23
24pub struct SocketIOResult {
25    pub http_response: Result<HttpResponse, actix_web::error::Error>,
26    pub session_receive: Arc<SessionReceive>,
27    pub session_id: Uuid,
28}
29
/// Tunable settings shared by every session.
///
/// `Debug`, `PartialEq` and `Eq` are derived so a configuration can be logged
/// and compared (for example in tests) without hand-written boilerplate.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SocketConfig {
    /// Heartbeat interval in milliseconds (default: 25000).
    pub ping_interval: u64,
    /// Heartbeat timeout in milliseconds (default: 20000).
    pub ping_timeout: u64,
    /// Maximum number of bytes per chunk (default: 1000000).
    pub max_payload: usize,
}
39
40impl Default for SocketConfig {
41    fn default() -> Self {
42        Self {
43            ping_interval: 25000,
44            ping_timeout: 20000,
45            max_payload: 1000000,
46        }
47    }
48}
49
50impl SocketIO {
51    pub fn new() -> Self {
52        Self {
53            socket_config: Arc::new(SocketConfig::default()),
54            socket_server: Arc::new(SocketServer::new()),
55        }
56    }
57
58    pub fn config(&mut self, socket_config: SocketConfig) -> &mut Self {
59        self.socket_config = Arc::new(socket_config);
60
61        self
62    }
63
64    /// 建立连接
65    pub fn connect(&self, req: &HttpRequest, stream: Payload) -> SocketIOResult {
66        let session_store = self.socket_server.session_store.clone();
67        // 创建一个新会话
68        let session = Session::new(self.socket_config.clone(), session_store);
69
70        let session_receive = Arc::new(SessionReceive::new(session.id, self.socket_server.clone()));
71
72        let receiver = session.get_receiver();
73
74        // 收到事件统一处理
75        let inner_receive = session_receive.clone();
76        actix_web::rt::spawn(async move {
77            loop {
78                match receiver.try_recv() {
79                    Ok(message_data) => {
80                        inner_receive.handle_receive_msg(message_data).await;
81                    }
82                    Err(TryRecvError::Disconnected) => {
83                        break;
84                    }
85                    Err(TryRecvError::Empty) => {
86                        actix_web::rt::time::sleep(Duration::from_millis(20)).await;
87                    }
88                }
89            }
90        });
91
92        SocketIOResult {
93            session_id: session.id,
94            http_response: ws::start(session, req, stream),
95            session_receive: session_receive.clone(),
96        }
97    }
98}
99
100#[async_trait]
101pub trait MessageHandle: Sync + Send + 'static {
102    async fn handler(&self, data: Value, session_id: Uuid);
103}
104
105/// 监听客户端
106pub struct Listener {
107    pub event_name: String,
108    pub handler: Box<dyn MessageHandle>,
109}
110
111pub struct SocketServer {
112    pub session_store: Arc<RwLock<SessionStore>>,
113}
114
115///
116/// 数据接收对象
117///
118pub struct SessionReceive {
119    session_id: Uuid,
120    // 服务端监听的事件总线
121    listeners: RwLock<Vec<Listener>>,
122    socket_server: Arc<SocketServer>,
123}
124
125impl SessionReceive {
126    pub fn new(session_id: Uuid, socket_server: Arc<SocketServer>) -> Self {
127        Self {
128            session_id,
129            listeners: RwLock::new(vec![]),
130            socket_server,
131        }
132    }
133
134    /// 接收到客户端发来的事件
135    async fn handle_receive_msg(&self, message_type: MessageType) {
136        match message_type {
137            MessageType::Event(message_data) => self.handler_trigger_on(message_data).await,
138            MessageType::None => (),
139        }
140    }
141
142    /// 触发事件
143    async fn handler_trigger_on(&self, event: EventData) {
144        let listeners = self.listeners.read().await;
145        for listener in listeners.iter() {
146            // 按事件名匹配
147            if listener.event_name.eq(&event.0) {
148                listener
149                    .handler
150                    .handler(event.1.clone(), self.session_id)
151                    .await;
152            }
153        }
154    }
155
156    /// 处理二进制数据
157    pub fn handle_receive_binary_msg(&mut self, _data_bin: Bytes) {
158        // 触发监听
159    }
160
161    /// 监听客户端推来的事件
162    pub async fn on(&self, listener: Listener) {
163        self.listeners.write().await.push(listener);
164    }
165}
166
167impl SocketServer {
168    pub fn new() -> Self {
169        Self {
170            session_store: Arc::new(RwLock::new(SessionStore::new())),
171        }
172    }
173
174    /// 发送事件给客户端
175    pub async fn emit<D: Serialize + Send + 'static + Sync>(
176        &self,
177        emiter: Emiter<D>,
178        session_id: Option<Uuid>,
179    ) -> Result<(), String> {
180        let emiter = Arc::new(emiter);
181        if let Some(session_id) = session_id {
182            if let Some(session) = self.session_store.read().await.sessions.get(&session_id) {
183                session.do_send(emiter.clone());
184            }
185        } else {
186            for session in self.session_store.read().await.sessions.values() {
187                session.do_send(emiter.clone());
188            }
189        }
190
191        Ok(())
192    }
193}