use lazy_static::lazy_static;
use mongodb::bson::doc;
use std::collections::HashSet;
use crate::services::user_service::get_username;
use super::input::TalkMessage;
lazy_static! {
pub static ref CHAT_MSG_DB: &'static str = "chat_db";
pub static ref CHAT_MSG_COL: &'static str = "t_chat_msg";
}
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct ChatMessage {
pub id: String,
pub from: String,
pub to: String,
pub to_role: String,
pub msg: String,
pub msg_type: String,
pub ts: i64,
}
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Default)]
pub struct ChatMessageVo {
pub id: String,
pub from: String,
pub from_name: String,
pub from_avator: String,
pub to: String,
pub to_name: String,
pub to_avator: String,
pub to_role: String,
pub msg: String,
pub msg_type: String,
pub ts: i64,
}
impl ChatMessage {
pub fn build(curr_user: &str, msg_id: &str, msg: &TalkMessage) -> Self {
Self {
id: msg_id.to_string(),
from: curr_user.to_string(),
to: msg.to.to_owned(),
to_role: msg.to_role.to_owned(),
msg: msg.msg.trim().to_owned(),
msg_type: msg.msg_type.to_owned(),
ts: crate::commons::timestamp_millis(),
}
}
}
impl ChatMessageVo {
pub fn from_message(message: &ChatMessage) -> Self {
Self {
id: message.id.to_owned(),
from: message.from.to_owned(),
to: message.to.to_owned(),
to_role: message.to_role.to_owned(),
msg: message.msg.to_owned(),
msg_type: message.msg_type.to_owned(),
ts: message.ts.to_owned(),
..Default::default()
}
}
}
pub async fn save_chat_message(
ctx: &crate::server::AppContext,
id: Option<String>,
curr_user: &str,
msg: TalkMessage,
) -> Result<Option<ChatMessage>, anyhow::Error> {
if let Some(id) = id {
let msg = ChatMessage::build(curr_user, &id, &msg);
if msg.msg.is_empty() {
return Ok(None);
}
if let Some(mongo) = ctx.mongo_opt() {
let _ = mongo
.insert_one(*CHAT_MSG_DB, *CHAT_MSG_COL, msg.clone())
.await?;
log::info!("publish_chat_message: msg={:?}", msg);
Ok(Some(msg))
} else {
log::warn!("TalkMessage: id={}, msg=mongo client not enable!", id);
Ok(None)
}
} else {
Ok(None)
}
}
pub async fn save_chat_message2(
context: &crate::server::AppContext,
id: Option<String>,
curr_user: &str,
msg: TalkMessage,
) -> Result<Option<ChatMessageVo>, anyhow::Error> {
if let Some(id) = id {
let msg = ChatMessage::build(curr_user, &id, &msg);
if msg.msg.is_empty() {
return Ok(None);
}
if let Some(mongo) = context.mongo_opt() {
let msg_id = mongo
.insert_one(*CHAT_MSG_DB, *CHAT_MSG_COL, msg.clone())
.await?;
log::info!("publish_chat_message: msg={:?}", msg);
let msg = mongo
.find_one::<ChatMessage>(*CHAT_MSG_DB, *CHAT_MSG_COL, &msg_id)
.await?;
if let Some(msg) = msg {
let mut target = ChatMessageVo::from_message(&msg);
let from_name = get_username(&msg.from, context.mysql()).await?;
let to_name = get_username(&msg.to, context.mysql()).await?;
if let Some(name) = from_name {
target.from_name = name;
}
if let Some(name) = to_name {
target.to_name = name;
}
target.from_avator = format!("http://192.168.1.2:9200/avator/{}", &msg.from);
target.to_avator = format!("http://192.168.1.2:9200/avator/{}", &msg.to);
return Ok(Some(target));
}
Ok(None)
} else {
log::warn!("TalkMessage: id={}, msg=mongo client not enable!", id);
Ok(None)
}
} else {
Ok(None)
}
}
pub async fn query_history_message(
context: &crate::server::AppContext,
_from: &str,
_to: &str,
) -> Result<Vec<ChatMessage>, anyhow::Error> {
if let Some(mongo) = context.mongo_opt() {
let _filter = doc! {"$or": [ {"from": _from, "to": _to}, {"from": _to, "to": _from}]};
let _sort = doc! {"ts": -1};
let _list = mongo
.select::<ChatMessage>(*CHAT_MSG_DB, *CHAT_MSG_COL, _filter, _sort, (0, 300))
.await?;
return Ok(_list);
}
Ok(Vec::new())
}
pub async fn query_history_msg(
context: &crate::server::AppContext,
_from: &str,
_to: &str,
) -> Result<Vec<ChatMessageVo>, anyhow::Error> {
let mut list: Vec<ChatMessageVo> = Vec::new();
if let Some(mongo) = context.mongo_opt() {
let _filter = doc! {"$or": [ {"from": _from, "to": _to}, {"from": _to, "to": _from}]};
let _sort = doc! {"ts": -1};
let _list = mongo
.select::<ChatMessage>(*CHAT_MSG_DB, *CHAT_MSG_COL, _filter, _sort, (0, 300))
.await?;
for item in _list.into_iter() {
let mut target = ChatMessageVo::from_message(&item);
let from_name = get_username(_from, context.mysql()).await?;
let to_name = get_username(_to, context.mysql()).await?;
if let Some(name) = from_name {
target.from_name = name;
}
if let Some(name) = to_name {
target.to_name = name;
}
target.from_avator = format!("http://192.168.1.2:9200/avator/{}", _from);
target.to_avator = format!("http://192.168.1.2:9200/avator/{}", _to);
list.push(target);
}
return Ok(list);
}
Ok(list)
}
pub async fn query_last_message(
ctx: &crate::server::AppContext,
_from: &str,
_to: &str,
) -> Result<Option<ChatMessageVo>, anyhow::Error> {
if let Some(mongo) = ctx.mongo_opt() {
let _filter = doc! {"$or": [ {"from": _from, "to": _to}, {"from": _to, "to": _from}]};
let _sort = doc! {"ts": -1};
let _list = mongo
.select::<ChatMessage>(*CHAT_MSG_DB, *CHAT_MSG_COL, _filter, _sort, (0, 1))
.await?;
if _list.is_empty() {
return Ok(None);
}
let mut target = ChatMessageVo::from_message(&_list[0]);
let from_name = get_username(_from, ctx.mysql()).await?;
let to_name = get_username(_to, ctx.mysql()).await?;
if let Some(name) = from_name {
target.from_name = name;
}
if let Some(name) = to_name {
target.to_name = name;
}
target.from_avator = format!("http://192.168.1.2:9200/avator/{}", _from);
target.to_avator = format!("http://192.168.1.2:9200/avator/{}", _to);
return Ok(Some(target));
}
Ok(None)
}
pub async fn query_history_talk(
ctx: &crate::server::AppContext,
_from: &str,
) -> Result<Vec<String>, anyhow::Error> {
if let Some(mongo) = ctx.mongo_opt() {
let _filter1 = doc! {"from": _from};
let _filter2 = doc! {"to": _from};
let mut _list1 = mongo
.distinct::<String>(*CHAT_MSG_DB, *CHAT_MSG_COL, "to", _filter1)
.await?;
let _list2 = mongo
.distinct::<String>(*CHAT_MSG_DB, *CHAT_MSG_COL, "from", _filter2)
.await?;
_list1.extend(_list2);
let unique_set: HashSet<String> = _list1.into_iter().collect();
let _list1: Vec<String> = unique_set.into_iter().collect();
return Ok(_list1);
}
Ok(Vec::new())
}