datafusion_ethers/
stream.rs1use alloy::{
2 providers::{DynProvider, Provider},
3 rpc::types::eth::{BlockNumberOrTag, Filter, FilterBlockOption, Log},
4 transports::{RpcError, TransportErrorKind},
5};
6
/// Tuning knobs for paginated log streaming.
#[derive(Clone, Debug)]
pub struct StreamOptions {
    /// Maximum number of blocks covered by a single page, i.e. by one log query.
    pub block_stride: u64,
}

impl Default for StreamOptions {
    /// Builds the default options: pages of 10,000 blocks.
    fn default() -> Self {
        let block_stride = 10_000;
        Self { block_stride }
    }
}
23
/// Resume point of a paginated log stream.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StreamState {
    /// Number of the last block of the most recently delivered page; a resumed
    /// stream continues with the block right after it.
    pub last_seen_block: u64,
}
30
31pub struct StreamBatch {
34 pub logs: Vec<Log>,
35 pub state: StreamState,
36 pub block_range_all: (u64, u64),
37}
38
39pub struct RawLogsStream;
42
43impl RawLogsStream {
44 pub fn paginate(
47 rpc_client: DynProvider,
48 mut filter: Filter,
49 options: StreamOptions,
50 resume_from_state: Option<StreamState>,
51 ) -> impl futures::Stream<Item = Result<StreamBatch, RpcError<TransportErrorKind>>> {
52 async_stream::try_stream! {
53 let block_range_all = Self::filter_to_block_range(&rpc_client, &filter.block_option).await?;
56
57 let block_range_unprocessed = if let Some(last_seen_block) = resume_from_state.map(|s| s.last_seen_block) {
59 (
60 u64::max(last_seen_block + 1, block_range_all.0),
61 block_range_all.1,
62 )
63 } else {
64 block_range_all
65 };
66
67 tracing::info!(
68 block_range_query = ?filter.block_option,
69 ?block_range_all,
70 ?block_range_unprocessed,
71 "Computed block ranges",
72 );
73
74 let mut block_range_to_scan = block_range_unprocessed;
75
76 while block_range_to_scan.0 <= block_range_to_scan.1 {
77 let block_range_page = (
78 block_range_to_scan.0,
79 u64::min(
80 block_range_to_scan.1,
81 block_range_to_scan.0 + options.block_stride - 1,
82 ),
83 );
84
85 filter.block_option = FilterBlockOption::Range {
87 from_block: Some(block_range_page.0.into()),
88 to_block: Some(block_range_page.1.into()),
89 };
90
91 tracing::debug!(
92 ?block_range_page,
93 "Querying block range",
94 );
95
96 let logs = rpc_client.get_logs(&filter).await?;
98
99 yield StreamBatch {
100 logs,
101 state: StreamState { last_seen_block: block_range_page.1 },
102 block_range_all,
103 };
104
105 block_range_to_scan.0 = block_range_page.1 + 1;
107 }
108 }
109 }
110
111 pub async fn filter_to_block_range(
112 rpc_client: &DynProvider,
113 block_option: &FilterBlockOption,
114 ) -> Result<(u64, u64), RpcError<TransportErrorKind>> {
115 match block_option {
116 FilterBlockOption::Range {
117 from_block: Some(from),
118 to_block: Some(to),
119 } => {
120 let from = match from {
121 BlockNumberOrTag::Earliest => 0,
122 BlockNumberOrTag::Number(n) => *n,
123 _ => Err(RpcError::local_usage_str(&format!(
124 "Invalid range: {block_option:?}"
125 )))?,
126 };
127 let to = match to {
128 BlockNumberOrTag::Number(n) => *n,
129 BlockNumberOrTag::Latest
130 | BlockNumberOrTag::Safe
131 | BlockNumberOrTag::Finalized => {
132 let Some(to_block) = rpc_client.get_block((*to).into()).await? else {
133 Err(RpcError::local_usage_str(&format!(
134 "Unable to resolve block: {to:?}"
135 )))?
136 };
137 to_block.header.number
138 }
139 _ => Err(RpcError::local_usage_str(&format!(
140 "Invalid range: {block_option:?}"
141 )))?,
142 };
143 Ok((from, to))
144 }
145 FilterBlockOption::Range { .. } => Err(RpcError::local_usage_str(&format!(
146 "Invalid range: {block_option:?}"
147 )))?,
148 FilterBlockOption::AtBlockHash(_) => {
149 unimplemented!("Querying a single block by hash is not yet supported")
150 }
151 }
152 }
153}
154
155