Skip to main content

signer_crdt/
lib.rs

1pub mod crdt;
2pub mod delta;
3pub mod entity;
4pub mod errors;
5pub mod view;
6
7pub use crdt::*;
8pub use delta::*;
9pub use entity::*;
10pub use errors::*;
11pub use sea_orm;
12pub use view::*;
13
14use chrono;
15use sea_orm::{Database, DatabaseConnection, ConnectionTrait, DatabaseBackend, Statement};
16use signer_core::{SignerKeys, SignerUser};
17use signer_db::MigratorTrait;
18use std::sync::Arc;
19use tokio::sync::{broadcast, Mutex};
20
21/// CRDT 变更事件类型
22#[derive(Debug, Clone)]
23pub enum CrdtMutate {
24    /// UserVO 被创建或更新
25    UserPut(view::UserVO),
26    /// UserVO 被删除
27    UserDel(String), // pub_key
28    /// MessageVO 被创建或更新
29    MessagePut(view::MessageVO),
30    /// MessageVO 被删除
31    MessageDel(crate::crdt::crdt_message::MessageKey),
32    /// ChatVO 被创建或更新
33    ChatPut(view::ChatVO),
34    /// ChatVO 被删除
35    ChatDel(String), // chat_key
36}
37
38#[derive(Clone)]
39pub struct SignerMeta {
40    pub keys: SignerKeys,
41    pub conn: DatabaseConnection,
42    /// 事件总线 Sender
43    pub event_bus: broadcast::Sender<CrdtMutate>,
44    _event_bus_dummy_receiver: Arc<Mutex<broadcast::Receiver<CrdtMutate>>>,
45    /// VO 写操作锁,确保所有 VO 的 put/del 操作串行执行
46    pub write_mutex: Arc<Mutex<()>>,
47}
48
49impl SignerMeta {
50    /// 获取事件总线 Receiver
51    pub fn event_bus(&self) -> broadcast::Receiver<CrdtMutate> {
52        self.event_bus.subscribe()
53    }
54
55    /// 为数据库连接启用 WAL 模式以提高并发性能
56    async fn enable_wal_mode(conn: &DatabaseConnection) -> Result<(), ViewError> {
57        let stmt = Statement::from_string(
58            DatabaseBackend::Sqlite,
59            "PRAGMA journal_mode=WAL;".to_string(),
60        );
61        conn.execute(stmt).await?;
62        tracing::info!("已为数据库连接启用 WAL 模式,提高并发读写性能");
63        Ok(())
64    }
65
66    /// 执行 WAL checkpoint,将 WAL 内容落库
67    async fn wal_checkpoint(conn: &DatabaseConnection) -> Result<(), ViewError> {
68        let stmt = Statement::from_string(
69            DatabaseBackend::Sqlite,
70            "PRAGMA wal_checkpoint(full);".to_string(),
71        );
72        conn.execute(stmt).await?;
73        tracing::info!("已执行 WAL checkpoint,将 WAL 内容落库");
74        Ok(())
75    }
76
77    /// 通过查询数据库获取当前用户的 SignerUser 结构体
78    pub async fn get_current_user(&self) -> Result<SignerUser, ViewError> {
79        // 从数据库中获取用户信息
80        let user_vo = view::UserVO::get(self, &self.keys.pub_key)
81            .await?
82            .ok_or(ViewError::InvalidViewObjectError("用户不存在".to_string()))?;
83
84        // 解析签名的用户公钥信息
85        let signed_user_public: signer_core::SignerSigned<signer_core::SignerUser> =
86            serde_json::from_str(&user_vo.signed_user_public)?;
87
88        // 验证签名
89        let user_public = signed_user_public.verify_to_value()?;
90
91        Ok(user_public)
92    }
93
94    /// 创建一个基于内存数据库的 SignerMeta 实例
95    pub async fn from_mem(keys: SignerKeys) -> Result<Self, ViewError> {
96        let conn = Database::connect("sqlite::memory:").await?;
97        signer_db::Migrator::up(&conn, None).await?;
98        
99        // 启用 WAL 模式以提高并发性能
100        Self::enable_wal_mode(&conn).await?;
101        // 执行 WAL checkpoint,将 WAL 内容落库
102        Self::wal_checkpoint(&conn).await?;
103
104        // 创建事件总线,缓冲区大小为 100
105        let (event_bus, rx) = broadcast::channel(100);
106        let _event_bus_dummy_receiver = Arc::new(Mutex::new(rx));
107
108        let meta = SignerMeta {
109            keys,
110            conn,
111            event_bus,
112            _event_bus_dummy_receiver,
113            write_mutex: Arc::new(Mutex::new(())),
114        };
115
116        // 自检逻辑:检查当前用户的UserVO是否存在,如果不存在则创建默认的UserVO
117        if let Err(_) = meta.get_current_user().await {
118            // 当前用户不存在,创建默认的UserVO
119            let default_user = signer_core::SignerUser {
120                pub_key: meta.keys.pub_key.clone(),
121                update_time: chrono::Utc::now().timestamp(),
122                ..Default::default()
123            };
124
125            let user_vo = view::UserVO::from_user_data(&meta.keys, &default_user).await?;
126            user_vo.put(&meta).await?;
127        }
128
129        Ok(meta)
130    }
131
132    /// 从文件系统路径加载 SignerKeys 并创建 SignerMeta 实例
133    pub async fn from_fs(path: &str) -> Result<Self, ViewError> {
134        if !std::fs::exists(path)? {
135            std::fs::create_dir_all(path)?;
136        }
137
138        // 加载 SignerKeys
139        let keys_path = format!("{}/signer_keys.json", path);
140        let keys_data = std::fs::read_to_string(&keys_path)?;
141        let keys: SignerKeys = serde_json::from_str(&keys_data)?;
142
143        // 连接数据库
144        let db_url = format!("sqlite://{}/signer.sqlite3?mode=rwc", path);
145        let conn = Database::connect(db_url).await?;
146        signer_db::Migrator::up(&conn, None).await?;
147        
148        // 启用 WAL 模式以提高并发性能
149        Self::enable_wal_mode(&conn).await?;
150        // 执行 WAL checkpoint,将 WAL 内容落库
151        Self::wal_checkpoint(&conn).await?;
152
153        // 创建事件总线,缓冲区大小为 100
154        let (event_bus, rx) = broadcast::channel(100);
155        let _event_bus_dummy_receiver = Arc::new(Mutex::new(rx));
156
157        let meta = SignerMeta {
158            keys,
159            conn,
160            event_bus,
161            _event_bus_dummy_receiver,
162            write_mutex: Arc::new(Mutex::new(())),
163        };
164
165        // 自检逻辑:检查当前用户的UserVO是否存在,如果不存在则创建默认的UserVO
166        if let Err(_) = meta.get_current_user().await {
167            // 当前用户不存在,创建默认的UserVO
168            let default_user = signer_core::SignerUser {
169                pub_key: meta.keys.pub_key.clone(),
170                update_time: chrono::Utc::now().timestamp(),
171                ..Default::default()
172            };
173
174            let user_vo = view::UserVO::from_user_data(&meta.keys, &default_user).await?;
175            user_vo.put(&meta).await?;
176        }
177
178        Ok(meta)
179    }
180
181    /// 将当前 SignerMeta 的 SignerKeys 保存到文件系统
182    pub fn save_fs(&self, path: &str) -> Result<(), ViewError> {
183        // 确保目录存在
184        std::fs::create_dir_all(path)?;
185
186        // 保存 SignerKeys
187        let keys_path = format!("{}/signer_keys.json", path);
188        let keys_data = serde_json::to_string_pretty(&self.keys)?;
189        std::fs::write(keys_path, keys_data)?;
190
191        Ok(())
192    }
193
194    pub fn save_fs_raw_keys(keys: &SignerKeys, path: &str) -> Result<(), ViewError> {
195        // 确保目录存在
196        std::fs::create_dir_all(path)?;
197
198        // 保存 SignerKeys
199        let keys_path = format!("{}/signer_keys.json", path);
200        let keys_data = serde_json::to_string_pretty(keys)?;
201        std::fs::write(keys_path, keys_data)?;
202
203        Ok(())
204    }
205}