use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QueryOrder};
use tracing;
use serde::{Deserialize, Serialize};
use crate::{
crdt::crdt::{CrdtDelta, CrdtDeltaBox}, delta::message_do::MessageDO, entity::{chat, message}, MessageViewFromModelError, SignerMeta, ViewError
};
use super::ChatVO;
/// View object for a single chat message; serializable and deserializable via serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageVO {
/// Chat this message belongs to.
pub chat: ChatVO,
/// Message identifier; `create` and `create_check_message` generate a UUID v4 string.
pub id: String,
/// Public key of the message author.
pub user_key: String,
/// `id` of the message this one refers to; `None` for messages built by `create`.
pub parent_id: Option<String>,
/// `user_key` of the parent message's author; `None` for messages built by `create`.
pub parent_user_key: Option<String>,
/// Payload of the message.
pub content: MessageContent,
/// Public keys of the receivers. `create` fills this with the chat's receiver
/// keys minus the author's own key; `create_check_message` leaves it empty.
pub receiver_keys: Vec<String>,
/// Creation time in milliseconds since the Unix epoch (UTC).
pub create_time: i64,
}
/// Payload carried by a [`MessageVO`], tagged by kind.
///
/// NOTE(review): what the `String` payload of the media variants holds
/// (URL, path, encoded data?) is not visible in this file — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MessageContent {
/// Text payload.
Text(String),
/// Image payload.
Image(String),
/// Video payload.
Video(String),
/// Audio payload.
Audio(String),
/// File payload.
File(String),
/// Payload-less marker used by `create_check_message` for a message that
/// points at a parent message (presumably an acknowledgement — confirm).
Check,
}
impl MessageVO {
/// Builds a brand-new message authored by the signer in `meta`.
///
/// The receiver list is the chat's receiver keys with the author's own public
/// key removed. The message gets a fresh UUID v4 `id`, no parent, and a
/// `create_time` equal to the current UTC time in milliseconds.
///
/// # Errors
/// Propagates any error returned by `ChatVO::receiver_keys`.
pub async fn create(
    meta: &SignerMeta,
    chat_vo: &ChatVO,
    content: &MessageContent,
) -> Result<Self, ViewError> {
    let author_key = &meta.keys.pub_key;

    // The author is never listed among the receivers of their own message.
    let receiver_keys: Vec<String> = chat_vo
        .receiver_keys(meta)
        .await?
        .into_iter()
        .filter(|key| key != author_key)
        .collect();

    let message = Self {
        chat: chat_vo.clone(),
        id: uuid::Uuid::new_v4().to_string(),
        parent_id: None,
        parent_user_key: None,
        content: content.clone(),
        user_key: author_key.clone(),
        receiver_keys,
        create_time: chrono::Utc::now().timestamp_millis(),
    };
    Ok(message)
}
pub async fn from_model(
c: &impl ConnectionTrait,
model: &message::Model,
) -> Result<Self, MessageViewFromModelError> {
let chat_model = chat::Entity::find()
.filter(
chat::Column::ChatKey
.eq(&model.chat_key)
.and(chat::Column::ChatVariant.eq(&model.chat_variant)),
)
.one(c)
.await?
.expect("chat not found");
let chat_vo = serde_json::from_str(&chat_model.view_object)?;
Ok(Self {
chat: chat_vo,
id: model.id.clone(),
parent_id: model.parent_id.clone(),
parent_user_key: model.parent_user_key.clone(),
content: serde_json::from_str(&model.content)?,
user_key: model.user_key.clone(),
receiver_keys: if model.receiver_keys.is_empty() {
vec![]
} else {
model
.receiver_keys
.split(',')
.map(|s| s.to_string())
.collect()
},
create_time: model.create_time.and_utc().timestamp_millis(),
})
}
/// Builds (without persisting) a `Check` message that points at `self`.
///
/// The new message shares this message's chat, has a fresh UUID v4 `id`, uses
/// this message's `id` / `user_key` as its parent reference, is authored by the
/// signer in `meta`, and has an empty receiver list.
pub async fn create_check_message(&self, meta: &SignerMeta) -> Result<MessageVO, ViewError> {
    let parent_id = Some(self.id.clone());
    let parent_user_key = Some(self.user_key.clone());

    Ok(MessageVO {
        chat: self.chat.clone(),
        id: uuid::Uuid::new_v4().to_string(),
        parent_id,
        parent_user_key,
        content: MessageContent::Check,
        user_key: meta.keys.pub_key.clone(),
        receiver_keys: Vec::new(),
        create_time: chrono::Utc::now().timestamp_millis(),
    })
}
/// Reports whether the signer in `meta` already has a stored `Check` message
/// for this message.
///
/// A match is a row in the same chat (key and variant) whose parent reference
/// equals this message's `id` / `user_key`, whose author is the signer's
/// public key, and whose content type is `"check"`.
///
/// # Errors
/// Propagates errors from resolving the chat key and from the database query.
pub async fn has_user_check(&self, meta: &SignerMeta) -> Result<bool, ViewError> {
    let chat_key = self.chat.chat_key(meta).await?;
    let chat_variant = self.chat.chat_variant();

    let criteria = message::Column::ParentId
        .eq(&self.id)
        .and(message::Column::ParentUserKey.eq(&self.user_key))
        .and(message::Column::UserKey.eq(&meta.keys.pub_key))
        .and(message::Column::ChatKey.eq(&chat_key))
        .and(message::Column::ChatVariant.eq(&chat_variant))
        .and(message::Column::ContentType.eq(MessageContent::Check.ty()));

    let found = message::Entity::find()
        .filter(criteria)
        .one(&meta.conn)
        .await?;
    Ok(found.is_some())
}
pub async fn get_check_messages(&self, meta: &SignerMeta) -> Result<Vec<MessageVO>, ViewError> {
let chat_key = self.chat.chat_key(meta).await?;
let chat_variant = self.chat.chat_variant();
let models = message::Entity::find()
.filter(
message::Column::ParentId.eq(&self.id)
.and(message::Column::ParentUserKey.eq(&self.user_key))
.and(message::Column::ChatKey.eq(&chat_key))
.and(message::Column::ChatVariant.eq(&chat_variant))
.and(message::Column::ContentType.eq(MessageContent::Check.ty())),
)
.order_by_asc(message::Column::CreateTime)
.all(&meta.conn)
.await?;
let mut check_messages = Vec::new();
for model in models {
let message_vo = MessageVO::from_model(&meta.conn, &model).await?;
check_messages.push(message_vo);
}
Ok(check_messages)
}
pub async fn list(meta: &SignerMeta) -> Result<Vec<MessageVO>, ViewError> {
let models = message::Entity::find()
.filter(message::Column::ContentType.ne(MessageContent::Check.ty()))
.order_by_asc(message::Column::CreateTime)
.all(&meta.conn)
.await?;
let mut messages = Vec::new();
for model in models {
let message_vo = MessageVO::from_model(&meta.conn, &model).await?;
messages.push(message_vo);
}
Ok(messages)
}
pub async fn list_by_chat(
meta: &SignerMeta,
chat_key: &str,
chat_variant: &str,
) -> Result<Vec<MessageVO>, ViewError> {
let models = message::Entity::find()
.filter(
message::Column::ChatKey
.eq(chat_key)
.and(message::Column::ChatVariant.eq(chat_variant))
.and(message::Column::ContentType.ne(MessageContent::Check.ty())),
)
.order_by_asc(message::Column::CreateTime)
.all(&meta.conn)
.await?;
let mut messages = Vec::new();
for model in models {
let message_vo = MessageVO::from_model(&meta.conn, &model).await?;
messages.push(message_vo);
}
Ok(messages)
}
pub async fn get(
meta: &SignerMeta,
id: &str,
chat_key: &str,
chat_variant: &str,
user_key: &str,
) -> Result<Option<MessageVO>, ViewError> {
let model = message::Entity::find_by_id((
id.to_string(),
chat_key.to_string(),
chat_variant.to_string(),
user_key.to_string(),
))
.one(&meta.conn)
.await?;
match model {
Some(model) => {
let message_vo = MessageVO::from_model(&meta.conn, &model).await?;
Ok(Some(message_vo))
}
None => Ok(None),
}
}
pub async fn put(&self, meta: &SignerMeta) -> Result<(), ViewError> {
let _write_lock = meta.write_mutex.lock().await;
let chat_key = self.chat.chat_key(meta).await?;
let chat_variant = self.chat.chat_variant();
let existing = message::Entity::find_by_id((
self.id.to_string(),
chat_key.to_string(),
chat_variant.to_string(),
self.user_key.to_string(),
))
.one(&meta.conn)
.await?;
let delta = if let Some(existing_model) = existing {
let existing_vo = MessageVO::from_model(&meta.conn, &existing_model).await?;
MessageDO::new(&self, &existing_vo)?
} else {
MessageDO::from(self.clone())
};
let delta_box = CrdtDeltaBox::Message(CrdtDelta::Put(delta));
delta_box.insert(meta).await?;
crate::crdt::reconcile(meta).await?;
if let Err(e) = meta.event_bus.send(crate::CrdtMutate::MessagePut(self.clone())) {
tracing::warn!("Failed to send MessagePut event: {}", e);
}
Ok(())
}
/// Stores several messages as CRDT `Put` deltas with a single reconcile.
///
/// Runs while holding `meta.write_mutex`. All deltas are built first (each
/// against the existing row when one exists), then inserted in order, then one
/// reconcile runs, and finally one `MessagePut` event per message is sent.
/// Failed event sends are only logged as warnings.
///
/// # Errors
/// Propagates errors from resolving chat keys, lookups, building deltas,
/// inserting them, and reconciling.
pub async fn put_many(messages: Vec<MessageVO>, meta: &SignerMeta) -> Result<(), ViewError> {
    let _write_lock = meta.write_mutex.lock().await;

    // Phase 1: compute every delta before touching the delta store.
    let mut pending = Vec::with_capacity(messages.len());
    for message in messages.iter() {
        let chat_key = message.chat.chat_key(meta).await?;
        let chat_variant = message.chat.chat_variant();
        let primary_key = (
            message.id.to_string(),
            chat_key.to_string(),
            chat_variant.to_string(),
            message.user_key.to_string(),
        );
        let stored = message::Entity::find_by_id(primary_key)
            .one(&meta.conn)
            .await?;

        let delta = match stored {
            Some(row) => {
                let previous = MessageVO::from_model(&meta.conn, &row).await?;
                MessageDO::new(message, &previous)?
            }
            None => MessageDO::from(message.clone()),
        };
        pending.push(CrdtDeltaBox::Message(CrdtDelta::Put(delta)));
    }

    // Phase 2: persist the deltas, then reconcile once for the whole batch.
    for delta_box in pending {
        delta_box.insert(meta).await?;
    }
    crate::crdt::reconcile(meta).await?;

    // Phase 3: best-effort notifications, one per message.
    for message in messages {
        let event = crate::CrdtMutate::MessagePut(message);
        if let Err(err) = meta.event_bus.send(event) {
            tracing::warn!("Failed to send MessagePut event: {}", err);
        }
    }
    Ok(())
}
/// Deletes one message by recording a CRDT `Del` delta, reconciling, and
/// announcing the deletion.
///
/// Runs while holding `meta.write_mutex`. The message is identified by the
/// composite key `(id, user_key, chat_key, chat_variant)`. After the delta is
/// inserted the CRDT state is reconciled — matching `put`, `put_many` and
/// `del_many` — so the deletion is applied before the `MessageDel` event is
/// sent. A failed event send is only logged as a warning.
///
/// # Errors
/// Propagates errors from inserting the delta and from reconciling.
pub async fn del(
    meta: &SignerMeta,
    id: &str,
    chat_key: &str,
    chat_variant: &str,
    user_key: &str,
) -> Result<(), ViewError> {
    let _write_lock = meta.write_mutex.lock().await;

    // Build the key once; the delta takes a copy and the event takes the original.
    let message_key = crate::crdt::crdt_message::MessageKey {
        id: id.to_string(),
        user_key: user_key.to_string(),
        chat_key: chat_key.to_string(),
        chat_variant: chat_variant.to_string(),
    };

    let delta_box = CrdtDeltaBox::Message(CrdtDelta::Del(message_key.clone()));
    delta_box.insert(meta).await?;
    // Without this step the delete delta would sit unapplied until some later
    // write happened to trigger a reconcile.
    crate::crdt::reconcile(meta).await?;

    if let Err(e) = meta.event_bus.send(crate::CrdtMutate::MessageDel(message_key)) {
        tracing::warn!("Failed to send MessageDel event: {}", e);
    }
    Ok(())
}
pub async fn del_many(
message_keys: Vec<crate::crdt::crdt_message::MessageKey>,
meta: &SignerMeta,
) -> Result<(), ViewError> {
let _write_lock = meta.write_mutex.lock().await;
let mut delta_boxes = Vec::new();
for message_key in &message_keys {
let delta_box =
CrdtDeltaBox::Message(CrdtDelta::Del(message_key.clone()));
delta_boxes.push(delta_box);
}
for delta_box in delta_boxes {
delta_box.insert(meta).await?;
}
crate::crdt::reconcile(meta).await?;
for message_key in message_keys {
if let Err(e) = meta.event_bus.send(crate::CrdtMutate::MessageDel(message_key)) {
tracing::warn!("Failed to send MessageDel event: {}", e);
}
}
Ok(())
}
pub async fn find_messages_to_check(
meta: &SignerMeta,
base_create_time: i64,
user_key: &str,
) -> Result<Vec<MessageVO>, ViewError> {
use sea_orm::{EntityTrait, QueryFilter, QueryOrder, ColumnTrait, Condition};
use crate::entity::message;
let base_condition = Condition::all()
.add(message::Column::CreateTime.lte(chrono::DateTime::from_timestamp_millis(base_create_time).unwrap()))
.add(message::Column::ReceiverKeys.contains(user_key))
.add(message::Column::ContentType.ne(MessageContent::Check.ty()));
let models = message::Entity::find()
.filter(base_condition)
.order_by_asc(message::Column::CreateTime)
.all(&meta.conn)
.await?;
let mut messages_to_check = Vec::new();
for model in models {
let message_vo = MessageVO::from_model(&meta.conn, &model).await?;
let has_check = message_vo.has_user_check(meta).await?;
if !has_check {
messages_to_check.push(message_vo);
}
}
Ok(messages_to_check)
}
}
impl MessageContent {
pub fn ty(&self) -> String {
match self {
MessageContent::Text(_) => "text".to_string(),
MessageContent::Image(_) => "image".to_string(),
MessageContent::Video(_) => "video".to_string(),
MessageContent::Audio(_) => "audio".to_string(),
MessageContent::File(_) => "file".to_string(),
MessageContent::Check => "check".to_string(),
}
}
}