actix_web_socket_io/
lib.rs1use actix_web::{
2 web::{Bytes, Payload},
3 HttpRequest, HttpResponse,
4};
5use actix_web_actors::ws::{self};
6use async_trait::async_trait;
7use serde::Serialize;
8use serde_json::Value;
9use session::{Emiter, Session, SessionStore};
10use socketio::{ConnectSuccess, EventData, MessageType};
11use std::{collections::HashMap, sync::Arc};
12use tokio::sync::RwLock;
13use uuid::Uuid;
14
15pub mod session;
16pub mod socketio;
17
18pub struct SocketIO {
19 pub socket_server: Arc<SocketServer>,
20 pub socket_config: Arc<SocketConfig>,
21}
22
23pub struct SocketIOResult {
24 pub http_response: Result<HttpResponse, actix_web::error::Error>,
25 pub session_receive: Arc<SessionReceive>,
26 pub session_id: Uuid,
27}
28
/// Tunable parameters handed to every session.
///
/// NOTE(review): the default values match the Engine.IO server defaults
/// (`pingInterval` / `pingTimeout` in milliseconds, `maxPayload` in bytes);
/// the units are presumed from that match — confirm against `session`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SocketConfig {
    /// Delay between two pings (presumably milliseconds).
    pub ping_interval: u64,
    /// How long to wait for a ping answer (presumably milliseconds).
    pub ping_timeout: u64,
    /// Upper bound on the size of a payload (presumably bytes).
    pub max_payload: usize,
}

impl Default for SocketConfig {
    /// Returns the stock configuration: `ping_interval = 25000`,
    /// `ping_timeout = 20000`, `max_payload = 1000000`.
    fn default() -> Self {
        Self {
            ping_interval: 25_000,
            ping_timeout: 20_000,
            max_payload: 1_000_000,
        }
    }
}
48
49impl SocketIO {
50 pub fn new() -> Self {
51 Self {
52 socket_config: Arc::new(SocketConfig::default()),
53 socket_server: Arc::new(SocketServer::new()),
54 }
55 }
56
57 pub fn config(&mut self, socket_config: SocketConfig) -> &mut Self {
58 self.socket_config = Arc::new(socket_config);
59
60 self
61 }
62
63 pub fn connect(&self, req: &HttpRequest, stream: Payload) -> SocketIOResult {
65 let session_store = self.socket_server.session_store.clone();
66 let session = Session::new(self.socket_config.clone(), session_store);
68
69 let session_receive = Arc::new(SessionReceive::new(session.id, self.socket_server.clone()));
70
71 let mut receiver = session.get_receiver();
72
73 let inner_receive = session_receive.clone();
75 actix_web::rt::spawn(async move {
76 while let Ok(message_data) = receiver.recv().await {
77 inner_receive.handle_receive_msg(message_data).await;
78 }
79 });
80
81 SocketIOResult {
82 session_id: session.id,
83 http_response: ws::start(session, req, stream),
84 session_receive: session_receive.clone(),
85 }
86 }
87}
88
89#[async_trait]
90pub trait MessageHandle: Sync + Send + 'static {
91 async fn handler(&self, data: Value, session_id: Uuid);
92}
93
94pub struct Listener {
96 pub event_name: String,
97 pub handler: Box<dyn MessageHandle>,
98}
99
100pub struct SocketServer {
101 pub session_store: Arc<RwLock<SessionStore>>,
102}
103
104pub struct SessionReceive {
108 session_id: Uuid,
109 listeners: RwLock<Vec<Listener>>,
111 socket_server: Arc<SocketServer>,
112}
113
114impl SessionReceive {
115 pub fn new(session_id: Uuid, socket_server: Arc<SocketServer>) -> Self {
116 Self {
117 session_id,
118 listeners: RwLock::new(vec![]),
119 socket_server,
120 }
121 }
122
123 async fn handle_receive_msg(&self, message_type: MessageType) {
125 match message_type {
126 MessageType::Connect => self.accept_connect().await,
127 MessageType::Event(message_data) => self.handler_trigger_on(message_data).await,
128 MessageType::None => (),
129 }
130 }
131
132 async fn accept_connect(&self) {
134 let session_store = self.socket_server.session_store.write().await;
135 let addr = session_store.sessions.get(&self.session_id);
136 if let Some(addr) = addr {
137 addr.do_send(ConnectSuccess {
138 data: HashMap::from([("sid", "accept")]),
139 });
140 }
141
142 self.handler_trigger_on(EventData("connect".into(), Value::Null))
143 .await;
144 }
145
146 async fn handler_trigger_on(&self, event: EventData) {
148 let listeners = self.listeners.read().await;
149 for listener in listeners.iter() {
150 if listener.event_name.eq(&event.0) {
152 listener
153 .handler
154 .handler(event.1.clone(), self.session_id)
155 .await;
156 }
157 }
158 }
159
160 pub fn handle_receive_binary_msg(&mut self, _data_bin: Bytes) {
162 }
164
165 pub async fn on(&self, listener: Listener) {
167 self.listeners.write().await.push(listener);
168 }
169}
170
171impl SocketServer {
172 pub fn new() -> Self {
173 Self {
174 session_store: Arc::new(RwLock::new(SessionStore::new())),
175 }
176 }
177
178 pub async fn emit<D: Serialize + Send + 'static + Sync>(
180 &self,
181 emiter: Emiter<D>,
182 session_id: Option<Uuid>,
183 ) -> Result<(), String> {
184 let emiter = Arc::new(emiter);
185 if let Some(session_id) = session_id {
186 if let Some(session) = self.session_store.read().await.sessions.get(&session_id) {
187 session.do_send(emiter.clone());
188 }
189 } else {
190 for session in self.session_store.read().await.sessions.values() {
191 session.do_send(emiter.clone());
192 }
193 }
194
195 Ok(())
196 }
197}