use alloy::primitives::TxHash;
use alloy::rpc::types::Log;
use anyhow::{anyhow, Result};
use log::{debug, info};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio::time::Duration;
/// A per-chain queue of on-chain `Log` events with producer-side
/// deduplication.
///
/// Producers push through the shared [`EventSender`] (see `get_sender`);
/// consumers drain via `next_event` / the batch accessors.
#[derive(Debug)]
pub struct EventQueue {
    /// Shared producer handle; also owns the dedup bookkeeping.
    sender: Arc<EventSender>,
    /// Receiving half of the channel, mutex-guarded so multiple tasks may
    /// take turns draining it.
    receiver: Arc<Mutex<mpsc::Receiver<Log>>>,
    /// Chain this queue serves; used only in log messages here.
    chain_id: u64,
}
/// Producer handle for an [`EventQueue`] that remembers recently sent
/// `(transaction_hash, log_index)` identities to drop duplicates.
#[derive(Debug)]
pub struct EventSender {
    /// Underlying bounded channel into the queue.
    inner: mpsc::Sender<Log>,
    /// Dedup cache keyed by `(tx_hash, log_index)`.
    /// NOTE(review): the inner `Arc`s look redundant — `EventSender` is
    /// itself always shared behind an `Arc` — unless fields are cloned out
    /// elsewhere in the file; confirm before simplifying.
    recent_events: Arc<Mutex<HashMap<(TxHash, u64), Log>>>,
    /// Insertion order of cache keys, used for FIFO eviction.
    event_order: Arc<Mutex<VecDeque<(TxHash, u64)>>>,
    /// Upper bound on the dedup cache size.
    max_events: usize,
    /// Chain id, used only in log messages.
    chain_id: u64,
}
impl EventQueue {
    /// Creates a queue whose channel buffers up to `buffer_size` events and
    /// whose dedup cache remembers up to `max_events` recent identities.
    ///
    /// # Panics
    /// Panics if `buffer_size` is 0 (`tokio::sync::mpsc::channel` requires a
    /// positive capacity).
    pub fn new(buffer_size: usize, max_events: usize, chain_id: u64) -> Self {
        let (sender, receiver) = mpsc::channel(buffer_size);
        let event_sender = Arc::new(EventSender {
            inner: sender,
            recent_events: Arc::new(Mutex::new(HashMap::with_capacity(max_events))),
            event_order: Arc::new(Mutex::new(VecDeque::with_capacity(max_events))),
            max_events,
            chain_id,
        });
        Self {
            sender: event_sender,
            receiver: Arc::new(Mutex::new(receiver)),
            chain_id,
        }
    }

    /// Returns a shared handle for pushing events into this queue.
    pub fn get_sender(&self) -> Arc<EventSender> {
        self.sender.clone()
    }

    /// Awaits the next event; `None` once all senders are dropped and the
    /// channel is drained.
    pub async fn next_event(&self) -> Option<Log> {
        self.receiver.lock().await.recv().await
    }

    /// Awaits one event, then greedily drains up to `max_events - 1` more
    /// that are already buffered (never blocks past the first event).
    ///
    /// Returns an empty vector immediately when `max_events == 0`.
    pub async fn get_events_batch(&self, max_events: usize) -> Vec<Log> {
        // Fix: the caller asked for zero events — previously this still
        // awaited and returned one, exceeding the requested maximum.
        if max_events == 0 {
            return Vec::new();
        }
        let mut receiver = self.receiver.lock().await;
        let mut events = Vec::with_capacity(max_events);
        if let Some(event) = receiver.recv().await {
            events.push(event);
            // Only take what is already buffered; don't wait for more.
            for _ in 1..max_events {
                if let Ok(event) = receiver.try_recv() {
                    events.push(event);
                } else {
                    break;
                }
            }
        }
        debug!(
            "[Chain {}] Retrieved {} events in batch",
            self.chain_id,
            events.len()
        );
        events
    }

    /// Drains every event currently buffered without blocking; may return an
    /// empty vector.
    pub async fn get_all_available_events(&self) -> Vec<Log> {
        let mut receiver = self.receiver.lock().await;
        let mut events = Vec::new();
        while let Ok(event) = receiver.try_recv() {
            info!(
                "[Chain {}] Received event: tx={}, log_index={}",
                self.chain_id,
                event.transaction_hash.unwrap_or_default(),
                event.log_index.unwrap_or_default()
            );
            events.push(event);
        }
        events
    }

    /// Awaits one event, sleeps `batch_timeout` to let more accumulate, then
    /// drains everything buffered.
    ///
    /// Note: the receiver lock is held across the sleep, so concurrent
    /// consumers are blocked for the duration of the timeout.
    pub async fn get_events_with_batching(&self, batch_timeout: Duration) -> Vec<Log> {
        let mut receiver = self.receiver.lock().await;
        let mut events = Vec::new();
        if let Some(event) = receiver.recv().await {
            events.push(event);
            // Give in-flight events a window to arrive before draining.
            tokio::time::sleep(batch_timeout).await;
            while let Ok(event) = receiver.try_recv() {
                events.push(event);
            }
        }
        debug!(
            "[Chain {}] Retrieved {} events with {}ms batch timeout",
            self.chain_id,
            events.len(),
            batch_timeout.as_millis()
        );
        events
    }

    /// Whether the identified event is still in the recent-events dedup cache.
    pub async fn has_event(&self, transaction_hash: TxHash, log_index: u64) -> bool {
        self.sender
            .recent_events
            .lock()
            .await
            .contains_key(&(transaction_hash, log_index))
    }
}
impl EventSender {
    /// Pushes `event` into the queue, silently skipping duplicates.
    ///
    /// An event's identity is its `(transaction_hash, log_index)` pair; the
    /// most recent `max_events` identities are cached for deduplication, with
    /// FIFO eviction once the cap is reached.
    ///
    /// Fix: the identity is rolled back out of the cache if the channel send
    /// fails — previously a failed send left the identity cached, so any
    /// retry of the same event was dropped as a "duplicate".
    ///
    /// # Errors
    /// Returns an error if the log lacks a transaction hash or log index, or
    /// if the underlying channel rejects the send (receiver dropped).
    pub async fn send(&self, event: Log) -> Result<()> {
        let transaction_hash = event
            .transaction_hash
            .ok_or_else(|| anyhow!("Log missing transaction hash"))?;
        let log_index = event
            .log_index
            .ok_or_else(|| anyhow!("Log missing log index"))?;
        let key = (transaction_hash, log_index);
        {
            // Lock order: recent_events, then event_order — kept consistent
            // with the rollback path below to avoid deadlocks.
            let mut recent_events = self.recent_events.lock().await;
            let mut event_order = self.event_order.lock().await;
            if recent_events.contains_key(&key) {
                debug!(
                    "[Chain {}] Skipped duplicate event: tx={}, log_index={}",
                    self.chain_id, transaction_hash, log_index
                );
                return Ok(());
            }
            // Evict the oldest identity so the cache stays bounded.
            if recent_events.len() >= self.max_events {
                if let Some(old_key) = event_order.pop_front() {
                    recent_events.remove(&old_key);
                    debug!(
                        "[Chain {}] Pruned oldest event: tx={}, log_index={}",
                        self.chain_id, old_key.0, old_key.1
                    );
                }
            }
            recent_events.insert(key, event.clone());
            event_order.push_back(key);
            info!(
                "[Chain {}] Added event to recent_events: tx={}, log_index={}",
                self.chain_id, transaction_hash, log_index
            );
        }
        if let Err(e) = self.inner.send(event).await {
            // The event never reached the queue: un-record it so a later
            // retry is not misclassified as a duplicate.
            let mut recent_events = self.recent_events.lock().await;
            let mut event_order = self.event_order.lock().await;
            recent_events.remove(&key);
            event_order.retain(|k| *k != key);
            return Err(anyhow!("Failed to send event: {}", e));
        }
        Ok(())
    }
}
/// Convenience constructor: builds an [`EventQueue`] for `chain_id` and hands
/// back its producer half alongside it.
///
/// `buffer_size` bounds the channel; `max_events` bounds the dedup cache.
pub fn create_event_queue(
    buffer_size: usize,
    max_events: usize,
    chain_id: u64,
) -> (EventQueue, Arc<EventSender>) {
    let queue = EventQueue::new(buffer_size, max_events, chain_id);
    let producer = queue.get_sender();
    (queue, producer)
}