use crate::rpc::{
LogRange, RpcError,
backfill::backfill,
config::{HttpRpcSettings, ProviderSettings},
heads::{last_block, stream_heads_with_logs},
};
use alloy::rpc::types::Filter;
use async_stream::stream;
use futures_util::{Stream, StreamExt, pin_mut};
use tracing::debug;
/// Settings bundle describing a log stream: which endpoints to use, which events to
/// select, and which block range to cover.
///
/// NOTE(review): nothing in this part of the file reads this struct, so the field
/// descriptions below are inferred from names and types only — confirm against the
/// code that consumes it.
pub struct LogStreamConfig {
/// Presumably WebSocket RPC endpoint URLs — TODO confirm.
pub wss_endpoints: Vec<String>,
/// Per-endpoint HTTP RPC settings (see `HttpRpcSettings`).
pub http_options: Vec<HttpRpcSettings>,
/// Event identifiers to select; the expected format (signature vs. topic hash) is
/// not visible here — TODO confirm.
pub events: Vec<String>,
/// First block of the range of interest.
pub from_block: u64,
/// Optional last block of the range; `None` presumably means "no upper bound" —
/// TODO confirm.
pub to_block: Option<u64>,
}
/// Streams log ranges as they are produced by [`stream_heads_with_logs`].
///
/// This is a pure pass-through: every item (success or error) emitted by the
/// underlying heads stream is forwarded unchanged, and the returned stream ends
/// when the underlying one does.
///
/// The underlying stream is only created once the returned stream is first
/// polled; calling this function performs no RPC work by itself.
///
/// # Arguments
/// * `providers` - provider settings handed to `stream_heads_with_logs`.
/// * `filter` - optional log filter handed to `stream_heads_with_logs`.
pub async fn stream_logs(
    providers: &ProviderSettings,
    filter: Option<&Filter>,
) -> impl Stream<Item = Result<LogRange, RpcError>> {
    stream! {
        let upstream = stream_heads_with_logs(providers, filter).await;
        pin_mut!(upstream);
        loop {
            match upstream.next().await {
                // Forward the item verbatim, whether it is `Ok` or `Err`.
                Some(item) => {
                    yield item;
                }
                // Upstream is exhausted, so this stream ends too.
                None => break,
            }
        }
    }
}
/// Streams historical log ranges first, then (for open-ended filters) continues
/// with live logs.
///
/// The start block comes from `filter`'s `from_block` (0 when absent) and the
/// optional end block from its `to_block`.
///
/// * With an explicit `to_block`, ranges are backfilled up to that block and the
///   stream then ends; no live streaming happens.
/// * Without a `to_block`, the target is the current chain head minus
///   `HEAD_OFFSET` blocks. Backfilling repeats, refreshing the target after every
///   pass, until fewer than `MIN_BACKFILL_GAP` blocks remain. The stream then
///   switches to [`stream_heads_with_logs`] starting at the next unprocessed block.
///
/// # Errors
/// An `Err` from `last_block` or from `backfill` is yielded and then terminates
/// the stream. Items from the live stream are forwarded unchanged.
pub async fn backfill_then_watch_logs(
    providers: &ProviderSettings,
    filter: Option<&Filter>,
) -> impl Stream<Item = Result<LogRange, RpcError>> {
    // How far behind the reported head the backfill target is kept.
    // NOTE(review): presumably a safety margin against reorgs — confirm.
    const HEAD_OFFSET: u64 = 2;
    // Below this distance to the target, open-ended backfilling stops and live
    // streaming takes over.
    const MIN_BACKFILL_GAP: u64 = 30;

    let init_from_block = filter.and_then(|f| f.get_from_block()).unwrap_or(0);
    let init_to_block = filter.and_then(|f| f.get_to_block());
    stream! {
        let mut from_block = init_from_block;
        let mut to_block = match last_block(providers).await {
            // `saturating_sub` instead of `b - 2`: on a chain whose head is below
            // block 2 the plain subtraction underflows (panic in debug builds,
            // wrap-around to a huge block number in release builds).
            Ok(head) => init_to_block.unwrap_or(head.saturating_sub(HEAD_OFFSET)),
            Err(e) => {
                yield Err(e);
                return;
            }
        };

        // NOTE(review): with an explicit `to_block` equal to `from_block`, the
        // strict `>` means that single block is never backfilled — confirm whether
        // that is intended.
        while to_block > from_block {
            // `to_block > from_block` holds here, so the subtraction cannot underflow.
            if to_block - from_block < MIN_BACKFILL_GAP && init_to_block.is_none() {
                break;
            }
            debug!("Starting backfill for block range {} to {}", from_block, to_block);
            let range_filter = filter
                .cloned()
                .unwrap_or_else(Filter::new)
                .from_block(from_block)
                .to_block(to_block);
            let backfilled = backfill(providers, &range_filter).await;
            pin_mut!(backfilled);
            while let Some(result) = backfilled.next().await {
                match result {
                    Ok(range) => {
                        // Resume right after the last block covered by this range.
                        from_block = range.1 + 1;
                        yield Ok(range);
                    }
                    Err(e) => {
                        yield Err(e);
                        return;
                    }
                }
            }
            // Open-ended request: the chain has moved on while backfilling, so
            // refresh the target before deciding whether another pass is needed.
            if init_to_block.is_none() {
                match last_block(providers).await {
                    Ok(head) => to_block = head.saturating_sub(HEAD_OFFSET),
                    Err(e) => {
                        yield Err(e);
                        return;
                    }
                }
            }
        }

        // Only open-ended requests continue with live logs.
        if init_to_block.is_none() {
            debug!("No more backfilling needed, streaming live logs from block {}", from_block);
            let live_filter = filter.cloned().unwrap_or_else(Filter::new).from_block(from_block);
            let heads = stream_heads_with_logs(providers, Some(&live_filter)).await;
            pin_mut!(heads);
            while let Some(result) = heads.next().await {
                yield result;
            }
        }
    }
}