use harsh::Harsh;
use rand::Rng;
use serde_json::json;
use std::sync::atomic::Ordering;
use tokio::sync::mpsc;
use super::{connection_new::Connection, server::EchoServer, ConnId, Msg};
use super::{
input::{PostLocationData, SubscribeChannel, TalkMessage},
output::{Level, OutputMessage, Ty},
service_chat, service_location, UserInfo,
};
use crate::core::auth0::jwt_token::JwtToken;
use crate::core::{looop, websocat::echo::input::Action};
impl EchoServer {
pub async fn handle_connect(
&mut self,
tx: mpsc::UnboundedSender<Msg>,
user_info: UserInfo,
) -> ConnId {
let id = rand::rng().random::<u64>() as usize;
let mut data = json!({ "connid": Harsh::default().encode(&[id as u64]) });
self.incrment_visitor_count();
let _user_id = user_info.0.to_owned();
data["user_id"] = json!(&_user_id);
data["nick_name"] = json!(user_info.1.to_owned());
data["user_role"] = json!(user_info.2.to_owned());
#[rustfmt::skip]
self.sessions.insert(id, (tx, Some(_user_id.to_owned())));
self.users
.insert(user_info.0.to_owned(), (user_info.to_owned(), id));
let out = OutputMessage {
id: None,
level: Level::Info,
ty: Ty::Connected,
msg: None,
data: Some(data),
};
if let Some((tx, _)) = self.sessions.get(&id) {
if let Some(out) = out.message() {
Self::do_send(tx, out);
}
}
let out = OutputMessage {
id: None,
level: Level::Info,
ty: Ty::UserOnline,
msg: None,
data: Some(json!(user_info)),
};
if let Some(out) = out.message() {
for (tx, user_id) in self.sessions.values() {
if let Some(user_id) = user_id {
if _user_id != *user_id {
Self::do_send(tx, &out);
}
}
}
}
id
}
/// Re-binds the session on `conn` to the user named by a JWT.
///
/// The token is checked with `JwtToken::verify_token`. When it verifies and is
/// not expired, the user record of the session's previous user id (if any) is
/// removed, the session is re-registered under the token's `sub`, and a user
/// record `(sub, aud, "user")` is stored for this connection.
///
/// The connection then receives one reply carrying the request `id`:
/// * `Ty::Success` with the user id in `data`, or
/// * `Ty::Failed` with the reason in `msg` — the verifier's error text, or
///   `"token expired"` for an expired token.
///
/// Nothing happens when `conn` has no session.
pub async fn handle_autorization(&mut self, id: Option<String>, conn: ConnId, token: String) {
    let (tx, guest_id) = match self.sessions.get(&conn) {
        Some((tx, guest_id)) => (tx, guest_id),
        None => return,
    };

    let verified = match JwtToken::verify_token(&token) {
        Ok(claims) if !claims.is_expired() => {
            let user_id = claims.sub;
            let user_info = (user_id.to_owned(), claims.aud, "user".to_owned());
            if let Some(guest_id) = guest_id {
                self.users.remove(guest_id);
            }
            self.sessions
                .insert(conn, (tx.clone(), Some(user_id.to_owned())));
            self.users.insert(user_id.to_owned(), (user_info, conn));
            Ok(user_id)
        }
        Ok(claims) => {
            // Give the client (and the log) an actual reason instead of an
            // empty failure message.
            log::warn!("handle_autorization: token expired, user_id={}", claims.sub);
            Err("token expired".to_string())
        }
        Err(e) => {
            log::error!("handle_autorization: error={}", e);
            Err(e.to_string())
        }
    };

    let (ty, msg, data) = match verified {
        Ok(user_id) => (Ty::Success, None, Some(json!(user_id))),
        Err(reason) => (Ty::Failed, Some(reason), None),
    };
    let out = OutputMessage {
        id,
        level: Level::Info,
        ty,
        msg,
        data,
    };
    if let Some((tx, _)) = self.sessions.get(&conn) {
        if let Some(out) = out.message() {
            Self::do_send(tx, out);
        }
    }
}
/// Tears down the state kept for `conn_id` and announces the user as offline.
///
/// The connection's subscriptions and session are removed. The visitor counter
/// is decremented only when a session was actually registered, so a repeated
/// or unknown disconnect cannot push the counter below the number of live
/// sessions.
///
/// The user record is removed, and `Ty::UserOffline` is sent to every session
/// of a different user, only when that record was registered by this very
/// connection. If the same user has since been registered from another
/// connection, the record belongs to that connection and is left alone.
pub async fn handle_disconnect(&mut self, conn_id: ConnId) {
    self.subscribes.remove(&conn_id);

    // Taking the session out first tells us whether this connection was ever
    // registered at all.
    let user_id = match self.sessions.remove(&conn_id) {
        Some((_, user_id)) => user_id,
        None => return,
    };
    self.decrment_visitor_count();

    let user_id = match user_id {
        Some(user_id) => user_id,
        None => return,
    };

    // Each user record remembers the connection that registered it; a stale
    // connection going away must not delete a newer connection's record.
    let owns_record =
        matches!(self.users.get(&user_id), Some((_, owner)) if *owner == conn_id);
    if !owns_record {
        return;
    }
    self.users.remove(&user_id);

    let out = OutputMessage {
        id: None,
        level: Level::Info,
        ty: Ty::UserOffline,
        msg: None,
        data: Some(json!(vec![&user_id])),
    };
    if let Some(out) = out.message() {
        for (tx, other) in self.sessions.values() {
            if let Some(other) = other {
                if *other != user_id {
                    Self::do_send(tx, &out);
                }
            }
        }
    }
}
/// Forwards `_msg` unchanged to the session registered under `_conn`.
///
/// Does nothing when no such session exists.
pub async fn handle_message(&self, _conn: ConnId, _msg: impl Into<String>) {
    match self.sessions.get(&_conn) {
        Some((tx, _)) => Self::do_send(tx, _msg),
        None => {}
    }
}
/// Adds `channel` to the subscriptions of `conn` and acknowledges the request.
///
/// The channel is stored by its `to_string()` form and is not added twice.
/// A `Ty::Success` reply carrying the request `id`, the subscribe action and
/// the channel is sent to the connection's session, if one exists.
pub async fn handle_subscribe(
    &mut self,
    id: Option<String>,
    conn: ConnId,
    channel: SubscribeChannel,
) {
    let name = channel.to_string();
    let subscribed = self.subscribes.entry(conn).or_default();
    if !subscribed.contains(&name) {
        subscribed.push(name);
    }

    let ack = OutputMessage {
        id,
        level: Level::Info,
        ty: Ty::Success,
        msg: None,
        data: Some(json!({ "action": Action::Subscribe, "channel": channel })),
    };
    let tx = match self.sessions.get(&conn) {
        Some((tx, _)) => tx,
        None => return,
    };
    if let Some(text) = ack.message() {
        Self::do_send(tx, text);
    }
}
/// Removes `channel` from the subscriptions of `conn` and acknowledges the
/// request.
///
/// Unsubscribing is idempotent: the `Ty::Success` reply (request `id`, the
/// unsubscribe action and the channel name) is sent whether or not the
/// connection was subscribed to the channel, including when it has never
/// subscribed to anything. No reply is sent when `conn` has no session.
pub async fn handle_unsubscribe(
    &mut self,
    id: Option<String>,
    conn: ConnId,
    channel: SubscribeChannel,
) {
    let channel = channel.to_string();
    if let Some(channels) = self.subscribes.get_mut(&conn) {
        channels.retain(|name| *name != channel);
    }

    // Built outside the lookup above so that a client that never subscribed
    // still gets an answer to its request.
    let out = OutputMessage {
        id,
        level: Level::Info,
        ty: Ty::Success,
        msg: None,
        data: Some(json!({ "action": Action::Unsubscribe, "channel": channel })),
    };
    if let Some((tx, _)) = self.sessions.get(&conn) {
        if let Some(out) = out.message() {
            Self::do_send(tx, out);
        }
    }
}
/// Sends `msg` as a `Ty::ServerTime` message to every connection subscribed to
/// the `SubscribeChannel::ServerTime` channel.
///
/// Subscriptions whose connection has no session are skipped.
pub async fn handle_servertime(&self, msg: String) {
    let wanted = SubscribeChannel::ServerTime.to_string();
    for (conn, channels) in self.subscribes.iter() {
        if !channels.contains(&wanted) {
            continue;
        }
        let tx = match self.sessions.get(conn) {
            Some((tx, _)) => tx,
            None => continue,
        };
        // One message is built per subscriber.
        let tick = OutputMessage {
            id: None,
            level: Level::Info,
            ty: Ty::ServerTime,
            msg: None,
            data: Some(json!(msg)),
        };
        if let Some(text) = tick.message() {
            Self::do_send(tx, text);
        }
    }
}
/// Stores a chat message and relays the stored form to sender and recipient.
///
/// The message is handed to `service_chat::save_chat_message`:
/// * `Ok(Some(saved))` — `saved` is sent as `Ty::ChatMessage` to the session
///   on `conn`, and then to the first session found whose user id equals
///   `saved.to`. The second delivery is skipped when the current user is
///   writing to themselves.
/// * `Ok(None)` — nothing is sent.
/// * `Err(e)` — the error text is reported to `conn` through [`Self::err`].
pub async fn handle_chat_message(
    &self,
    id: Option<String>,
    conn: ConnId,
    curr_user: &UserInfo,
    msg: TalkMessage,
) {
    let saved = match service_chat::save_chat_message(
        self.ctx.as_ref(),
        id.clone(),
        &curr_user.0,
        msg.clone(),
    )
    .await
    {
        Ok(Some(saved)) => saved,
        Ok(None) => return,
        Err(e) => {
            self.err(id, conn, e.to_string());
            return;
        }
    };

    // Sender and recipient each get their own freshly built message.
    let envelope = || OutputMessage {
        id: id.clone(),
        level: Level::Info,
        ty: Ty::ChatMessage,
        msg: None,
        data: Some(json!(saved)),
    };

    if let Some((tx, _)) = self.sessions.get(&conn) {
        if let Some(text) = envelope().message() {
            Self::do_send(tx, text);
        }
    }

    if let Some(text) = envelope().message() {
        let to_self = curr_user.0 == saved.from && saved.from == saved.to;
        if !to_self {
            let recipient = self
                .sessions
                .values()
                .find(|(_, uid)| uid.as_ref().map_or(false, |uid| saved.to == *uid));
            if let Some((tx, _)) = recipient {
                Self::do_send(tx, text);
            }
        }
    }
}
/// Stores a location update for the user bound to `conn` and echoes the stored
/// record back to that connection.
///
/// The request is ignored when `conn` has no session or the session carries no
/// user id, matching the history handlers. The user id is taken straight from
/// the session instead of going through `current_user`, which panics when the
/// session or user record is missing.
///
/// On `Ok(Some(record))` from `service_location::save_location_message`, the
/// record's `id` is cleared and it is sent as `Ty::Location`. On `Ok(None)`
/// nothing is sent. On `Err` the error text is reported through [`Self::err`].
pub async fn handle_location_message(
    &self,
    id: Option<String>,
    conn: ConnId,
    msg: PostLocationData,
) {
    let (tx, user_id) = match self.sessions.get(&conn) {
        Some((tx, Some(user_id))) => (tx, user_id),
        _ => return,
    };

    match service_location::save_location_message(
        self.ctx.as_ref(),
        id.clone(),
        user_id,
        &msg,
    )
    .await
    {
        Ok(Some(mut saved)) => {
            // The stored record's id is cleared before it is echoed back.
            saved.id = None;
            let out = OutputMessage {
                id,
                level: Level::Info,
                ty: Ty::Location,
                msg: None,
                data: Some(json!(saved)),
            };
            if let Some(out) = out.message() {
                Self::do_send(tx, out);
            }
        }
        Ok(None) => {}
        Err(e) => {
            self.err(id, conn, e.to_string());
        }
    }
}
/// Returns a copy of the user record bound to the session on `conn`.
///
/// # Panics
///
/// Panics when `conn` has no session, the session carries no user id, or no
/// user record is stored under that id.
pub fn current_user(&self, conn: ConnId) -> UserInfo {
    let found = match self.sessions.get(&conn) {
        Some((_, Some(user_id))) => self
            .users
            .get(user_id)
            .map(|(info, _)| (info.0.to_owned(), info.1.to_owned(), info.2.to_owned())),
        _ => None,
    };
    found.unwrap()
}
/// Sends the message history between the user on `conn` and `to` back to
/// `conn`.
///
/// The history comes from `service_chat::query_history_message` and is
/// delivered as `Ty::HistoryMessage`; a query error is reported through
/// [`Self::err`]. The request is ignored when `conn` has no session or the
/// session carries no user id.
pub async fn handle_history_msg(&self, id: Option<String>, conn: ConnId, to: &str) {
    let (tx, from) = match self.sessions.get(&conn) {
        Some((tx, Some(from))) => (tx, from),
        _ => return,
    };
    match service_chat::query_history_message(self.ctx.as_ref(), from, to).await {
        Ok(history) => {
            let reply = OutputMessage {
                id,
                level: Level::Info,
                ty: Ty::HistoryMessage,
                msg: None,
                data: Some(json!(history)),
            };
            if let Some(text) = reply.message() {
                Self::do_send(tx, text);
            }
        }
        Err(e) => self.err(id, conn, e.to_string()),
    }
}
/// Sends the result of `service_chat::query_history_talk` for the user on
/// `conn` back to `conn`.
///
/// The result is delivered as `Ty::HistoryTalk`; a query error is reported
/// through [`Self::err`]. The request is ignored when `conn` has no session or
/// the session carries no user id.
pub async fn handle_history_talk(&self, id: Option<String>, conn: ConnId) {
    let (tx, user_id) = match self.sessions.get(&conn) {
        Some((tx, Some(user_id))) => (tx, user_id),
        _ => return,
    };
    match service_chat::query_history_talk(self.ctx.as_ref(), user_id).await {
        Ok(talks) => {
            let reply = OutputMessage {
                id,
                level: Level::Info,
                ty: Ty::HistoryTalk,
                msg: None,
                data: Some(json!(talks)),
            };
            if let Some(text) = reply.message() {
                Self::do_send(tx, text);
            }
        }
        Err(e) => self.err(id, conn, e.to_string()),
    }
}
/// Reports a failure to the session on `conn`.
///
/// Sends a `Level::Err` / `Ty::Failed` message whose `msg` is `e` and which
/// carries the request `id`. Does nothing when `conn` has no session.
pub fn err(&self, id: Option<String>, conn: ConnId, e: String) {
    let failure = OutputMessage {
        id,
        level: Level::Err,
        ty: Ty::Failed,
        msg: Some(e),
        data: None,
    };
    let tx = match self.sessions.get(&conn) {
        Some((tx, _)) => tx,
        None => return,
    };
    if let Some(text) = failure.message() {
        Self::do_send(tx, text);
    }
}
/// Sends one message with the given `level` and `ty`, carrying `msg` as its
/// `data`, to every registered session.
pub async fn handle_broadcast(&self, level: Level, ty: Ty, msg: impl Into<String>) {
    let text: String = msg.into();
    let notice = OutputMessage {
        id: None,
        level,
        ty,
        msg: None,
        data: Some(json!(text)),
    };
    let payload = match notice.message() {
        Some(payload) => payload,
        None => return,
    };
    // The payload is built once and shared by all recipients.
    self.sessions
        .values()
        .for_each(|(tx, _)| Self::do_send(tx, &payload));
}
/// Converts `msg` into a `String` and pushes it onto `tx`.
///
/// The result of the send is deliberately discarded.
pub fn do_send(tx: &mpsc::UnboundedSender<Msg>, msg: impl Into<String>) {
    let payload = msg.into();
    let _ = tx.send(payload);
}
/// Adds one to the visitor counter (sequentially consistent ordering).
pub fn incrment_visitor_count(&self) {
    let _previous = self.visitor_count.fetch_add(1, Ordering::SeqCst);
}
/// Subtracts one from the visitor counter (sequentially consistent ordering).
pub fn decrment_visitor_count(&self) {
    let _previous = self.visitor_count.fetch_sub(1, Ordering::SeqCst);
}
/// Spawns a local task that drives the server-time feed for `connection`.
///
/// The task awaits `looop::running(*looop::ONE_SEC, 100, ..)` with a callback
/// that calls `fire_servertime(OutputMessage::ts())` on a clone of
/// `connection`.
// NOTE(review): presumably `looop::running` invokes the callback periodically;
// its exact contract (and the meaning of `100`) is not visible here — confirm.
#[allow(dead_code)]
pub fn start_server_time(&self, connection: &Connection) {
    let ticker = connection.clone();
    let job = async move {
        looop::running(*looop::ONE_SEC, 100, move || {
            ticker.fire_servertime(OutputMessage::ts());
        })
        .await;
    };
    tokio::task::spawn_local(job);
}
/// Subscribes to `channel` on the context's `rudis` handle and feeds incoming
/// messages to `f`.
///
/// Does nothing when `rudis_opt()` yields no handle. Otherwise a local task is
/// spawned that subscribes to the channel and then, on every 100 ms interval
/// tick, drains the receiver with `try_next`, calling
/// `f(&connection_clone, &channel, &message)` for each message. The task ends
/// when `tokio::signal::ctrl_c()` completes. A failed subscription is logged
/// and ends the task immediately.
pub fn subscribe_channel<F>(&self, connection: &Connection, channel: String, f: F)
where
    F: Fn(&Connection, &str, &str) + Send + 'static,
{
    let ctx = self.ctx.clone();
    let rudis = match ctx.rudis_opt() {
        Some(rudis) => rudis,
        None => return,
    };
    let conn = connection.clone();
    tokio::task::spawn_local(async move {
        let mut rx = match rudis.subscribe(&channel).await {
            Ok(rx) => rx,
            Err(e) => {
                log::error!("channel={}, error={}", channel, e);
                return;
            }
        };
        let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    // Drain everything that is ready without blocking.
                    while let Ok(Some(message)) = rx.try_next() {
                        f(&conn, &channel, &message);
                    }
                }
                _ = tokio::signal::ctrl_c() => {
                    break;
                }
            }
        }
    });
}
}