1pub mod crdt;
2pub mod delta;
3pub mod entity;
4pub mod errors;
5pub mod view;
6
7pub use crdt::*;
8pub use delta::*;
9pub use entity::*;
10pub use errors::*;
11pub use sea_orm;
12pub use view::*;
13
14use chrono;
15use sea_orm::{Database, DatabaseConnection, ConnectionTrait, DatabaseBackend, Statement};
16use signer_core::{SignerKeys, SignerUser};
17use signer_db::MigratorTrait;
18use std::sync::Arc;
19use tokio::sync::{broadcast, Mutex};
20
21#[derive(Debug, Clone)]
23pub enum CrdtMutate {
24 UserPut(view::UserVO),
26 UserDel(String), MessagePut(view::MessageVO),
30 MessageDel(crate::crdt::crdt_message::MessageKey),
32 ChatPut(view::ChatVO),
34 ChatDel(String), }
37
38#[derive(Clone)]
39pub struct SignerMeta {
40 pub keys: SignerKeys,
41 pub conn: DatabaseConnection,
42 pub event_bus: broadcast::Sender<CrdtMutate>,
44 _event_bus_dummy_receiver: Arc<Mutex<broadcast::Receiver<CrdtMutate>>>,
45 pub write_mutex: Arc<Mutex<()>>,
47}
48
49impl SignerMeta {
50 pub fn event_bus(&self) -> broadcast::Receiver<CrdtMutate> {
52 self.event_bus.subscribe()
53 }
54
55 async fn enable_wal_mode(conn: &DatabaseConnection) -> Result<(), ViewError> {
57 let stmt = Statement::from_string(
58 DatabaseBackend::Sqlite,
59 "PRAGMA journal_mode=WAL;".to_string(),
60 );
61 conn.execute(stmt).await?;
62 tracing::info!("已为数据库连接启用 WAL 模式,提高并发读写性能");
63 Ok(())
64 }
65
66 async fn wal_checkpoint(conn: &DatabaseConnection) -> Result<(), ViewError> {
68 let stmt = Statement::from_string(
69 DatabaseBackend::Sqlite,
70 "PRAGMA wal_checkpoint(full);".to_string(),
71 );
72 conn.execute(stmt).await?;
73 tracing::info!("已执行 WAL checkpoint,将 WAL 内容落库");
74 Ok(())
75 }
76
77 pub async fn get_current_user(&self) -> Result<SignerUser, ViewError> {
79 let user_vo = view::UserVO::get(self, &self.keys.pub_key)
81 .await?
82 .ok_or(ViewError::InvalidViewObjectError("用户不存在".to_string()))?;
83
84 let signed_user_public: signer_core::SignerSigned<signer_core::SignerUser> =
86 serde_json::from_str(&user_vo.signed_user_public)?;
87
88 let user_public = signed_user_public.verify_to_value()?;
90
91 Ok(user_public)
92 }
93
94 pub async fn from_mem(keys: SignerKeys) -> Result<Self, ViewError> {
96 let conn = Database::connect("sqlite::memory:").await?;
97 signer_db::Migrator::up(&conn, None).await?;
98
99 Self::enable_wal_mode(&conn).await?;
101 Self::wal_checkpoint(&conn).await?;
103
104 let (event_bus, rx) = broadcast::channel(100);
106 let _event_bus_dummy_receiver = Arc::new(Mutex::new(rx));
107
108 let meta = SignerMeta {
109 keys,
110 conn,
111 event_bus,
112 _event_bus_dummy_receiver,
113 write_mutex: Arc::new(Mutex::new(())),
114 };
115
116 if let Err(_) = meta.get_current_user().await {
118 let default_user = signer_core::SignerUser {
120 pub_key: meta.keys.pub_key.clone(),
121 update_time: chrono::Utc::now().timestamp(),
122 ..Default::default()
123 };
124
125 let user_vo = view::UserVO::from_user_data(&meta.keys, &default_user).await?;
126 user_vo.put(&meta).await?;
127 }
128
129 Ok(meta)
130 }
131
132 pub async fn from_fs(path: &str) -> Result<Self, ViewError> {
134 if !std::fs::exists(path)? {
135 std::fs::create_dir_all(path)?;
136 }
137
138 let keys_path = format!("{}/signer_keys.json", path);
140 let keys_data = std::fs::read_to_string(&keys_path)?;
141 let keys: SignerKeys = serde_json::from_str(&keys_data)?;
142
143 let db_url = format!("sqlite://{}/signer.sqlite3?mode=rwc", path);
145 let conn = Database::connect(db_url).await?;
146 signer_db::Migrator::up(&conn, None).await?;
147
148 Self::enable_wal_mode(&conn).await?;
150 Self::wal_checkpoint(&conn).await?;
152
153 let (event_bus, rx) = broadcast::channel(100);
155 let _event_bus_dummy_receiver = Arc::new(Mutex::new(rx));
156
157 let meta = SignerMeta {
158 keys,
159 conn,
160 event_bus,
161 _event_bus_dummy_receiver,
162 write_mutex: Arc::new(Mutex::new(())),
163 };
164
165 if let Err(_) = meta.get_current_user().await {
167 let default_user = signer_core::SignerUser {
169 pub_key: meta.keys.pub_key.clone(),
170 update_time: chrono::Utc::now().timestamp(),
171 ..Default::default()
172 };
173
174 let user_vo = view::UserVO::from_user_data(&meta.keys, &default_user).await?;
175 user_vo.put(&meta).await?;
176 }
177
178 Ok(meta)
179 }
180
181 pub fn save_fs(&self, path: &str) -> Result<(), ViewError> {
183 std::fs::create_dir_all(path)?;
185
186 let keys_path = format!("{}/signer_keys.json", path);
188 let keys_data = serde_json::to_string_pretty(&self.keys)?;
189 std::fs::write(keys_path, keys_data)?;
190
191 Ok(())
192 }
193
194 pub fn save_fs_raw_keys(keys: &SignerKeys, path: &str) -> Result<(), ViewError> {
195 std::fs::create_dir_all(path)?;
197
198 let keys_path = format!("{}/signer_keys.json", path);
200 let keys_data = serde_json::to_string_pretty(keys)?;
201 std::fs::write(keys_path, keys_data)?;
202
203 Ok(())
204 }
205}