signer-crdt 0.4.1

Signer CRDT (Conflict-free Replicated Data Type) package.
Documentation
//! Check 消息集线器
//!
//! 用于收集在短时间内发起的 Check 请求,并进行批量处理,
//! 以减少数据库操作和网络请求的次数。

use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::Duration;
use tracing;
use crate::{SignerMeta, ViewError};
use super::MessageVO;

/// Check 消息集线器
pub struct CheckHub {
    /// 内部状态,使用 Arc<Mutex<>> 包装以支持跨线程共享和修改
    inner: Arc<Mutex<CheckHubInner>>,
}

/// CheckHub 的内部状态
struct CheckHubInner {
    /// 缓冲的消息列表
    buffered_messages: Vec<MessageVO>,
    /// 定时器的句柄,用于取消之前的定时任务
    timer_handle: Option<tokio::task::JoinHandle<()>>,
    /// 定时器的超时时间 (50ms)
    timeout: Duration,
}

impl CheckHub {
    /// 创建一个新的 CheckHub 实例
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(CheckHubInner {
                buffered_messages: Vec::new(),
                timer_handle: None,
                timeout: Duration::from_millis(50),
            })),
        }
    }

    /// 请求为一条消息发送 Check
    ///
    /// 这个方法会将消息添加到缓冲区,并启动一个 50ms 的定时器。
    /// 当定时器到期时,会处理缓冲区中的所有消息。
    /// 
    /// # 参数
    /// * `meta`: SignerMeta 实例
    /// * `message`: 需要发送 Check 的消息
    pub async fn request_check(&self, meta: Arc<SignerMeta>, message: MessageVO) -> Result<Vec<MessageVO>, ViewError> {
        let inner = self.inner.clone();
        let check_messages = vec![];
        
        // 获取锁并修改内部状态
        {
            let mut guard = inner.lock().await;
            guard.buffered_messages.push(message);
            
            // 如果已经有一个定时器在运行,则取消它(可选,取决于是否需要防抖)
            // 这里我们选择每次都重置定时器,实现防抖效果
            if let Some(handle) = guard.timer_handle.take() {
                handle.abort();
            }
            
            // 启动新的定时器
            let inner_clone = inner.clone();
            let meta_clone = meta.clone();
            let timeout = guard.timeout;
            let handle = tokio::spawn(async move {
                tokio::time::sleep(timeout).await;
                // 定时器到期,处理缓冲区中的消息
                // 注意:这里的结果被忽略了,因为在异步任务中无法返回结果
                // 如果需要知道处理结果,可以考虑使用 channel 或其他方式
                let _ = Self::process_buffer(inner_clone, meta_clone).await;
            });
            
            guard.timer_handle = Some(handle);
        }
        
        // 立即返回已创建的 Check 消息
        Ok(check_messages)
    }

    /// 处理缓冲区中的消息
    ///
    /// 这个方法会在定时器到期时被调用。
    /// 注意:此方法不直接处理 Envelope 的创建和推送,这些操作应在调用方完成。
    async fn process_buffer(inner: Arc<Mutex<CheckHubInner>>, meta: Arc<SignerMeta>) -> Result<Vec<MessageVO>, ViewError> {
        let buffered_messages = {
            let mut guard = inner.lock().await;
            // 清空缓冲区并获取其中的消息
            std::mem::take(&mut guard.buffered_messages)
        };
        
        if buffered_messages.is_empty() {
            return Ok(vec![]);
        }
        
        tracing::debug!("开始处理 Check 消息缓冲区,共 {} 条消息", buffered_messages.len());
        
        // 1. 选出 create_time 最大且 id 最小的消息作为基准
        let base_message = buffered_messages
            .iter()
            .max_by(|a, b| {
                // 首先比较 create_time
                let time_cmp = a.create_time.cmp(&b.create_time);
                if time_cmp != std::cmp::Ordering::Equal {
                    return time_cmp;
                }
                // 如果 create_time 相同,则比较 id (字典序)
                a.id.cmp(&b.id).reverse() // reverse because we want the smaller id
            });
        
        let base_message = match base_message {
            Some(m) => m,
            None => {
                tracing::warn!("缓冲区为空,无法选出基准消息");
                return Ok(vec![]);
            }
        };
        
        tracing::debug!("选出的基准消息: id={}, create_time={}", base_message.id, base_message.create_time);
        
        // 2. 查询数据库找出所有需要发送 Check 的消息
        // 条件:
        // - create_time <= base_message.create_time
        // - receiver_keys 包含当前用户公钥 (meta.keys.pub_key)
        // - content_type 不是 "check"
        // - 尚未为该消息创建 Check (调用 has_user_check)
        let current_user_key = &meta.keys.pub_key;
        let messages_to_check = MessageVO::find_messages_to_check(
            &*meta,
            base_message.create_time,
            current_user_key,
        ).await?;
        
        if messages_to_check.is_empty() {
            tracing::debug!("没有需要发送 Check 的消息");
            return Ok(vec![]);
        }
        
        tracing::debug!("找到 {} 条需要发送 Check 的消息", messages_to_check.len());
        
        // 3. 为这些消息批量创建 Check 消息
        let mut check_messages = Vec::new();
        for msg in &messages_to_check {
            let check_msg = msg.create_check_message(&meta).await?;
            check_messages.push(check_msg);
        }
        
        // 4. 批量保存 Check 消息到数据库
        MessageVO::put_many(check_messages.clone(), &meta).await?;
        
        tracing::info!("成功批量处理 {} 条 Check 消息", check_messages.len());
        
        Ok(check_messages)
    }
}