signer-crdt 0.4.1

Signer CRDT (Conflict-free Replicated Data Type) package.
Documentation
pub mod crdt;
pub mod delta;
pub mod entity;
pub mod errors;
pub mod view;

pub use crdt::*;
pub use delta::*;
pub use entity::*;
pub use errors::*;
pub use sea_orm;
pub use view::*;

use chrono;
use sea_orm::{Database, DatabaseConnection, ConnectionTrait, DatabaseBackend, Statement};
use signer_core::{SignerKeys, SignerUser};
use signer_db::MigratorTrait;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};

/// CRDT 变更事件类型
#[derive(Debug, Clone)]
pub enum CrdtMutate {
    /// UserVO 被创建或更新
    UserPut(view::UserVO),
    /// UserVO 被删除
    UserDel(String), // pub_key
    /// MessageVO 被创建或更新
    MessagePut(view::MessageVO),
    /// MessageVO 被删除
    MessageDel(crate::crdt::crdt_message::MessageKey),
    /// ChatVO 被创建或更新
    ChatPut(view::ChatVO),
    /// ChatVO 被删除
    ChatDel(String), // chat_key
}

#[derive(Clone)]
pub struct SignerMeta {
    pub keys: SignerKeys,
    pub conn: DatabaseConnection,
    /// 事件总线 Sender
    pub event_bus: broadcast::Sender<CrdtMutate>,
    _event_bus_dummy_receiver: Arc<Mutex<broadcast::Receiver<CrdtMutate>>>,
    /// VO 写操作锁,确保所有 VO 的 put/del 操作串行执行
    pub write_mutex: Arc<Mutex<()>>,
}

impl SignerMeta {
    /// 获取事件总线 Receiver
    pub fn event_bus(&self) -> broadcast::Receiver<CrdtMutate> {
        self.event_bus.subscribe()
    }

    /// 为数据库连接启用 WAL 模式以提高并发性能
    async fn enable_wal_mode(conn: &DatabaseConnection) -> Result<(), ViewError> {
        let stmt = Statement::from_string(
            DatabaseBackend::Sqlite,
            "PRAGMA journal_mode=WAL;".to_string(),
        );
        conn.execute(stmt).await?;
        tracing::info!("已为数据库连接启用 WAL 模式,提高并发读写性能");
        Ok(())
    }

    /// 执行 WAL checkpoint,将 WAL 内容落库
    async fn wal_checkpoint(conn: &DatabaseConnection) -> Result<(), ViewError> {
        let stmt = Statement::from_string(
            DatabaseBackend::Sqlite,
            "PRAGMA wal_checkpoint(full);".to_string(),
        );
        conn.execute(stmt).await?;
        tracing::info!("已执行 WAL checkpoint,将 WAL 内容落库");
        Ok(())
    }

    /// 通过查询数据库获取当前用户的 SignerUser 结构体
    pub async fn get_current_user(&self) -> Result<SignerUser, ViewError> {
        // 从数据库中获取用户信息
        let user_vo = view::UserVO::get(self, &self.keys.pub_key)
            .await?
            .ok_or(ViewError::InvalidViewObjectError("用户不存在".to_string()))?;

        // 解析签名的用户公钥信息
        let signed_user_public: signer_core::SignerSigned<signer_core::SignerUser> =
            serde_json::from_str(&user_vo.signed_user_public)?;

        // 验证签名
        let user_public = signed_user_public.verify_to_value()?;

        Ok(user_public)
    }

    /// 创建一个基于内存数据库的 SignerMeta 实例
    pub async fn from_mem(keys: SignerKeys) -> Result<Self, ViewError> {
        let conn = Database::connect("sqlite::memory:").await?;
        signer_db::Migrator::up(&conn, None).await?;
        
        // 启用 WAL 模式以提高并发性能
        Self::enable_wal_mode(&conn).await?;
        // 执行 WAL checkpoint,将 WAL 内容落库
        Self::wal_checkpoint(&conn).await?;

        // 创建事件总线,缓冲区大小为 100
        let (event_bus, rx) = broadcast::channel(100);
        let _event_bus_dummy_receiver = Arc::new(Mutex::new(rx));

        let meta = SignerMeta {
            keys,
            conn,
            event_bus,
            _event_bus_dummy_receiver,
            write_mutex: Arc::new(Mutex::new(())),
        };

        // 自检逻辑:检查当前用户的UserVO是否存在,如果不存在则创建默认的UserVO
        if let Err(_) = meta.get_current_user().await {
            // 当前用户不存在,创建默认的UserVO
            let default_user = signer_core::SignerUser {
                pub_key: meta.keys.pub_key.clone(),
                update_time: chrono::Utc::now().timestamp(),
                ..Default::default()
            };

            let user_vo = view::UserVO::from_user_data(&meta.keys, &default_user).await?;
            user_vo.put(&meta).await?;
        }

        Ok(meta)
    }

    /// 从文件系统路径加载 SignerKeys 并创建 SignerMeta 实例
    pub async fn from_fs(path: &str) -> Result<Self, ViewError> {
        if !std::fs::exists(path)? {
            std::fs::create_dir_all(path)?;
        }

        // 加载 SignerKeys
        let keys_path = format!("{}/signer_keys.json", path);
        let keys_data = std::fs::read_to_string(&keys_path)?;
        let keys: SignerKeys = serde_json::from_str(&keys_data)?;

        // 连接数据库
        let db_url = format!("sqlite://{}/signer.sqlite3?mode=rwc", path);
        let conn = Database::connect(db_url).await?;
        signer_db::Migrator::up(&conn, None).await?;
        
        // 启用 WAL 模式以提高并发性能
        Self::enable_wal_mode(&conn).await?;
        // 执行 WAL checkpoint,将 WAL 内容落库
        Self::wal_checkpoint(&conn).await?;

        // 创建事件总线,缓冲区大小为 100
        let (event_bus, rx) = broadcast::channel(100);
        let _event_bus_dummy_receiver = Arc::new(Mutex::new(rx));

        let meta = SignerMeta {
            keys,
            conn,
            event_bus,
            _event_bus_dummy_receiver,
            write_mutex: Arc::new(Mutex::new(())),
        };

        // 自检逻辑:检查当前用户的UserVO是否存在,如果不存在则创建默认的UserVO
        if let Err(_) = meta.get_current_user().await {
            // 当前用户不存在,创建默认的UserVO
            let default_user = signer_core::SignerUser {
                pub_key: meta.keys.pub_key.clone(),
                update_time: chrono::Utc::now().timestamp(),
                ..Default::default()
            };

            let user_vo = view::UserVO::from_user_data(&meta.keys, &default_user).await?;
            user_vo.put(&meta).await?;
        }

        Ok(meta)
    }

    /// 将当前 SignerMeta 的 SignerKeys 保存到文件系统
    pub fn save_fs(&self, path: &str) -> Result<(), ViewError> {
        // 确保目录存在
        std::fs::create_dir_all(path)?;

        // 保存 SignerKeys
        let keys_path = format!("{}/signer_keys.json", path);
        let keys_data = serde_json::to_string_pretty(&self.keys)?;
        std::fs::write(keys_path, keys_data)?;

        Ok(())
    }

    pub fn save_fs_raw_keys(keys: &SignerKeys, path: &str) -> Result<(), ViewError> {
        // 确保目录存在
        std::fs::create_dir_all(path)?;

        // 保存 SignerKeys
        let keys_path = format!("{}/signer_keys.json", path);
        let keys_data = serde_json::to_string_pretty(keys)?;
        std::fs::write(keys_path, keys_data)?;

        Ok(())
    }
}