use actix_web::{
web::{Bytes, Payload},
HttpRequest, HttpResponse,
};
use actix_web_actors::ws::{self};
use async_trait::async_trait;
use serde::Serialize;
use serde_json::Value;
use session::{Emiter, Session, SessionStore};
use socketio::{ConnectSuccess, EventData, MessageType};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use uuid::Uuid;
pub mod session;
pub mod socketio;
/// Entry point of the Socket.IO integration: bundles the shared server state
/// with the configuration handed to every new session.
pub struct SocketIO {
/// Shared server state (the session store) used to reach connected sessions.
pub socket_server: Arc<SocketServer>,
/// Configuration passed to each `Session` created by `connect`.
pub socket_config: Arc<SocketConfig>,
}
/// Everything `SocketIO::connect` hands back to the caller for one new session.
pub struct SocketIOResult {
/// Outcome of `ws::start`; return this from the HTTP route handler.
pub http_response: Result<HttpResponse, actix_web::error::Error>,
/// Handle on which event listeners for this session are registered (`on`).
pub session_receive: Arc<SessionReceive>,
/// Identifier of the session that was just created.
pub session_id: Uuid,
}
/// Tunables handed to every session.
///
/// `Debug` is derived so the configuration can be logged or shown in test
/// failures; all fields are plain integers, so the derive has no extra
/// requirements.
#[derive(Debug, Clone)]
pub struct SocketConfig {
    /// Interval between pings.
    /// NOTE(review): presumably milliseconds — confirm against `Session`.
    pub ping_interval: u64,
    /// How long to wait for a ping answer.
    /// NOTE(review): presumably milliseconds — confirm against `Session`.
    pub ping_timeout: u64,
    /// Upper bound for a payload.
    /// NOTE(review): presumably bytes — confirm against `Session`.
    pub max_payload: usize,
}
impl Default for SocketConfig {
fn default() -> Self {
Self {
ping_interval: 25000,
ping_timeout: 20000,
max_payload: 1000000,
}
}
}
impl SocketIO {
pub fn new() -> Self {
Self {
socket_config: Arc::new(SocketConfig::default()),
socket_server: Arc::new(SocketServer::new()),
}
}
pub fn config(&mut self, socket_config: SocketConfig) -> &mut Self {
self.socket_config = Arc::new(socket_config);
self
}
pub fn connect(&self, req: &HttpRequest, stream: Payload) -> SocketIOResult {
let session_store = self.socket_server.session_store.clone();
let session = Session::new(self.socket_config.clone(), session_store);
let session_receive = Arc::new(SessionReceive::new(session.id, self.socket_server.clone()));
let mut receiver = session.get_receiver();
let inner_receive = session_receive.clone();
actix_web::rt::spawn(async move {
while let Ok(message_data) = receiver.recv().await {
inner_receive.handle_receive_msg(message_data).await;
}
});
SocketIOResult {
session_id: session.id,
http_response: ws::start(session, req, stream),
session_receive: session_receive.clone(),
}
}
}
/// Callback invoked when a session receives an event a `Listener` is
/// registered for.
#[async_trait]
pub trait MessageHandle: Sync + Send + 'static {
/// Handles one event.
///
/// `data` is the event payload (`Value::Null` for the "connect" event that is
/// raised once a connection has been accepted); `session_id` identifies the
/// session the event arrived on.
async fn handler(&self, data: Value, session_id: Uuid);
}
/// Binds an event name to the handler that should run for it.
pub struct Listener {
/// Name compared for equality against the name of each incoming event.
pub event_name: String,
/// Handler awaited for every incoming event whose name matches.
pub handler: Box<dyn MessageHandle>,
}
/// Server-wide state shared between all sessions.
pub struct SocketServer {
/// Lock-protected store whose `sessions` map is looked up by session id.
pub session_store: Arc<RwLock<SessionStore>>,
}
/// Receive side of one session: dispatches its incoming messages to the
/// listeners registered on it.
pub struct SessionReceive {
// Id of the session this receiver belongs to; passed to every handler.
session_id: Uuid,
// Listeners registered through `on`, scanned in insertion order.
listeners: RwLock<Vec<Listener>>,
// Shared server state, used to look up this session's address.
socket_server: Arc<SocketServer>,
}
impl SessionReceive {
    /// Creates the receiver for `session_id` with no listeners registered.
    pub fn new(session_id: Uuid, socket_server: Arc<SocketServer>) -> Self {
        Self {
            session_id,
            listeners: RwLock::new(Vec::new()),
            socket_server,
        }
    }

    /// Dispatches one message published by the session.
    ///
    /// `Connect` triggers the connection acknowledgement, `Event` is forwarded
    /// to the matching listeners and `None` is ignored.
    async fn handle_receive_msg(&self, message_type: MessageType) {
        match message_type {
            MessageType::Connect => self.accept_connect().await,
            MessageType::Event(message_data) => self.handler_trigger_on(message_data).await,
            MessageType::None => (),
        }
    }

    /// Acknowledges the connection to the session actor (if it is present in
    /// the store) and then raises the "connect" event with a `Value::Null`
    /// payload.
    async fn accept_connect(&self) {
        {
            // Only a lookup happens here, so a shared read guard is enough.
            // The guard is confined to this scope and released BEFORE any
            // handler runs: handlers commonly call `SocketServer::emit`, which
            // locks the same store, and holding a guard across the handler
            // await would block that call forever.
            let session_store = self.socket_server.session_store.read().await;
            if let Some(addr) = session_store.sessions.get(&self.session_id) {
                addr.do_send(ConnectSuccess {
                    data: HashMap::from([("sid", "accept")]),
                });
            }
        }

        self.handler_trigger_on(EventData("connect".into(), Value::Null))
            .await;
    }

    /// Awaits, in registration order, every listener whose `event_name`
    /// equals the event's name, giving each its own clone of the payload.
    ///
    /// The listener list stays read-locked while handlers run, so a handler
    /// must not call [`SessionReceive::on`] on this same receiver — that call
    /// would wait for the write lock until the handler itself returns.
    async fn handler_trigger_on(&self, event: EventData) {
        let listeners = self.listeners.read().await;
        for listener in listeners.iter() {
            if listener.event_name != event.0 {
                continue;
            }
            listener
                .handler
                .handler(event.1.clone(), self.session_id)
                .await;
        }
    }

    /// Placeholder for binary frames: the data is currently discarded.
    pub fn handle_receive_binary_msg(&mut self, _data_bin: Bytes) {}

    /// Registers `listener`; it is consulted for every subsequent event.
    pub async fn on(&self, listener: Listener) {
        self.listeners.write().await.push(listener);
    }
}
impl SocketServer {
    /// Creates a server around a new `SessionStore`.
    pub fn new() -> Self {
        Self {
            session_store: Arc::new(RwLock::new(SessionStore::new())),
        }
    }

    /// Sends `emiter` to one session or to all of them.
    ///
    /// * `session_id = Some(id)` — deliver only to that session; an id that is
    ///   not in the store is silently ignored.
    /// * `session_id = None` — deliver to every session currently in the store.
    ///
    /// The message is wrapped in an `Arc` so all recipients share one
    /// allocation. Delivery uses `do_send`, whose outcome is not inspected, so
    /// this currently always returns `Ok(())`.
    pub async fn emit<D: Serialize + Send + 'static + Sync>(
        &self,
        emiter: Emiter<D>,
        session_id: Option<Uuid>,
    ) -> Result<(), String> {
        let emiter = Arc::new(emiter);
        // No `.await` happens below, so the read guard is held only briefly.
        let store = self.session_store.read().await;
        match session_id {
            Some(session_id) => {
                if let Some(session) = store.sessions.get(&session_id) {
                    // Single recipient: hand over the `Arc` instead of cloning it.
                    session.do_send(emiter);
                }
            }
            None => {
                for session in store.sessions.values() {
                    session.do_send(Arc::clone(&emiter));
                }
            }
        }
        Ok(())
    }
}

impl Default for SocketServer {
    /// Equivalent to [`SocketServer::new`].
    fn default() -> Self {
        Self::new()
    }
}