use crate::prelude::*;
use deadpool_redis::redis::AsyncCommands;
use chrono::Utc;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::mpsc;
#[derive(thiserror::Error, Debug)]
pub enum WsError {
#[error("Redis manager error: {0}")]
RedisManager(#[from] RedisManagerError),
#[error("Redis error: {0}")]
Redis(#[from] deadpool_redis::redis::RedisError),
#[error("Redis pool error: {0}")]
RedisPool(#[from] deadpool_redis::PoolError),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
}
pub type Result<T> = std::result::Result<T, WsError>;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UserMsg {
pub from: String, pub to: String, pub msg: String, pub time: String, }
impl UserMsg {
pub fn new(from: String, to: String, msg: String) -> Self {
Self {
from,
to,
msg,
time: Utc::now().to_rfc3339(),
}
}
}
pub struct WsConfig {
pub server: u32,
pub redis: RedisManager,
}
#[derive(Clone)]
pub struct WsManager {
pub server: u32,
pub redis: RedisManager,
pub local_sessions: Arc<DashMap<String, mpsc::Sender<String>>>,
}
impl WsManager {
pub fn new(config: WsConfig) -> Self {
Self {
server: config.server,
redis: config.redis,
local_sessions: Arc::new(DashMap::new()),
}
}
pub async fn register(&self, uid: &str, tx: mpsc::Sender<String>) -> Result<()> {
let mut conn = self.redis.get_connection().await?;
conn.set::<_, _, ()>(format!("user:{}", uid), self.server.to_string()).await?;
self.local_sessions.insert(uid.to_string(), tx);
Ok(())
}
pub async fn join_room(&self, room_name: &str, uid: &str) -> Result<()> {
let mut conn = self.redis.get_connection().await?;
conn.sadd::<_, _, ()>(format!("room:{}", room_name), uid).await?;
Ok(())
}
pub async fn leave_room(&self, room_name: &str, uid: &str) -> Result<()> {
let mut conn = self.redis.get_connection().await?;
conn.srem::<_, _, ()>(format!("room:{}", room_name), uid).await?;
Ok(())
}
pub async fn msg_room(&self, room_name: &str, msg_obj: UserMsg) -> Result<()> {
let mut conn = self.redis.get_connection().await?;
let msg_str = serde_json::to_string(&msg_obj)?;
conn.rpush::<_, _, ()>(format!("room_msgs:{}", room_name), &msg_str).await?;
let users: Vec<String> = conn.smembers(format!("room:{}", room_name)).await?;
for uid in users {
let _ = self.msg_user(&uid, msg_str.clone()).await;
}
Ok(())
}
pub async fn msg_user(&self, uid: &str, msg: String) -> Result<bool> {
let mut conn = self.redis.get_connection().await?;
let user_data: Option<String> = conn.get(format!("user:{}", uid)).await?;
if let Some(data) = user_data {
let server_id = data.parse::<u32>().unwrap_or(0);
if server_id == self.server {
if let Some(sender) = self.local_sessions.get(uid) {
let _ = sender.send(msg).await;
}
} else {
let payload = json!({
"target_uid": uid,
"msg": msg
}).to_string();
conn.publish::<_, _, ()>("fr-ws", payload).await?;
}
} else {
return Ok(false);
}
Ok(true)
}
pub async fn drop_user(&self, uid: &str) -> Result<()> {
let mut conn = self.redis.get_connection().await?;
conn.del::<_, ()>(format!("user:{}", uid)).await?;
self.local_sessions.remove(uid);
Ok(())
}
pub async fn drop_room(&self, room_name: &str) -> Result<()> {
let mut conn = self.redis.get_connection().await?;
conn.del::<_, ()>(format!("room:{}", room_name)).await?;
conn.del::<_, ()>(format!("room_msgs:{}", room_name)).await?;
Ok(())
}
pub async fn broadcast(&self, msg: String) -> Result<()> {
let mut conn = self.redis.get_connection().await?;
conn.publish::<_, _, ()>("fr-ws-broadcast", &msg).await?;
for entry in self.local_sessions.iter() {
let _ = entry.value().send(msg.clone()).await;
}
Ok(())
}
pub async fn get_room_msgs(&self, room_name: &str) -> Result<Vec<UserMsg>> {
let mut conn = self.redis.get_connection().await?;
let msgs_str: Vec<String> = conn.lrange(format!("room_msgs:{}", room_name), 0, -1).await?;
let mut msgs = Vec::new();
for m in msgs_str {
if let Ok(parsed) = serde_json::from_str(&m) {
msgs.push(parsed);
}
}
Ok(msgs)
}
}