use std::collections::BTreeSet;
use tracing;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect};
use serde::{Deserialize, Serialize};
use crate::{
MessageViewFromModelError, SignerMeta, UserVO, ViewError,
crdt::crdt::{CrdtDelta, CrdtDeltaBox},
delta::chat_do::ChatDO,
entity::{chat, message},
view::MessageContent,
};
use super::MessageVO;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ChatVO {
Private(PrivateChatVO),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PrivateChatVO {
pub peers: Vec<String>,
}
impl ChatVO {
pub fn chat_variant(&self) -> String {
match self {
ChatVO::Private(_) => format!("Private"),
}
}
pub async fn chat_key(&self, meta: &SignerMeta) -> Result<String, std::io::Error> {
match self {
ChatVO::Private(private) => {
let self_key = &meta.keys.pub_key;
if private.peers.len() == 0 || private.peers.len() > 2 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"private chat peers length must less than 2 and greater than 0".to_string(),
));
}
let peers = &private.peers;
if peers.len() == 1 {
return Ok(peers[0].clone());
}
if *self_key == peers[0] {
Ok(peers[1].clone())
} else {
Ok(peers[0].clone())
}
}
}
}
pub async fn uncheck_message_count(
&self,
meta: &SignerMeta,
) -> Result<u64, MessageViewFromModelError> {
let chat_variant = self.chat_variant();
let chat_key = self.chat_key(meta).await?;
let user_key = meta.keys.pub_key.clone();
let id_vec: Vec<String> = message::Entity::find()
.select_only()
.column(message::Column::Id)
.filter(
message::Column::ChatVariant
.eq(&chat_variant)
.and(message::Column::ChatKey.eq(&chat_key))
.and(message::Column::ReceiverKeys.contains(user_key.clone()))
.and(message::Column::UserKey.ne(user_key.clone())),
)
.into_tuple()
.all(&meta.conn)
.await?;
let checked = serde_json::to_string(&MessageContent::Check)?;
let checked_id_vec: Vec<String> = message::Entity::find()
.select_only()
.column(message::Column::ParentId)
.filter(
message::Column::ChatVariant
.eq(&chat_variant)
.and(message::Column::ChatKey.eq(&chat_key))
.and(message::Column::Content.eq(&checked))
.and(message::Column::UserKey.eq(&user_key))
.and(message::Column::ParentId.is_in(id_vec.clone())),
)
.into_tuple()
.all(&meta.conn)
.await?;
let mut uncheck_set = BTreeSet::from_iter(id_vec);
for checked_id in checked_id_vec {
uncheck_set.remove(&checked_id);
}
Ok(uncheck_set.len() as u64)
}
pub async fn latest_message(&self, meta: &SignerMeta) -> Result<Option<MessageVO>, ViewError> {
let chat_variant = self.chat_variant();
let chat_key = self.chat_key(meta).await?;
let message = match self {
ChatVO::Private { .. } => {
message::Entity::find()
.filter(
message::Column::ChatVariant
.eq(chat_variant)
.and(message::Column::ChatKey.eq(chat_key))
.and(message::Column::ContentType.ne(MessageContent::Check.ty())),
)
.order_by_desc(message::Column::CreateTime)
.one(&meta.conn)
.await?
}
};
let message_vo = match message {
None => None,
Some(message) => Some(MessageVO::from_model(&meta.conn, &message).await?),
};
Ok(message_vo)
}
pub async fn latest_activity_message(&self, meta: &SignerMeta) -> Result<Option<MessageVO>, ViewError> {
let chat_variant = self.chat_variant();
let chat_key = self.chat_key(meta).await?;
let user_key = &meta.keys.pub_key;
let last_own_message = message::Entity::find()
.filter(
message::Column::ChatKey.eq(&chat_key)
.and(message::Column::ChatVariant.eq(&chat_variant))
.and(message::Column::UserKey.eq(user_key))
.and(message::Column::ContentType.ne(MessageContent::Check.ty())),
)
.order_by_desc(message::Column::CreateTime)
.one(&meta.conn)
.await?;
let latest_check = message::Entity::find()
.filter(
message::Column::ChatKey.eq(&chat_key)
.and(message::Column::ChatVariant.eq(&chat_variant))
.and(message::Column::UserKey.eq(user_key))
.and(message::Column::ContentType.eq(MessageContent::Check.ty()))
.and(message::Column::ParentId.is_not_null()),
)
.order_by_desc(message::Column::CreateTime)
.one(&meta.conn)
.await?;
let last_read_message = if let Some(check) = latest_check {
if let (Some(parent_id), Some(parent_user_key)) = (check.parent_id, check.parent_user_key) {
message::Entity::find_by_id((
parent_id,
chat_key.to_string(),
chat_variant.to_string(),
parent_user_key,
))
.one(&meta.conn)
.await?
} else {
None
}
} else {
None
};
let result_model = match (last_own_message, last_read_message) {
(Some(own), Some(read)) => {
if own.create_time >= read.create_time {
Some(own)
} else {
Some(read)
}
},
(Some(own), None) => Some(own),
(None, Some(read)) => Some(read),
(None, None) => None,
};
if let Some(model) = result_model {
Ok(Some(MessageVO::from_model(&meta.conn, &model).await?))
} else {
Ok(None)
}
}
pub async fn destinations(&self, meta: &SignerMeta) -> Result<Vec<String>, ViewError> {
match self {
ChatVO::Private { .. } => {
let chat_key = self.chat_key(meta).await?;
let user_vo = crate::entity::user::Entity::find_by_id(chat_key)
.one(&meta.conn)
.await?;
let user_vo: UserVO = match user_vo {
None => {
return Err(ViewError::InvalidViewObjectError(
"数据库中无用户信息".to_string(),
));
}
Some(user_vo) => user_vo.into(),
};
let public = user_vo.public()?;
Ok(public.servers.iter().map(|i| i.addr.clone()).collect())
}
}
}
pub async fn receiver_keys(&self, meta: &SignerMeta) -> Result<Vec<String>, ViewError> {
let receiver_keys = match self {
ChatVO::Private { .. } => {
let chat_key = self.chat_key(meta).await?;
vec![chat_key]
}
};
Ok(receiver_keys)
}
pub async fn list(meta: &SignerMeta) -> Result<Vec<ChatVO>, ViewError> {
let models = chat::Entity::find().all(&meta.conn).await?;
let mut chats = Vec::new();
for model in models {
let chat_vo = serde_json::from_str(&model.view_object)?;
chats.push(chat_vo);
}
Ok(chats)
}
pub async fn get(
meta: &SignerMeta,
chat_key: &str,
chat_variant: &str,
) -> Result<Option<ChatVO>, ViewError> {
let model = chat::Entity::find_by_id((chat_key.to_string(), chat_variant.to_string()))
.one(&meta.conn)
.await?;
Ok(model
.map(|m| serde_json::from_str(&m.view_object))
.transpose()?)
}
pub async fn put(&self, meta: &SignerMeta) -> Result<(), ViewError> {
let _write_lock = meta.write_mutex.lock().await;
let chat_key = self.chat_key(meta).await?;
let chat_variant = self.chat_variant();
let existing = chat::Entity::find_by_id((chat_key.to_string(), chat_variant.to_string()))
.one(&meta.conn)
.await?;
let delta = if let Some(existing_model) = existing {
let existing_vo: ChatVO = serde_json::from_str(&existing_model.view_object)?;
ChatDO::new(meta, &self, &existing_vo).await?
} else {
ChatDO::from_vo(meta, self.clone()).await?
};
let delta_box = CrdtDeltaBox::Chat(CrdtDelta::Put(delta));
delta_box.insert(meta).await?;
crate::crdt::reconcile(meta).await?;
if let Err(e) = meta.event_bus.send(crate::CrdtMutate::ChatPut(self.clone())) {
tracing::warn!("Failed to send ChatPut event: {}", e);
}
Ok(())
}
pub async fn put_many(chats: Vec<ChatVO>, meta: &SignerMeta) -> Result<(), ViewError> {
let _write_lock = meta.write_mutex.lock().await;
let mut delta_boxes = Vec::new();
for chat in &chats {
let chat_key = chat.chat_key(meta).await?;
let chat_variant = chat.chat_variant();
let existing = chat::Entity::find_by_id((chat_key.to_string(), chat_variant.to_string()))
.one(&meta.conn)
.await?;
let delta = if let Some(existing_model) = existing {
let existing_vo: ChatVO = serde_json::from_str(&existing_model.view_object)?;
ChatDO::new(meta, &chat, &existing_vo).await?
} else {
ChatDO::from_vo(meta, chat.clone()).await?
};
let delta_box = CrdtDeltaBox::Chat(CrdtDelta::Put(delta));
delta_boxes.push(delta_box);
}
for delta_box in delta_boxes {
delta_box.insert(meta).await?;
}
crate::crdt::reconcile(meta).await?;
for chat in chats {
if let Err(e) = meta.event_bus.send(crate::CrdtMutate::ChatPut(chat)) {
tracing::warn!("Failed to send ChatPut event: {}", e);
}
}
Ok(())
}
pub async fn del(
meta: &SignerMeta,
chat_key: &str,
chat_variant: &str,
) -> Result<(), ViewError> {
let _write_lock = meta.write_mutex.lock().await;
let delta_box = CrdtDeltaBox::Chat(CrdtDelta::Del((
chat_key.to_string(),
chat_variant.to_string(),
)));
delta_box.insert(meta).await?;
if let Err(e) = meta.event_bus.send(crate::CrdtMutate::ChatDel(chat_key.to_string())) {
tracing::warn!("Failed to send ChatDel event: {}", e);
}
Ok(())
}
pub async fn del_many(
chat_keys_and_variants: Vec<(String, String)>,
meta: &SignerMeta,
) -> Result<(), ViewError> {
let _write_lock = meta.write_mutex.lock().await;
let mut delta_boxes = Vec::new();
for (chat_key, chat_variant) in &chat_keys_and_variants {
let delta_box = CrdtDeltaBox::Chat(CrdtDelta::Del((
chat_key.to_string(),
chat_variant.to_string(),
)));
delta_boxes.push(delta_box);
}
for delta_box in delta_boxes {
delta_box.insert(meta).await?;
}
crate::crdt::reconcile(meta).await?;
for (chat_key, _) in chat_keys_and_variants {
if let Err(e) = meta.event_bus.send(crate::CrdtMutate::ChatDel(chat_key)) {
tracing::warn!("Failed to send ChatDel event: {}", e);
}
}
Ok(())
}
}