signer-crdt 0.4.1

Signer CRDT (Conflict-free Replicated Data Type) package.
Documentation
use chrono::{TimeZone as _, Utc};
use sea_orm::{
    ActiveModelTrait,
    ActiveValue::{NotSet, Set},
    ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter,
};
use serde::{Deserialize, Serialize};

use crate::{
    SignerMeta,
    crdt::errors::CrdtError,
    delta::{deltaobject::DeltaField, message_do::MessageDO},
    entity::message,
    view::MessageVO,
};

use super::{CrdtType, crdt::CrdtDelta};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageKey {
    pub id: String,
    pub user_key: String,
    pub chat_key: String,
    pub chat_variant: String,
}

impl CrdtType for MessageDO {
    // (chat_key, chat_variant, user_key, id)
    type PrimaryKey = MessageKey;

    async fn put(
        &self,
        core: &SignerMeta,
        tx: &DatabaseTransaction,
    ) -> Result<CrdtDelta<Self>, CrdtError> {
        let chat_key = self.chat.chat_key(core).await?;
        let chat_variant = self.chat.chat_variant();

        let r = message::Entity::find()
            .filter(
                message::Column::Id
                    .eq(&self.id)
                    .and(message::Column::ChatKey.eq(&chat_key))
                    .and(message::Column::ChatVariant.eq(&chat_variant))
                    .and(message::Column::UserKey.eq(&self.user_key)),
            )
            .one(tx)
            .await?;
        if let Some(val) = &r {
            let vo = MessageVO::from_model(tx, val).await?;
            if !self.has_changed(&vo) {
                return Ok(CrdtDelta::Skip);
            }
        }

        let m = message::ActiveModel {
            id: Set(self.id.clone()),
            chat_variant: Set(self.chat.chat_variant()),
            chat_key: Set(self.chat.chat_key(core).await?),
            user_key: Set(self.user_key.clone()),
            parent_id: self.parent_id.clone().into(),
            parent_user_key: self.parent_user_key.clone().into(),
            content: match &self.content {
                DeltaField::Set(val) => Set(serde_json::to_string(val)?),
                DeltaField::NotSet => NotSet,
            },
            content_type: match &self.content_type {
                DeltaField::Set(val) => Set(val.clone()),
                DeltaField::NotSet => NotSet,
            },
            receiver_keys: match &self.receiver_keys {
                DeltaField::Set(val) => {
                    if val.is_empty() {
                        // 如果 receiver_keys 为空,插入 NULL
                        Set(String::new()) // 或者使用 NotSet
                    } else {
                        Set(val.join(","))
                    }
                },
                DeltaField::NotSet => NotSet,
            },
            create_time: match &self.create_time {
                DeltaField::Set(val) => Set(Utc.timestamp_millis_opt(*val).unwrap().naive_utc()),
                DeltaField::NotSet => NotSet,
            },
        };

        match r {
            Some(val) => {
                let e = m.update(tx).await?;
                let message_do = MessageDO::new(
                    &MessageVO::from_model(tx, &e).await?,
                    &MessageVO::from_model(tx, &val).await?,
                )?;
                Ok(CrdtDelta::Put(message_do))
            }
            None => {
                let e = m.insert(tx).await?;
                Ok(CrdtDelta::Del(MessageKey {
                    chat_key: e.chat_key,
                    chat_variant: e.chat_variant,
                    user_key: e.user_key,
                    id: e.id,
                }))
            }
        }
    }

    async fn del(
        _core: &SignerMeta,
        key: &Self::PrimaryKey,
        tx: &DatabaseTransaction,
    ) -> Result<CrdtDelta<Self>, CrdtError> {
        let r = message::Entity::find()
            .filter(
                message::Column::Id.eq(&key.id).and(
                    message::Column::UserKey
                        .eq(&key.user_key)
                        .and(message::Column::ChatKey.eq(&key.chat_key))
                        .and(message::Column::ChatVariant.eq(&key.chat_variant)),
                ),
            )
            .one(tx)
            .await?;
        if let Some(val) = r {
            let message_do = MessageDO::from(MessageVO::from_model(tx, &val).await?);
            val.delete(tx).await?;
            Ok(CrdtDelta::Put(message_do))
        } else {
            Ok(CrdtDelta::Skip)
        }
    }
}