signer-crdt 0.4.1

Signer CRDT (Conflict-free Replicated Data Type) package.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QueryOrder};
use tracing;
use serde::{Deserialize, Serialize};

use crate::{
    crdt::crdt::{CrdtDelta, CrdtDeltaBox}, delta::message_do::MessageDO, entity::{chat, message}, MessageViewFromModelError, SignerMeta, ViewError
};

use super::ChatVO;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageVO {
    pub chat: ChatVO,
    pub id: String,
    pub user_key: String,
    pub parent_id: Option<String>,
    pub parent_user_key: Option<String>,
    pub content: MessageContent,
    pub receiver_keys: Vec<String>,
    pub create_time: i64,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MessageContent {
    // 文本内容
    Text(String),
    // 图像链接
    Image(String),
    // 视频链接
    Video(String),
    // 音频链接
    Audio(String),
    // 文件链接
    File(String),
    // 已读
    Check,
}

impl MessageVO {
    pub async fn create(
        meta: &SignerMeta,
        chat_vo: &ChatVO,
        content: &MessageContent,
    ) -> Result<Self, ViewError> {
        let mut receiver_keys = chat_vo.receiver_keys(&meta).await?;
        
        // 移除发送者自己的公钥,避免发送者收到自己的 Check 消息
        receiver_keys.retain(|key| key != &meta.keys.pub_key);
        
        Ok(Self {
            chat: chat_vo.clone(),
            id: uuid::Uuid::new_v4().to_string(),
            parent_id: None,
            parent_user_key: None,
            content: content.clone(),
            user_key: meta.keys.pub_key.clone(),
            receiver_keys,
            create_time: chrono::Utc::now().timestamp_millis(),
        })
    }

    pub async fn from_model(
        c: &impl ConnectionTrait,
        model: &message::Model,
    ) -> Result<Self, MessageViewFromModelError> {
        let chat_model = chat::Entity::find()
            .filter(
                chat::Column::ChatKey
                    .eq(&model.chat_key)
                    .and(chat::Column::ChatVariant.eq(&model.chat_variant)),
            )
            .one(c)
            .await?
            .expect("chat not found");
        let chat_vo = serde_json::from_str(&chat_model.view_object)?;

        Ok(Self {
            chat: chat_vo,
            id: model.id.clone(),
            parent_id: model.parent_id.clone(),
            parent_user_key: model.parent_user_key.clone(),
            content: serde_json::from_str(&model.content)?,
            user_key: model.user_key.clone(),
            receiver_keys: if model.receiver_keys.is_empty() {
                vec![]
            } else {
                model
                    .receiver_keys
                    .split(',')
                    .map(|s| s.to_string())
                    .collect()
            },
            create_time: model.create_time.and_utc().timestamp_millis(),
        })
    }

    pub async fn create_check_message(&self, meta: &SignerMeta) -> Result<MessageVO, ViewError> {
        let message = MessageVO {
            chat: self.chat.clone(),
            id: uuid::Uuid::new_v4().to_string(),
            parent_id: Some(self.id.clone()),
            parent_user_key: Some(self.user_key.clone()),
            content: MessageContent::Check,
            user_key: meta.keys.pub_key.clone(),
            receiver_keys: vec![],
            create_time: chrono::Utc::now().timestamp_millis(),
        };

        Ok(message)
    }

    /// 检查当前用户是否已为指定消息创建了检查信息
    /// 返回 true 表示已存在检查信息,false 表示不存在
    pub async fn has_user_check(&self, meta: &SignerMeta) -> Result<bool, ViewError> {
        let chat_key = self.chat.chat_key(meta).await?;
        let chat_variant = self.chat.chat_variant();
        
        // 查询数据库中是否存在以当前消息为父消息且内容类型为 Check 的消息
        let check_message = message::Entity::find()
            .filter(
                message::Column::ParentId.eq(&self.id)
                    .and(message::Column::ParentUserKey.eq(&self.user_key))
                    .and(message::Column::UserKey.eq(&meta.keys.pub_key))
                    .and(message::Column::ChatKey.eq(&chat_key))
                    .and(message::Column::ChatVariant.eq(&chat_variant))
                    .and(message::Column::ContentType.eq(MessageContent::Check.ty())),
            )
            .one(&meta.conn)
            .await?;

        Ok(check_message.is_some())
    }

    /// 获取以当前消息为父消息的所有 check 类型消息
    /// 用于计算消息的已读进度
    pub async fn get_check_messages(&self, meta: &SignerMeta) -> Result<Vec<MessageVO>, ViewError> {
        let chat_key = self.chat.chat_key(meta).await?;
        let chat_variant = self.chat.chat_variant();
        
        // 查询数据库中所有以当前消息为父消息且内容类型为 Check 的消息
        let models = message::Entity::find()
            .filter(
                message::Column::ParentId.eq(&self.id)
                    .and(message::Column::ParentUserKey.eq(&self.user_key))
                    .and(message::Column::ChatKey.eq(&chat_key))
                    .and(message::Column::ChatVariant.eq(&chat_variant))
                    .and(message::Column::ContentType.eq(MessageContent::Check.ty())),
            )
            .order_by_asc(message::Column::CreateTime)
            .all(&meta.conn)
            .await?;

        let mut check_messages = Vec::new();
        for model in models {
            let message_vo = MessageVO::from_model(&meta.conn, &model).await?;
            check_messages.push(message_vo);
        }

        Ok(check_messages)
    }

    // List all messages (excluding check messages)
    pub async fn list(meta: &SignerMeta) -> Result<Vec<MessageVO>, ViewError> {
        let models = message::Entity::find()
            .filter(message::Column::ContentType.ne(MessageContent::Check.ty()))
            .order_by_asc(message::Column::CreateTime)
            .all(&meta.conn)
            .await?;

        let mut messages = Vec::new();
        for model in models {
            let message_vo = MessageVO::from_model(&meta.conn, &model).await?;
            messages.push(message_vo);
        }

        Ok(messages)
    }

    // List messages by chat (excluding check messages)
    pub async fn list_by_chat(
        meta: &SignerMeta,
        chat_key: &str,
        chat_variant: &str,
    ) -> Result<Vec<MessageVO>, ViewError> {
        let models = message::Entity::find()
            .filter(
                message::Column::ChatKey
                    .eq(chat_key)
                    .and(message::Column::ChatVariant.eq(chat_variant))
                    .and(message::Column::ContentType.ne(MessageContent::Check.ty())),
            )
            .order_by_asc(message::Column::CreateTime)
            .all(&meta.conn)
            .await?;

        let mut messages = Vec::new();
        for model in models {
            let message_vo = MessageVO::from_model(&meta.conn, &model).await?;
            messages.push(message_vo);
        }

        Ok(messages)
    }

    // Get a message by primary key (id, chat_key, chat_variant, user_key)
    pub async fn get(
        meta: &SignerMeta,
        id: &str,
        chat_key: &str,
        chat_variant: &str,
        user_key: &str,
    ) -> Result<Option<MessageVO>, ViewError> {
        let model = message::Entity::find_by_id((
            id.to_string(),
            chat_key.to_string(),
            chat_variant.to_string(),
            user_key.to_string(),
        ))
        .one(&meta.conn)
        .await?;

        match model {
            Some(model) => {
                let message_vo = MessageVO::from_model(&meta.conn, &model).await?;
                Ok(Some(message_vo))
            }
            None => Ok(None),
        }
    }

    // Put (create or update) a message
    pub async fn put(&self, meta: &SignerMeta) -> Result<(), ViewError> {
        // 获取写操作锁,确保 VO 写操作串行执行
        let _write_lock = meta.write_mutex.lock().await;
        
        let chat_key = self.chat.chat_key(meta).await?;
        let chat_variant = self.chat.chat_variant();

        // First, get the existing message if it exists
        let existing = message::Entity::find_by_id((
            self.id.to_string(),
            chat_key.to_string(),
            chat_variant.to_string(),
            self.user_key.to_string(),
        ))
        .one(&meta.conn)
        .await?;

        let delta = if let Some(existing_model) = existing {
            // Create a delta object for the update
            let existing_vo = MessageVO::from_model(&meta.conn, &existing_model).await?;
            MessageDO::new(&self, &existing_vo)?
        } else {
            // Create a new object
            MessageDO::from(self.clone())
        };

        // Apply the delta through the CRDT system
        let delta_box = CrdtDeltaBox::Message(CrdtDelta::Put(delta));
        delta_box.insert(meta).await?;

        // Reconcile to apply the changes to the database
        crate::crdt::reconcile(meta).await?;

        // 通过事件总线发送事件
        if let Err(e) = meta.event_bus.send(crate::CrdtMutate::MessagePut(self.clone())) {
            tracing::warn!("Failed to send MessagePut event: {}", e);
        }

        Ok(())
    }

    // Put (create or update) multiple messages
    pub async fn put_many(messages: Vec<MessageVO>, meta: &SignerMeta) -> Result<(), ViewError> {
        // 获取写操作锁,确保 VO 写操作串行执行
        let _write_lock = meta.write_mutex.lock().await;

        let mut delta_boxes = Vec::new();

        // Collect all delta boxes
        for message in &messages {
            let chat_key = message.chat.chat_key(meta).await?;
            let chat_variant = message.chat.chat_variant();

            // First, get the existing message if it exists
            let existing = message::Entity::find_by_id((
                message.id.to_string(),
                chat_key.to_string(),
                chat_variant.to_string(),
                message.user_key.to_string(),
            ))
            .one(&meta.conn)
            .await?;

            let delta = if let Some(existing_model) = existing {
                // Create a delta object for the update
                let existing_vo = MessageVO::from_model(&meta.conn, &existing_model).await?;
                MessageDO::new(&message, &existing_vo)?
            } else {
                // Create a new object
                MessageDO::from(message.clone())
            };

            // Apply the delta through the CRDT system
            let delta_box = CrdtDeltaBox::Message(CrdtDelta::Put(delta));
            delta_boxes.push(delta_box);
        }

        // Insert all delta boxes
        for delta_box in delta_boxes {
            delta_box.insert(meta).await?;
        }

        // Reconcile to apply the changes to the database (only once)
        crate::crdt::reconcile(meta).await?;

        // 通过事件总线发送事件 for each message
        for message in messages {
            if let Err(e) = meta.event_bus.send(crate::CrdtMutate::MessagePut(message)) {
                tracing::warn!("Failed to send MessagePut event: {}", e);
            }
        }

        Ok(())
    }


    // Delete a message by primary key
    pub async fn del(
        meta: &SignerMeta,
        id: &str,
        chat_key: &str,
        chat_variant: &str,
        user_key: &str,
    ) -> Result<(), ViewError> {
        // 获取写操作锁,确保 VO 写操作串行执行
        let _write_lock = meta.write_mutex.lock().await;
        
        // Create a delta object for the deletion
        let delta_box =
            CrdtDeltaBox::Message(CrdtDelta::Del(crate::crdt::crdt_message::MessageKey {
                id: id.to_string(),
                user_key: user_key.to_string(),
                chat_key: chat_key.to_string(),
                chat_variant: chat_variant.to_string(),
            }));
        delta_box.insert(meta).await?;

        // 通过事件总线发送事件
        if let Err(e) = meta.event_bus.send(crate::CrdtMutate::MessageDel(
            crate::crdt::crdt_message::MessageKey {
                id: id.to_string(),
                user_key: user_key.to_string(),
                chat_key: chat_key.to_string(),
                chat_variant: chat_variant.to_string(),
            },
        )) {
            tracing::warn!("Failed to send MessageDel event: {}", e);
        }

        Ok(())
    }

    // Delete multiple messages by primary keys
    pub async fn del_many(
        message_keys: Vec<crate::crdt::crdt_message::MessageKey>,
        meta: &SignerMeta,
    ) -> Result<(), ViewError> {
        // 获取写操作锁,确保 VO 写操作串行执行
        let _write_lock = meta.write_mutex.lock().await;

        let mut delta_boxes = Vec::new();

        // Collect all delta boxes
        for message_key in &message_keys {
            let delta_box =
                CrdtDeltaBox::Message(CrdtDelta::Del(message_key.clone()));
            delta_boxes.push(delta_box);
        }

        // Insert all delta boxes
        for delta_box in delta_boxes {
            delta_box.insert(meta).await?;
        }

        // Reconcile to apply the changes to the database (only once)
        crate::crdt::reconcile(meta).await?;

        // 通过事件总线发送事件 for each message
        for message_key in message_keys {
            if let Err(e) = meta.event_bus.send(crate::CrdtMutate::MessageDel(message_key)) {
                tracing::warn!("Failed to send MessageDel event: {}", e);
            }
        }

        Ok(())
    }

    /// 查询所有需要发送 Check 的消息
    ///
    /// 条件:
    /// - create_time <= base_create_time
    /// - receiver_keys 包含 user_key
    /// - content_type 不是 "check"
    /// - 尚未为该消息创建 Check (即不存在 parent_id = message.id 且 parent_user_key = message.user_key 且 user_key = current_user_key 的 check 消息)
    pub async fn find_messages_to_check(
        meta: &SignerMeta,
        base_create_time: i64,
        user_key: &str,
    ) -> Result<Vec<MessageVO>, ViewError> {
        use sea_orm::{EntityTrait, QueryFilter, QueryOrder, ColumnTrait, Condition};
        use crate::entity::message;
        
        // 首先查询满足基本条件的消息 (时间、接收者、非check)
        let base_condition = Condition::all()
            .add(message::Column::CreateTime.lte(chrono::DateTime::from_timestamp_millis(base_create_time).unwrap()))
            .add(message::Column::ReceiverKeys.contains(user_key))
            .add(message::Column::ContentType.ne(MessageContent::Check.ty()));
            
        let models = message::Entity::find()
            .filter(base_condition)
            .order_by_asc(message::Column::CreateTime)
            .all(&meta.conn)
            .await?;

        let mut messages_to_check = Vec::new();
        
        // 然后逐一检查这些消息是否已经创建了 Check
        for model in models {
            let message_vo = MessageVO::from_model(&meta.conn, &model).await?;
            
            // 检查是否已存在 Check
            let has_check = message_vo.has_user_check(meta).await?;
            
            if !has_check {
                messages_to_check.push(message_vo);
            }
        }

        Ok(messages_to_check)
    }
}

impl MessageContent {
    pub fn ty(&self) -> String {
        match self {
            MessageContent::Text(_) => "text".to_string(),
            MessageContent::Image(_) => "image".to_string(),
            MessageContent::Video(_) => "video".to_string(),
            MessageContent::Audio(_) => "audio".to_string(),
            MessageContent::File(_) => "file".to_string(),
            MessageContent::Check => "check".to_string(),
        }
    }
}