datafusion_ethers/
stream.rs

1use alloy::{
2    providers::{DynProvider, Provider},
3    rpc::types::eth::{BlockNumberOrTag, Filter, FilterBlockOption, Log},
4    transports::{RpcError, TransportErrorKind},
5};
6
7///////////////////////////////////////////////////////////////////////////////////////////////////
8
/// Tuning knobs for [`RawLogsStream::paginate`].
#[derive(Clone, Debug)]
pub struct StreamOptions {
    /// How many blocks a single `eth_getLogs` query covers.
    ///
    /// Keep this value small: many RPC providers cap the block span they
    /// accept per request.
    pub block_stride: u64,

    /// Whether to fill in missing log timestamps using extra block lookups.
    ///
    /// Many providers don't yet return `blockTimestamp` from the `eth_getLogs`
    /// RPC endpoint, in which case the `block_timestamp` column will be `null`.
    /// When this fallback is enabled the library looks up the first and last
    /// block of every page and approximates the timestamps of the logs in
    /// between by interpolation. The additional block queries may result in a
    /// significant performance penalty when fetching many log records.
    ///
    /// See: https://github.com/ethereum/execution-apis/issues/295
    pub use_block_timestamp_fallback: bool,
}
24
25impl Default for StreamOptions {
26    fn default() -> Self {
27        Self {
28            block_stride: 10_000,
29            use_block_timestamp_fallback: false,
30        }
31    }
32}
33
34///////////////////////////////////////////////////////////////////////////////////////////////////
35
/// Resumption cursor of a log stream.
///
/// Each batch produced by the stream carries one of these; handing it back
/// when restarting the stream continues scanning after `last_seen_block`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StreamState {
    /// Number of the last block covered by the queries performed so far.
    pub last_seen_block: u64,
}
40
41///////////////////////////////////////////////////////////////////////////////////////////////////
42
43pub struct StreamBatch {
44    pub logs: Vec<Log>,
45    pub state: StreamState,
46    pub block_range_all: (u64, u64),
47}
48
49///////////////////////////////////////////////////////////////////////////////////////////////////
50
/// Namespace type for the raw log streaming routines built on top of the
/// `eth_getLogs` RPC endpoint. It carries no state of its own.
#[derive(Debug)]
pub struct RawLogsStream;
52
53impl RawLogsStream {
54    // TODO: Re-org detection and handling
55    /// Streams batches of raw logs efficient and resumable pagination over `eth_getLogs` RPC endpoint,
56    pub fn paginate(
57        rpc_client: DynProvider<alloy::network::AnyNetwork>,
58        mut filter: Filter,
59        options: StreamOptions,
60        resume_from_state: Option<StreamState>,
61    ) -> impl futures::Stream<Item = Result<StreamBatch, RpcError<TransportErrorKind>>> {
62        async_stream::try_stream! {
63            // Determine query's full block range, resolving symbolic block aliases like
64            // 'latest' and 'finalized' to block numbers
65            let block_range_all = Self::filter_to_block_range(&rpc_client, &filter.block_option).await?;
66
67            // Subtract from the full block range the range that was already processed
68            let block_range_unprocessed = if let Some(last_seen_block) = resume_from_state.map(|s| s.last_seen_block) {
69                (
70                    u64::max(last_seen_block + 1, block_range_all.0),
71                    block_range_all.1,
72                )
73            } else {
74                block_range_all
75            };
76
77            tracing::info!(
78                block_range_query = ?filter.block_option,
79                ?block_range_all,
80                ?block_range_unprocessed,
81                "Computed block ranges",
82            );
83
84            let mut block_range_to_scan = block_range_unprocessed;
85
86            while block_range_to_scan.0 <= block_range_to_scan.1 {
87                let block_range_page = (
88                    block_range_to_scan.0,
89                    u64::min(
90                        block_range_to_scan.1,
91                        block_range_to_scan.0 + options.block_stride - 1,
92                    ),
93                );
94
95                // Setup per-query filter
96                filter.block_option = FilterBlockOption::Range {
97                    from_block: Some(block_range_page.0.into()),
98                    to_block: Some(block_range_page.1.into()),
99                };
100
101                tracing::debug!(
102                    ?block_range_page,
103                    "Querying block range",
104                );
105
106                // Query the node
107                let mut logs = rpc_client.get_logs(&filter).await?;
108
109                // Resolve timestamp if needed
110                if options.use_block_timestamp_fallback && logs.iter().any(|l| l.block_timestamp.is_none()) {
111                    Self::populate_block_timestamps_fallback(&rpc_client, &mut logs).await?;
112                }
113
114                yield StreamBatch {
115                    logs,
116                    state: StreamState { last_seen_block: block_range_page.1 },
117                    block_range_all,
118                };
119
120                // Update remaining range
121                block_range_to_scan.0 = block_range_page.1 + 1;
122            }
123        }
124    }
125
126    /// This function will determine the timestamps for the first and last block in the batch
127    /// and approximates the timestamps for blocks in between by interpolating.
128    /// NOTE: This might not be reproducible if stride changes between runs
129    async fn populate_block_timestamps_fallback(
130        rpc_client: &DynProvider<alloy::network::AnyNetwork>,
131        logs: &mut [Log],
132    ) -> Result<(), RpcError<TransportErrorKind>> {
133        let first_log = logs.first().unwrap();
134        let last_log = logs.last().unwrap();
135
136        let first_block = rpc_client
137            .get_block_by_hash(first_log.block_hash.unwrap())
138            .await?
139            .unwrap();
140
141        let last_block = if first_log.block_hash == last_log.block_hash {
142            first_block.clone()
143        } else {
144            rpc_client
145                .get_block_by_hash(last_log.block_hash.unwrap())
146                .await?
147                .unwrap()
148        };
149
150        let time_per_block = ((last_block.header.timestamp - first_block.header.timestamp) as f64)
151            / ((last_block.number() - first_block.number()) as f64);
152
153        tracing::debug!(
154            block_range = ?(first_block.number(), last_block.number()),
155            time_per_block,
156            "Populating block timestamps via fallback mechanism",
157        );
158
159        for log in logs.iter_mut() {
160            let block_offset = log.block_number.unwrap() - first_block.number();
161            let time_offset = time_per_block * (block_offset as f64);
162            let ts = first_block.header.timestamp + (time_offset.floor() as u64);
163            log.block_timestamp = Some(ts);
164        }
165
166        Ok(())
167    }
168
169    pub async fn filter_to_block_range(
170        rpc_client: &DynProvider<alloy::network::AnyNetwork>,
171        block_option: &FilterBlockOption,
172    ) -> Result<(u64, u64), RpcError<TransportErrorKind>> {
173        match block_option {
174            FilterBlockOption::Range {
175                from_block: Some(from),
176                to_block: Some(to),
177            } => {
178                let from = match from {
179                    BlockNumberOrTag::Earliest => 0,
180                    BlockNumberOrTag::Number(n) => *n,
181                    _ => Err(RpcError::local_usage_str(&format!(
182                        "Invalid range: {block_option:?}"
183                    )))?,
184                };
185                let to = match to {
186                    BlockNumberOrTag::Number(n) => *n,
187                    BlockNumberOrTag::Latest
188                    | BlockNumberOrTag::Safe
189                    | BlockNumberOrTag::Finalized => {
190                        let Some(to_block) = rpc_client.get_block((*to).into()).await? else {
191                            Err(RpcError::local_usage_str(&format!(
192                                "Unable to resolve block: {to:?}"
193                            )))?
194                        };
195                        to_block.header.number
196                    }
197                    _ => Err(RpcError::local_usage_str(&format!(
198                        "Invalid range: {block_option:?}"
199                    )))?,
200                };
201                Ok((from, to))
202            }
203            FilterBlockOption::Range { .. } => Err(RpcError::local_usage_str(&format!(
204                "Invalid range: {block_option:?}"
205            )))?,
206            FilterBlockOption::AtBlockHash(_) => {
207                unimplemented!("Querying a single block by hash is not yet supported")
208            }
209        }
210    }
211}
212
213///////////////////////////////////////////////////////////////////////////////////////////////////