datafusion_ethers/
stream.rs1use alloy::{
2 providers::{DynProvider, Provider},
3 rpc::types::eth::{BlockNumberOrTag, Filter, FilterBlockOption, Log},
4 transports::{RpcError, TransportErrorKind},
5};
6
/// Tuning knobs for [`RawLogsStream::paginate`].
#[derive(Clone, Debug)]
pub struct StreamOptions {
    /// Maximum number of blocks covered by a single page, i.e. by one
    /// log query issued while paginating.
    pub block_stride: u64,

    /// When `true`, a page in which at least one log has no
    /// `block_timestamp` gets its timestamps filled in by the fallback
    /// estimation instead of being passed through as returned by the node.
    pub use_block_timestamp_fallback: bool,
}
24
25impl Default for StreamOptions {
26 fn default() -> Self {
27 Self {
28 block_stride: 10_000,
29 use_block_timestamp_fallback: false,
30 }
31 }
32}
33
/// Resumable position of a log stream.
///
/// Passing a state back into [`RawLogsStream::paginate`] makes the scan
/// continue with the block following `last_seen_block`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StreamState {
    /// Number of the last block that has already been covered by a page.
    pub last_seen_block: u64,
}
40
41pub struct StreamBatch {
44 pub logs: Vec<Log>,
45 pub state: StreamState,
46 pub block_range_all: (u64, u64),
47}
48
/// Namespace type for the raw log pagination routines; it carries no state.
#[derive(Debug, Clone, Copy, Default)]
pub struct RawLogsStream;
52
53impl RawLogsStream {
54 pub fn paginate(
57 rpc_client: DynProvider<alloy::network::AnyNetwork>,
58 mut filter: Filter,
59 options: StreamOptions,
60 resume_from_state: Option<StreamState>,
61 ) -> impl futures::Stream<Item = Result<StreamBatch, RpcError<TransportErrorKind>>> {
62 async_stream::try_stream! {
63 let block_range_all = Self::filter_to_block_range(&rpc_client, &filter.block_option).await?;
66
67 let block_range_unprocessed = if let Some(last_seen_block) = resume_from_state.map(|s| s.last_seen_block) {
69 (
70 u64::max(last_seen_block + 1, block_range_all.0),
71 block_range_all.1,
72 )
73 } else {
74 block_range_all
75 };
76
77 tracing::info!(
78 block_range_query = ?filter.block_option,
79 ?block_range_all,
80 ?block_range_unprocessed,
81 "Computed block ranges",
82 );
83
84 let mut block_range_to_scan = block_range_unprocessed;
85
86 while block_range_to_scan.0 <= block_range_to_scan.1 {
87 let block_range_page = (
88 block_range_to_scan.0,
89 u64::min(
90 block_range_to_scan.1,
91 block_range_to_scan.0 + options.block_stride - 1,
92 ),
93 );
94
95 filter.block_option = FilterBlockOption::Range {
97 from_block: Some(block_range_page.0.into()),
98 to_block: Some(block_range_page.1.into()),
99 };
100
101 tracing::debug!(
102 ?block_range_page,
103 "Querying block range",
104 );
105
106 let mut logs = rpc_client.get_logs(&filter).await?;
108
109 if options.use_block_timestamp_fallback && logs.iter().any(|l| l.block_timestamp.is_none()) {
111 Self::populate_block_timestamps_fallback(&rpc_client, &mut logs).await?;
112 }
113
114 yield StreamBatch {
115 logs,
116 state: StreamState { last_seen_block: block_range_page.1 },
117 block_range_all,
118 };
119
120 block_range_to_scan.0 = block_range_page.1 + 1;
122 }
123 }
124 }
125
126 async fn populate_block_timestamps_fallback(
130 rpc_client: &DynProvider<alloy::network::AnyNetwork>,
131 logs: &mut [Log],
132 ) -> Result<(), RpcError<TransportErrorKind>> {
133 let first_log = logs.first().unwrap();
134 let last_log = logs.last().unwrap();
135
136 let first_block = rpc_client
137 .get_block_by_hash(first_log.block_hash.unwrap())
138 .await?
139 .unwrap();
140
141 let last_block = if first_log.block_hash == last_log.block_hash {
142 first_block.clone()
143 } else {
144 rpc_client
145 .get_block_by_hash(last_log.block_hash.unwrap())
146 .await?
147 .unwrap()
148 };
149
150 let time_per_block = ((last_block.header.timestamp - first_block.header.timestamp) as f64)
151 / ((last_block.number() - first_block.number()) as f64);
152
153 tracing::debug!(
154 block_range = ?(first_block.number(), last_block.number()),
155 time_per_block,
156 "Populating block timestamps via fallback mechanism",
157 );
158
159 for log in logs.iter_mut() {
160 let block_offset = log.block_number.unwrap() - first_block.number();
161 let time_offset = time_per_block * (block_offset as f64);
162 let ts = first_block.header.timestamp + (time_offset.floor() as u64);
163 log.block_timestamp = Some(ts);
164 }
165
166 Ok(())
167 }
168
169 pub async fn filter_to_block_range(
170 rpc_client: &DynProvider<alloy::network::AnyNetwork>,
171 block_option: &FilterBlockOption,
172 ) -> Result<(u64, u64), RpcError<TransportErrorKind>> {
173 match block_option {
174 FilterBlockOption::Range {
175 from_block: Some(from),
176 to_block: Some(to),
177 } => {
178 let from = match from {
179 BlockNumberOrTag::Earliest => 0,
180 BlockNumberOrTag::Number(n) => *n,
181 _ => Err(RpcError::local_usage_str(&format!(
182 "Invalid range: {block_option:?}"
183 )))?,
184 };
185 let to = match to {
186 BlockNumberOrTag::Number(n) => *n,
187 BlockNumberOrTag::Latest
188 | BlockNumberOrTag::Safe
189 | BlockNumberOrTag::Finalized => {
190 let Some(to_block) = rpc_client.get_block((*to).into()).await? else {
191 Err(RpcError::local_usage_str(&format!(
192 "Unable to resolve block: {to:?}"
193 )))?
194 };
195 to_block.header.number
196 }
197 _ => Err(RpcError::local_usage_str(&format!(
198 "Invalid range: {block_option:?}"
199 )))?,
200 };
201 Ok((from, to))
202 }
203 FilterBlockOption::Range { .. } => Err(RpcError::local_usage_str(&format!(
204 "Invalid range: {block_option:?}"
205 )))?,
206 FilterBlockOption::AtBlockHash(_) => {
207 unimplemented!("Querying a single block by hash is not yet supported")
208 }
209 }
210 }
211}
212
213