signer-crdt 0.4.1

Signer CRDT (Conflict-free Replicated Data Type) package.
Documentation
use sea_orm::prelude::*;
use tracing;
use serde::{Deserialize, Serialize};
use signer_core::{SignerKeys, SignerSigned, SignerUser};

use crate::{
    CrdtDelta, CrdtDeltaBox, SignerMeta, ViewError,
    crdt::reconcile,
    entity::{self, user},
    user_do::UserDO,
};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct UserVO {
    pub pub_key: String,
    pub user_public: String,
    pub signed_user_public: String,
}

impl From<entity::user::Model> for UserVO {
    fn from(model: entity::user::Model) -> Self {
        Self {
            pub_key: model.pub_key,
            user_public: model.user_public,
            signed_user_public: model.signed_user_public,
        }
    }
}

impl UserVO {
    pub async fn from_user_data(keys: &SignerKeys, user: &SignerUser) -> Result<Self, ViewError> {
        let vo = Self {
            pub_key: user.pub_key.clone(),
            user_public: serde_json::to_string(user)?,
            signed_user_public: serde_json::to_string(&SignerSigned::from_value(keys, user)?)?,
        };

        Ok(vo)
    }

    pub fn public(&self) -> Result<SignerUser, ViewError> {
        let user_public = serde_json::from_str(&self.user_public)?;
        Ok(user_public)
    }

    // List all users
    pub async fn list(meta: &SignerMeta) -> Result<Vec<UserVO>, ViewError> {
        let models = user::Entity::find().all(&meta.conn).await?;

        let users = models
            .into_iter()
            .map(|model| UserVO::from(model))
            .collect();

        Ok(users)
    }

    // Get a user by primary key
    pub async fn get(meta: &SignerMeta, pub_key: &str) -> Result<Option<UserVO>, ViewError> {
        let model = user::Entity::find_by_id(pub_key).one(&meta.conn).await?;

        Ok(model.map(|m| UserVO::from(m)))
    }

    // Put (create or update) a user
    pub async fn put(&self, meta: &SignerMeta) -> Result<(), ViewError> {
        // 获取写操作锁,确保 VO 写操作串行执行
        let _write_lock = meta.write_mutex.lock().await;
        
        // First, get the existing user if it exists
        let existing = user::Entity::find_by_id(&self.pub_key)
            .one(&meta.conn)
            .await?;

        let delta = if let Some(existing_model) = existing {
            // Create a delta object for the update
            let existing_vo = UserVO::from(existing_model);
            UserDO::new(&existing_vo, self)?
        } else {
            // Create a new object
            UserDO::from(self.clone())
        };

        // Apply the delta through the CRDT system
        let delta_box = CrdtDeltaBox::User(CrdtDelta::Put(delta));
        delta_box.insert(meta).await?;

        // Reconcile to apply the changes to the database
        reconcile(meta).await?;

        // 通过事件总线发送事件
        if let Err(e) = meta.event_bus.send(crate::CrdtMutate::UserPut(self.clone())) {
            tracing::warn!("Failed to send UserPut event: {}", e);
        }

        Ok(())
    }

    // Put (create or update) multiple users
    pub async fn put_many(users: Vec<UserVO>, meta: &SignerMeta) -> Result<(), ViewError> {
        // 获取写操作锁,确保 VO 写操作串行执行
        let _write_lock = meta.write_mutex.lock().await;

        let mut delta_boxes = Vec::new();

        // Collect all delta boxes
        for user in &users {
            // First, get the existing user if it exists
            let existing = user::Entity::find_by_id(&user.pub_key)
                .one(&meta.conn)
                .await?;

            let delta = if let Some(existing_model) = existing {
                // Create a delta object for the update
                let existing_vo = UserVO::from(existing_model);
                UserDO::new(&existing_vo, user)?
            } else {
                // Create a new object
                UserDO::from(user.clone())
            };

            // Apply the delta through the CRDT system
            let delta_box = CrdtDeltaBox::User(CrdtDelta::Put(delta));
            delta_boxes.push(delta_box);
        }

        // Insert all delta boxes
        for delta_box in delta_boxes {
            delta_box.insert(meta).await?;
        }

        // Reconcile to apply the changes to the database (only once)
        reconcile(meta).await?;

        // 通过事件总线发送事件 for each user
        for user in users {
            if let Err(e) = meta.event_bus.send(crate::CrdtMutate::UserPut(user)) {
                tracing::warn!("Failed to send UserPut event: {}", e);
            }
        }

        Ok(())
    }

    // Delete a user by primary key
    pub async fn del(meta: &SignerMeta, pub_key: &str) -> Result<(), ViewError> {
        // 获取写操作锁,确保 VO 写操作串行执行
        let _write_lock = meta.write_mutex.lock().await;
        
        // Create a delta object for the deletion
        let delta_box = CrdtDeltaBox::User(CrdtDelta::Del(pub_key.to_string()));
        delta_box.insert(meta).await?;

        // 通过事件总线发送事件
        if let Err(e) = meta.event_bus.send(crate::CrdtMutate::UserDel(pub_key.to_string())) {
            tracing::warn!("Failed to send UserDel event: {}", e);
        }

        Ok(())
    }

    // Delete multiple users by primary keys
    pub async fn del_many(pub_keys: Vec<String>, meta: &SignerMeta) -> Result<(), ViewError> {
        // 获取写操作锁,确保 VO 写操作串行执行
        let _write_lock = meta.write_mutex.lock().await;

        let mut delta_boxes = Vec::new();

        // Collect all delta boxes
        for pub_key in &pub_keys {
            let delta_box = CrdtDeltaBox::User(CrdtDelta::Del(pub_key.to_string()));
            delta_boxes.push(delta_box);
        }

        // Insert all delta boxes
        for delta_box in delta_boxes {
            delta_box.insert(meta).await?;
        }

        // Reconcile to apply the changes to the database (only once)
        reconcile(meta).await?;

        // 通过事件总线发送事件 for each user
        for pub_key in pub_keys {
            if let Err(e) = meta.event_bus.send(crate::CrdtMutate::UserDel(pub_key)) {
                tracing::warn!("Failed to send UserDel event: {}", e);
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::SignerMeta;
    use signer_core::SignerKeys;

    #[tokio::test]
    async fn test_user_vo_put_and_get() {
        // 创建一个测试用户
        let keys = SignerKeys::generate().unwrap();

        // 使用 SignerMeta::from_mem 创建实例
        let meta = SignerMeta::from_mem(keys.clone()).await.unwrap();

        // 创建 UserVO
        let user = signer_core::SignerUser {
            pub_key: keys.pub_key.clone(),
            username: "test_user".to_string(),
            ..Default::default()
        };

        let user_vo = UserVO::from_user_data(&keys, &user).await.unwrap();

        // 写入 UserVO
        user_vo.put(&meta).await.unwrap();

        // 读取 UserVO
        let stored_user_vo = UserVO::get(&meta, &keys.pub_key).await.unwrap();

        // 验证 UserVO 是否正确存储和读取
        assert!(stored_user_vo.is_some());
        let stored_vo = stored_user_vo.unwrap();
        assert_eq!(stored_vo.pub_key, user_vo.pub_key);
        assert_eq!(stored_vo.user_public, user_vo.user_public);
        assert_eq!(stored_vo.signed_user_public, user_vo.signed_user_public);
    }
}