use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::Duration;
use tracing;
use crate::{SignerMeta, ViewError};
use super::MessageVO;
/// Debouncing hub for check requests.
///
/// Incoming messages are buffered and handled as one batch after no new
/// request has arrived for the configured timeout. The state sits behind an
/// `Arc<Mutex<_>>` so it can be shared with the spawned timer task.
pub struct CheckHub {
    /// Shared, lock-protected state (buffer, pending timer, debounce timeout).
    inner: Arc<Mutex<CheckHubInner>>,
}
/// Mutable state of a [`CheckHub`], always accessed under its mutex.
struct CheckHubInner {
    /// Messages pushed by `request_check` and drained by `process_buffer`.
    buffered_messages: Vec<MessageVO>,
    /// Handle of the most recently spawned timer task, if any.
    ///
    /// `request_check` aborts it before spawning a replacement; it is not
    /// cleared when the task finishes on its own.
    timer_handle: Option<tokio::task::JoinHandle<()>>,
    /// How long the timer task sleeps before the buffer is processed.
    timeout: Duration,
}
impl CheckHub {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(CheckHubInner {
buffered_messages: Vec::new(),
timer_handle: None,
timeout: Duration::from_millis(50),
})),
}
}
pub async fn request_check(&self, meta: Arc<SignerMeta>, message: MessageVO) -> Result<Vec<MessageVO>, ViewError> {
let inner = self.inner.clone();
let check_messages = vec![];
{
let mut guard = inner.lock().await;
guard.buffered_messages.push(message);
if let Some(handle) = guard.timer_handle.take() {
handle.abort();
}
let inner_clone = inner.clone();
let meta_clone = meta.clone();
let timeout = guard.timeout;
let handle = tokio::spawn(async move {
tokio::time::sleep(timeout).await;
let _ = Self::process_buffer(inner_clone, meta_clone).await;
});
guard.timer_handle = Some(handle);
}
Ok(check_messages)
}
async fn process_buffer(inner: Arc<Mutex<CheckHubInner>>, meta: Arc<SignerMeta>) -> Result<Vec<MessageVO>, ViewError> {
let buffered_messages = {
let mut guard = inner.lock().await;
std::mem::take(&mut guard.buffered_messages)
};
if buffered_messages.is_empty() {
return Ok(vec![]);
}
tracing::debug!("开始处理 Check 消息缓冲区,共 {} 条消息", buffered_messages.len());
let base_message = buffered_messages
.iter()
.max_by(|a, b| {
let time_cmp = a.create_time.cmp(&b.create_time);
if time_cmp != std::cmp::Ordering::Equal {
return time_cmp;
}
a.id.cmp(&b.id).reverse() });
let base_message = match base_message {
Some(m) => m,
None => {
tracing::warn!("缓冲区为空,无法选出基准消息");
return Ok(vec![]);
}
};
tracing::debug!("选出的基准消息: id={}, create_time={}", base_message.id, base_message.create_time);
let current_user_key = &meta.keys.pub_key;
let messages_to_check = MessageVO::find_messages_to_check(
&*meta,
base_message.create_time,
current_user_key,
).await?;
if messages_to_check.is_empty() {
tracing::debug!("没有需要发送 Check 的消息");
return Ok(vec![]);
}
tracing::debug!("找到 {} 条需要发送 Check 的消息", messages_to_check.len());
let mut check_messages = Vec::new();
for msg in &messages_to_check {
let check_msg = msg.create_check_message(&meta).await?;
check_messages.push(check_msg);
}
MessageVO::put_many(check_messages.clone(), &meta).await?;
tracing::info!("成功批量处理 {} 条 Check 消息", check_messages.len());
Ok(check_messages)
}
}