Skip to main content

actix_web_socket_io/
lib.rs

1use actix_web::{
2    web::{Bytes, Payload},
3    HttpRequest, HttpResponse,
4};
5use actix_web_actors::ws::{self};
6use async_trait::async_trait;
7use serde::Serialize;
8use serde_json::Value;
9use session::{Emiter, Session, SessionStore};
10use socketio::{ConnectSuccess, EventData, MessageType};
11use std::{collections::HashMap, sync::Arc};
12use tokio::sync::RwLock;
13use uuid::Uuid;
14
15pub mod session;
16pub mod socketio;
17
18pub struct SocketIO {
19    pub socket_server: Arc<SocketServer>,
20    pub socket_config: Arc<SocketConfig>,
21}
22
23pub struct SocketIOResult {
24    pub http_response: Result<HttpResponse, actix_web::error::Error>,
25    pub session_receive: Arc<SessionReceive>,
26    pub session_id: Uuid,
27}
28
29#[derive(Clone)]
30pub struct SocketConfig {
31    // 心跳间隔(毫秒), 默认 25000
32    pub ping_interval: u64,
33    // 心跳超时(毫秒), 默认 20000
34    pub ping_timeout: u64,
35    // 每个块的最大字节数, 默认 1000000 Byte
36    pub max_payload: usize,
37}
38
39impl Default for SocketConfig {
40    fn default() -> Self {
41        Self {
42            ping_interval: 25000,
43            ping_timeout: 20000,
44            max_payload: 1000000,
45        }
46    }
47}
48
49impl SocketIO {
50    pub fn new() -> Self {
51        Self {
52            socket_config: Arc::new(SocketConfig::default()),
53            socket_server: Arc::new(SocketServer::new()),
54        }
55    }
56
57    pub fn config(&mut self, socket_config: SocketConfig) -> &mut Self {
58        self.socket_config = Arc::new(socket_config);
59
60        self
61    }
62
63    /// 建立连接
64    pub fn connect(&self, req: &HttpRequest, stream: Payload) -> SocketIOResult {
65        let session_store = self.socket_server.session_store.clone();
66        // 创建一个新会话
67        let session = Session::new(self.socket_config.clone(), session_store);
68
69        let session_receive = Arc::new(SessionReceive::new(session.id, self.socket_server.clone()));
70
71        let mut receiver = session.get_receiver();
72
73        // 收到事件统一处理
74        let inner_receive = session_receive.clone();
75        actix_web::rt::spawn(async move {
76            while let Ok(message_data) = receiver.recv().await {
77                inner_receive.handle_receive_msg(message_data).await;
78            }
79        });
80
81        SocketIOResult {
82            session_id: session.id,
83            http_response: ws::start(session, req, stream),
84            session_receive: session_receive.clone(),
85        }
86    }
87}
88
89#[async_trait]
90pub trait MessageHandle: Sync + Send + 'static {
91    async fn handler(&self, data: Value, session_id: Uuid);
92}
93
94/// 监听客户端
95pub struct Listener {
96    pub event_name: String,
97    pub handler: Box<dyn MessageHandle>,
98}
99
100pub struct SocketServer {
101    pub session_store: Arc<RwLock<SessionStore>>,
102}
103
104///
105/// 数据接收对象
106///
107pub struct SessionReceive {
108    session_id: Uuid,
109    // 服务端监听的事件总线
110    listeners: RwLock<Vec<Listener>>,
111    socket_server: Arc<SocketServer>,
112}
113
114impl SessionReceive {
115    pub fn new(session_id: Uuid, socket_server: Arc<SocketServer>) -> Self {
116        Self {
117            session_id,
118            listeners: RwLock::new(vec![]),
119            socket_server,
120        }
121    }
122
123    /// 接收到客户端发来的事件
124    async fn handle_receive_msg(&self, message_type: MessageType) {
125        match message_type {
126            MessageType::Connect => self.accept_connect().await,
127            MessageType::Event(message_data) => self.handler_trigger_on(message_data).await,
128            MessageType::None => (),
129        }
130    }
131
132    /// 同意建立连接
133    async fn accept_connect(&self) {
134        let session_store = self.socket_server.session_store.write().await;
135        let addr = session_store.sessions.get(&self.session_id);
136        if let Some(addr) = addr {
137            addr.do_send(ConnectSuccess {
138                data: HashMap::from([("sid", "accept")]),
139            });
140        }
141
142        self.handler_trigger_on(EventData("connect".into(), Value::Null))
143            .await;
144    }
145
146    /// 触发事件
147    async fn handler_trigger_on(&self, event: EventData) {
148        let listeners = self.listeners.read().await;
149        for listener in listeners.iter() {
150            // 按事件名匹配
151            if listener.event_name.eq(&event.0) {
152                listener
153                    .handler
154                    .handler(event.1.clone(), self.session_id)
155                    .await;
156            }
157        }
158    }
159
160    /// 处理二进制数据
161    pub fn handle_receive_binary_msg(&mut self, _data_bin: Bytes) {
162        // 触发监听
163    }
164
165    /// 监听客户端推来的事件
166    pub async fn on(&self, listener: Listener) {
167        self.listeners.write().await.push(listener);
168    }
169}
170
171impl SocketServer {
172    pub fn new() -> Self {
173        Self {
174            session_store: Arc::new(RwLock::new(SessionStore::new())),
175        }
176    }
177
178    /// 发送事件给客户端
179    pub async fn emit<D: Serialize + Send + 'static + Sync>(
180        &self,
181        emiter: Emiter<D>,
182        session_id: Option<Uuid>,
183    ) -> Result<(), String> {
184        let emiter = Arc::new(emiter);
185        if let Some(session_id) = session_id {
186            if let Some(session) = self.session_store.read().await.sessions.get(&session_id) {
187                session.do_send(emiter.clone());
188            }
189        } else {
190            for session in self.session_store.read().await.sessions.values() {
191                session.do_send(emiter.clone());
192            }
193        }
194
195        Ok(())
196    }
197}