actix_web_socket_io/
lib.rs1use actix_web::{
2 web::{Bytes, Payload},
3 HttpRequest, HttpResponse,
4};
5use actix_web_actors::ws::{self};
6use async_trait::async_trait;
7use crossbeam::channel::TryRecvError;
8use serde::Serialize;
9use serde_json::Value;
10use session::{Emiter, Session, SessionStore};
11use socketio::{EventData, MessageType};
12use std::{sync::Arc, time::Duration};
13use tokio::sync::RwLock;
14use uuid::Uuid;
15
16pub mod session;
17pub mod socketio;
18
19pub struct SocketIO {
20 pub socket_server: Arc<SocketServer>,
21 pub socket_config: Arc<SocketConfig>,
22}
23
24pub struct SocketIOResult {
25 pub http_response: Result<HttpResponse, actix_web::error::Error>,
26 pub session_receive: Arc<SessionReceive>,
27 pub session_id: Uuid,
28}
29
30#[derive(Clone)]
31pub struct SocketConfig {
32 pub ping_interval: u64,
34 pub ping_timeout: u64,
36 pub max_payload: usize,
38}
39
40impl Default for SocketConfig {
41 fn default() -> Self {
42 Self {
43 ping_interval: 25000,
44 ping_timeout: 20000,
45 max_payload: 1000000,
46 }
47 }
48}
49
50impl SocketIO {
51 pub fn new() -> Self {
52 Self {
53 socket_config: Arc::new(SocketConfig::default()),
54 socket_server: Arc::new(SocketServer::new()),
55 }
56 }
57
58 pub fn config(&mut self, socket_config: SocketConfig) -> &mut Self {
59 self.socket_config = Arc::new(socket_config);
60
61 self
62 }
63
64 pub fn connect(&self, req: &HttpRequest, stream: Payload) -> SocketIOResult {
66 let session_store = self.socket_server.session_store.clone();
67 let session = Session::new(self.socket_config.clone(), session_store);
69
70 let session_receive = Arc::new(SessionReceive::new(session.id, self.socket_server.clone()));
71
72 let receiver = session.get_receiver();
73
74 let inner_receive = session_receive.clone();
76 actix_web::rt::spawn(async move {
77 loop {
78 match receiver.try_recv() {
79 Ok(message_data) => {
80 inner_receive.handle_receive_msg(message_data).await;
81 }
82 Err(TryRecvError::Disconnected) => {
83 break;
84 }
85 Err(TryRecvError::Empty) => {
86 actix_web::rt::time::sleep(Duration::from_millis(20)).await;
87 }
88 }
89 }
90 });
91
92 SocketIOResult {
93 session_id: session.id,
94 http_response: ws::start(session, req, stream),
95 session_receive: session_receive.clone(),
96 }
97 }
98}
99
100#[async_trait]
101pub trait MessageHandle: Sync + Send + 'static {
102 async fn handler(&self, data: Value, session_id: Uuid);
103}
104
105pub struct Listener {
107 pub event_name: String,
108 pub handler: Box<dyn MessageHandle>,
109}
110
111pub struct SocketServer {
112 pub session_store: Arc<RwLock<SessionStore>>,
113}
114
115pub struct SessionReceive {
119 session_id: Uuid,
120 listeners: RwLock<Vec<Listener>>,
122 socket_server: Arc<SocketServer>,
123}
124
125impl SessionReceive {
126 pub fn new(session_id: Uuid, socket_server: Arc<SocketServer>) -> Self {
127 Self {
128 session_id,
129 listeners: RwLock::new(vec![]),
130 socket_server,
131 }
132 }
133
134 async fn handle_receive_msg(&self, message_type: MessageType) {
136 match message_type {
137 MessageType::Event(message_data) => self.handler_trigger_on(message_data).await,
138 MessageType::None => (),
139 }
140 }
141
142 async fn handler_trigger_on(&self, event: EventData) {
144 let listeners = self.listeners.read().await;
145 for listener in listeners.iter() {
146 if listener.event_name.eq(&event.0) {
148 listener
149 .handler
150 .handler(event.1.clone(), self.session_id)
151 .await;
152 }
153 }
154 }
155
156 pub fn handle_receive_binary_msg(&mut self, _data_bin: Bytes) {
158 }
160
161 pub async fn on(&self, listener: Listener) {
163 self.listeners.write().await.push(listener);
164 }
165}
166
167impl SocketServer {
168 pub fn new() -> Self {
169 Self {
170 session_store: Arc::new(RwLock::new(SessionStore::new())),
171 }
172 }
173
174 pub async fn emit<D: Serialize + Send + 'static + Sync>(
176 &self,
177 emiter: Emiter<D>,
178 session_id: Option<Uuid>,
179 ) -> Result<(), String> {
180 let emiter = Arc::new(emiter);
181 if let Some(session_id) = session_id {
182 if let Some(session) = self.session_store.read().await.sessions.get(&session_id) {
183 session.do_send(emiter.clone());
184 }
185 } else {
186 for session in self.session_store.read().await.sessions.values() {
187 session.do_send(emiter.clone());
188 }
189 }
190
191 Ok(())
192 }
193}