use chrono::{TimeZone as _, Utc};
use sea_orm::{
ActiveModelTrait,
ActiveValue::{NotSet, Set},
ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter,
};
use serde::{Deserialize, Serialize};
use crate::{
SignerMeta,
crdt::errors::CrdtError,
delta::{deltaobject::DeltaField, message_do::MessageDO},
entity::message,
view::MessageVO,
};
use super::{CrdtType, crdt::CrdtDelta};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageKey {
pub id: String,
pub user_key: String,
pub chat_key: String,
pub chat_variant: String,
}
impl CrdtType for MessageDO {
type PrimaryKey = MessageKey;
async fn put(
&self,
core: &SignerMeta,
tx: &DatabaseTransaction,
) -> Result<CrdtDelta<Self>, CrdtError> {
let chat_key = self.chat.chat_key(core).await?;
let chat_variant = self.chat.chat_variant();
let r = message::Entity::find()
.filter(
message::Column::Id
.eq(&self.id)
.and(message::Column::ChatKey.eq(&chat_key))
.and(message::Column::ChatVariant.eq(&chat_variant))
.and(message::Column::UserKey.eq(&self.user_key)),
)
.one(tx)
.await?;
if let Some(val) = &r {
let vo = MessageVO::from_model(tx, val).await?;
if !self.has_changed(&vo) {
return Ok(CrdtDelta::Skip);
}
}
let m = message::ActiveModel {
id: Set(self.id.clone()),
chat_variant: Set(self.chat.chat_variant()),
chat_key: Set(self.chat.chat_key(core).await?),
user_key: Set(self.user_key.clone()),
parent_id: self.parent_id.clone().into(),
parent_user_key: self.parent_user_key.clone().into(),
content: match &self.content {
DeltaField::Set(val) => Set(serde_json::to_string(val)?),
DeltaField::NotSet => NotSet,
},
content_type: match &self.content_type {
DeltaField::Set(val) => Set(val.clone()),
DeltaField::NotSet => NotSet,
},
receiver_keys: match &self.receiver_keys {
DeltaField::Set(val) => {
if val.is_empty() {
Set(String::new()) } else {
Set(val.join(","))
}
},
DeltaField::NotSet => NotSet,
},
create_time: match &self.create_time {
DeltaField::Set(val) => Set(Utc.timestamp_millis_opt(*val).unwrap().naive_utc()),
DeltaField::NotSet => NotSet,
},
};
match r {
Some(val) => {
let e = m.update(tx).await?;
let message_do = MessageDO::new(
&MessageVO::from_model(tx, &e).await?,
&MessageVO::from_model(tx, &val).await?,
)?;
Ok(CrdtDelta::Put(message_do))
}
None => {
let e = m.insert(tx).await?;
Ok(CrdtDelta::Del(MessageKey {
chat_key: e.chat_key,
chat_variant: e.chat_variant,
user_key: e.user_key,
id: e.id,
}))
}
}
}
async fn del(
_core: &SignerMeta,
key: &Self::PrimaryKey,
tx: &DatabaseTransaction,
) -> Result<CrdtDelta<Self>, CrdtError> {
let r = message::Entity::find()
.filter(
message::Column::Id.eq(&key.id).and(
message::Column::UserKey
.eq(&key.user_key)
.and(message::Column::ChatKey.eq(&key.chat_key))
.and(message::Column::ChatVariant.eq(&key.chat_variant)),
),
)
.one(tx)
.await?;
if let Some(val) = r {
let message_do = MessageDO::from(MessageVO::from_model(tx, &val).await?);
val.delete(tx).await?;
Ok(CrdtDelta::Put(message_do))
} else {
Ok(CrdtDelta::Skip)
}
}
}