use crate::{ColdResult, ColdStorageError, ColdStorageRead, Filter, HeaderSpecifier, RpcLog};
use alloy::{primitives::BlockNumber, rpc::types::FilterBlockOption};
use tokio::sync::mpsc;
#[derive(Debug)]
pub struct StreamParams {
pub from: BlockNumber,
pub to: BlockNumber,
pub max_logs: usize,
pub sender: mpsc::Sender<ColdResult<RpcLog>>,
pub deadline: tokio::time::Instant,
}
pub async fn produce_log_stream_default<B: ColdStorageRead>(
backend: &B,
filter: &Filter,
params: StreamParams,
) {
let StreamParams { from, to, max_logs, sender, deadline } = params;
let anchor_hash = match backend.get_header(HeaderSpecifier::Number(to)).await {
Ok(Some(h)) => h.hash(),
Ok(None) => return, Err(e) => {
let _ = sender.send(Err(e)).await;
return;
}
};
let mut total = 0usize;
let mut block_filter = filter.clone();
for block_num in from..=to {
if tokio::time::Instant::now() > deadline {
let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
return;
}
match backend.get_header(HeaderSpecifier::Number(to)).await {
Ok(Some(h)) if h.hash() == anchor_hash => {}
Ok(_) => {
let _ = sender.send(Err(ColdStorageError::ReorgDetected)).await;
return;
}
Err(e) => {
let _ = sender.send(Err(e)).await;
return;
}
}
let remaining = max_logs.saturating_sub(total);
block_filter.block_option = FilterBlockOption::Range {
from_block: Some(block_num.into()),
to_block: Some(block_num.into()),
};
let block_logs = match backend.get_logs(&block_filter, remaining).await {
Ok(logs) => logs,
Err(ColdStorageError::TooManyLogs { .. }) => {
let _ = sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
return;
}
Err(e) => {
let _ = sender.send(Err(e)).await;
return;
}
};
total += block_logs.len();
for log in block_logs {
match tokio::time::timeout_at(deadline, sender.send(Ok(log))).await {
Ok(Ok(())) => {}
Ok(Err(_)) => return, Err(_) => {
let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
return;
}
}
}
if total >= max_logs {
return;
}
}
}