use std::{
collections::HashMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use harsh::Harsh;
use tokio::sync::mpsc;
use crate::{
commons::parse_json_string,
core::websocat::echo::{command::Command, connection_new::Connection},
};
use super::{command::Broadcast, server::EchoServer, WEBCHAT_BROADCAST_CHANNEL};
impl EchoServer {
pub async fn new(ctx: actix_web::web::Data<crate::server::AppContext>) -> (Self, Connection) {
let (this, cmd_tx) = {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let this = Self {
cmd_rx,
ctx,
visitor_count: Arc::new(AtomicUsize::new(0)),
sessions: HashMap::new(),
users: HashMap::new(),
subscribes: HashMap::new(),
};
(this, cmd_tx)
};
let connection = Connection { cmd_tx };
this.start_server_time(&connection);
this.subscribe_channel(
&connection,
WEBCHAT_BROADCAST_CHANNEL.to_string(),
|conn, _channel, msg| {
if let Ok(message) = parse_json_string::<Broadcast>(msg) {
conn.broadcast(message.level, message.ty, message.msg);
}
},
);
(this, connection)
}
pub async fn run(mut self) -> std::io::Result<()> {
while let Some(cmd) = self.cmd_rx.recv().await {
match cmd {
Command::Connect {
conn_tx,
res_tx,
user_info,
} => {
let conn_id = self.handle_connect(conn_tx, user_info).await;
let _ = res_tx.send(conn_id);
}
Command::Autorization { id, conn, token } => {
self.handle_autorization(id, conn, token).await;
}
Command::Disconnect { conn } => {
self.handle_disconnect(conn).await;
}
Command::Message { conn, msg } => {
self.handle_message(conn, msg).await;
}
Command::Total { res_tx } => {
let count = &self.visitor_count.fetch_or(0, Ordering::SeqCst);
let _ = res_tx.send(*count);
}
Command::OnlineUsers { res_tx } => {
let conns = &self
.users
.clone()
.keys()
.take(10000) .cloned()
.collect::<Vec<String>>();
let _ = res_tx.send(conns.to_owned());
}
Command::AllConnections { res_tx } => {
let conns = &self
.users
.clone()
.into_iter()
.take(10000) .map(|(key, val)| (Harsh::default().encode(&[val.1 as u64]), val.0 .1, key))
.collect::<Vec<(String, String, String)>>();
let _ = res_tx.send(conns.to_owned());
}
Command::Closed { conn } => {
if let Some((tx, _)) = self.sessions.get(&conn) {
_ = tx.send("Close".into());
}
}
Command::Broadcast { level, ty, msg } => {
self.handle_broadcast(level, ty, msg).await;
}
Command::Servertime { msg } => {
self.handle_servertime(msg).await;
}
Command::Subscribe { id, conn, channel } => {
self.handle_subscribe(id, conn, channel).await;
}
Command::Unsubscribe { id, conn, channel } => {
self.handle_unsubscribe(id, conn, channel).await;
}
Command::TalkMsg {
id,
conn,
curr_user,
msg,
} => {
self.handle_chat_message(id, conn, &curr_user, msg).await;
}
Command::HistoryMsg { id, conn, to } => {
self.handle_history_msg(id, conn, &to).await;
}
Command::HistoryTalk { id, conn } => {
self.handle_history_talk(id, conn).await;
}
Command::Location { id, conn, msg } => {
self.handle_location_message(id, conn, msg).await;
}
}
}
Ok(())
}
}