datafusion_ethers/
stream.rs

1use alloy::{
2    providers::{DynProvider, Provider},
3    rpc::types::eth::{BlockNumberOrTag, Filter, FilterBlockOption, Log},
4    transports::{RpcError, TransportErrorKind},
5};
6
7///////////////////////////////////////////////////////////////////////////////////////////////////
8
/// Tuning knobs for paginated log streaming.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamOptions {
    /// Maximum number of blocks to scan in one query. This should be small,
    /// as many RPC providers impose limits on this parameter. Should be at
    /// least 1: a page that spans zero blocks cannot make progress.
    pub block_stride: u64,
}

impl Default for StreamOptions {
    /// Returns options that scan 10 000 blocks per query.
    fn default() -> Self {
        Self {
            block_stride: 10_000,
        }
    }
}
23
24///////////////////////////////////////////////////////////////////////////////////////////////////
25
/// Resumable position of a paginated log stream.
///
/// A value of this type is attached to every yielded batch and can be handed
/// back to the stream later to continue scanning where it left off.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StreamState {
    /// Number of the last block that has already been scanned. Resuming from
    /// this state continues with the block that follows it.
    pub last_seen_block: u64,
}
30
31///////////////////////////////////////////////////////////////////////////////////////////////////
32
33pub struct StreamBatch {
34    pub logs: Vec<Log>,
35    pub state: StreamState,
36    pub block_range_all: (u64, u64),
37}
38
39///////////////////////////////////////////////////////////////////////////////////////////////////
40
41pub struct RawLogsStream;
42
43impl RawLogsStream {
44    // TODO: Re-org detection and handling
45    /// Streams batches of raw logs efficient and resumable pagination over `eth_getLogs` RPC endpoint,
46    pub fn paginate(
47        rpc_client: DynProvider,
48        mut filter: Filter,
49        options: StreamOptions,
50        resume_from_state: Option<StreamState>,
51    ) -> impl futures::Stream<Item = Result<StreamBatch, RpcError<TransportErrorKind>>> {
52        async_stream::try_stream! {
53            // Determine query's full block range, resolving symbolic block aliases like
54            // 'latest' and 'finalized' to block numbers
55            let block_range_all = Self::filter_to_block_range(&rpc_client, &filter.block_option).await?;
56
57            // Subtract from the full block range the range that was already processed
58            let block_range_unprocessed = if let Some(last_seen_block) = resume_from_state.map(|s| s.last_seen_block) {
59                (
60                    u64::max(last_seen_block + 1, block_range_all.0),
61                    block_range_all.1,
62                )
63            } else {
64                block_range_all
65            };
66
67            tracing::info!(
68                block_range_query = ?filter.block_option,
69                ?block_range_all,
70                ?block_range_unprocessed,
71                "Computed block ranges",
72            );
73
74            let mut block_range_to_scan = block_range_unprocessed;
75
76            while block_range_to_scan.0 <= block_range_to_scan.1 {
77                let block_range_page = (
78                    block_range_to_scan.0,
79                    u64::min(
80                        block_range_to_scan.1,
81                        block_range_to_scan.0 + options.block_stride - 1,
82                    ),
83                );
84
85                // Setup per-query filter
86                filter.block_option = FilterBlockOption::Range {
87                    from_block: Some(block_range_page.0.into()),
88                    to_block: Some(block_range_page.1.into()),
89                };
90
91                tracing::debug!(
92                    ?block_range_page,
93                    "Querying block range",
94                );
95
96                // Query the node
97                let logs = rpc_client.get_logs(&filter).await?;
98
99                yield StreamBatch {
100                    logs,
101                    state: StreamState { last_seen_block: block_range_page.1 },
102                    block_range_all,
103                };
104
105                // Update remaining range
106                block_range_to_scan.0 = block_range_page.1 + 1;
107            }
108        }
109    }
110
111    pub async fn filter_to_block_range(
112        rpc_client: &DynProvider,
113        block_option: &FilterBlockOption,
114    ) -> Result<(u64, u64), RpcError<TransportErrorKind>> {
115        match block_option {
116            FilterBlockOption::Range {
117                from_block: Some(from),
118                to_block: Some(to),
119            } => {
120                let from = match from {
121                    BlockNumberOrTag::Earliest => 0,
122                    BlockNumberOrTag::Number(n) => *n,
123                    _ => Err(RpcError::local_usage_str(&format!(
124                        "Invalid range: {block_option:?}"
125                    )))?,
126                };
127                let to = match to {
128                    BlockNumberOrTag::Number(n) => *n,
129                    BlockNumberOrTag::Latest
130                    | BlockNumberOrTag::Safe
131                    | BlockNumberOrTag::Finalized => {
132                        let Some(to_block) = rpc_client.get_block((*to).into()).await? else {
133                            Err(RpcError::local_usage_str(&format!(
134                                "Unable to resolve block: {to:?}"
135                            )))?
136                        };
137                        to_block.header.number
138                    }
139                    _ => Err(RpcError::local_usage_str(&format!(
140                        "Invalid range: {block_option:?}"
141                    )))?,
142                };
143                Ok((from, to))
144            }
145            FilterBlockOption::Range { .. } => Err(RpcError::local_usage_str(&format!(
146                "Invalid range: {block_option:?}"
147            )))?,
148            FilterBlockOption::AtBlockHash(_) => {
149                unimplemented!("Querying a single block by hash is not yet supported")
150            }
151        }
152    }
153}
154
155///////////////////////////////////////////////////////////////////////////////////////////////////