Skip to main content

event_scanner/event_scanner/modes/sync/
from_block.rs

1//! Syncs from a starting block, then transitions to live streaming.
2//!
3//! Streams events from a specified starting block to the present, then automatically
4//! continues with live streaming. See [`EventScannerBuilder::sync().from_block()`][from_block]
5//! for usage details.
6//!
7//! [from_block]: crate::EventScannerBuilder#method.from_block-2
8
9use alloy::{eips::BlockId, network::Network};
10
11use crate::{
12    ScannerError,
13    event_scanner::{
14        EventScanner, StartProof,
15        block_range_handler::{BlockRangeHandler, StreamHandler},
16        builder::{EventScannerBuilder, SyncFromBlock},
17    },
18};
19use robust_provider::IntoRobustProvider;
20
21impl EventScannerBuilder<SyncFromBlock> {
22    /// Sets the number of confirmations required before a block is considered stable enough to
23    /// scan in the live phase.
24    ///
25    /// This affects the post-sync live streaming phase; higher values reduce reorg risk at the
26    /// cost of increased event delivery latency.
27    #[must_use]
28    pub fn block_confirmations(mut self, confirmations: u64) -> Self {
29        self.config.block_confirmations = confirmations;
30        self
31    }
32
33    /// Sets the maximum number of block-range fetches to process concurrently when
34    /// synchronizing from a specific block.
35    ///
36    /// Higher values can improve throughput by issuing multiple RPC requests
37    /// concurrently, at the cost of additional load on the provider.
38    ///
39    /// **Note**: This limit applies **per listener**. With N listeners and a limit of M,
40    /// up to N × M concurrent RPC requests may be in-flight simultaneously.
41    ///
42    /// Must be greater than 0.
43    ///
44    /// Defaults to [`DEFAULT_MAX_CONCURRENT_FETCHES`][default].
45    ///
46    /// [default]: crate::event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES
47    #[must_use]
48    pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self {
49        self.config.max_concurrent_fetches = max_concurrent_fetches;
50        self
51    }
52
53    /// Connects to an existing provider.
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if:
58    /// * The provider connection fails
59    /// * The max block range is zero
60    pub async fn connect<N: Network>(
61        self,
62        provider: impl IntoRobustProvider<N>,
63    ) -> Result<EventScanner<SyncFromBlock, N>, ScannerError> {
64        if self.config.max_concurrent_fetches == 0 {
65            return Err(ScannerError::InvalidMaxConcurrentFetches);
66        }
67
68        let scanner = self.build(provider).await?;
69
70        let provider = scanner.block_range_scanner.provider();
71
72        if let BlockId::Hash(from_hash) = scanner.config.from_block {
73            provider.get_block_by_hash(from_hash.into()).await?;
74        }
75
76        Ok(scanner)
77    }
78}
79
80impl<N: Network> EventScanner<SyncFromBlock, N> {
81    /// Starts the scanner in [`SyncFromBlock`] mode.
82    ///
83    /// See [`EventScanner`] for general startup notes.
84    ///
85    /// # Errors
86    ///
87    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
88    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
89    /// * [`ScannerError::BlockNotFound`] - if `from_block` cannot be resolved.
90    pub async fn start(self) -> Result<StartProof, ScannerError> {
91        info!(
92            from_block = ?self.config.from_block,
93            block_confirmations = self.config.block_confirmations,
94            listener_count = self.listeners.len(),
95            "Starting EventScanner in SyncFromBlock mode"
96        );
97
98        let stream = self
99            .block_range_scanner
100            .stream_from(self.config.from_block, self.config.block_confirmations)
101            .await?;
102
103        let buffer_capacity = self.buffer_capacity();
104
105        let handler = StreamHandler::new(
106            self.block_range_scanner.provider().clone(),
107            self.listeners,
108            self.config.max_concurrent_fetches,
109            buffer_capacity,
110        );
111
112        tokio::spawn(async move {
113            handler.handle(stream).await;
114        });
115
116        Ok(StartProof::new())
117    }
118}
119
120#[cfg(test)]
121mod tests {
122    use alloy::{
123        eips::BlockNumberOrTag,
124        network::Ethereum,
125        node_bindings::Anvil,
126        primitives::keccak256,
127        providers::{Provider, ProviderBuilder, RootProvider, ext::AnvilApi, mock::Asserter},
128        rpc::client::RpcClient,
129    };
130
131    use crate::{
132        block_range_scanner::{
133            DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
134        },
135        event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES,
136    };
137
138    use super::*;
139
140    #[test]
141    fn sync_scanner_builder_pattern() {
142        let builder = EventScannerBuilder::sync()
143            .from_block(50)
144            .max_block_range(25)
145            .block_confirmations(5)
146            .max_concurrent_fetches(10)
147            .buffer_capacity(33);
148
149        assert_eq!(builder.block_range_scanner.max_block_range, 25);
150        assert_eq!(builder.config.block_confirmations, 5);
151        assert_eq!(builder.config.max_concurrent_fetches, 10);
152        assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(50).into());
153        assert_eq!(builder.block_range_scanner.buffer_capacity, 33);
154    }
155
156    #[test]
157    fn sync_scanner_builder_default_values() {
158        let builder = EventScannerBuilder::sync().from_block(BlockNumberOrTag::Earliest);
159
160        assert_eq!(builder.config.from_block, BlockNumberOrTag::Earliest.into());
161        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
162        assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
163        assert_eq!(builder.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
164        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
165    }
166
167    #[tokio::test]
168    async fn accepts_zero_confirmations() -> anyhow::Result<()> {
169        let anvil = Anvil::new().try_spawn().unwrap();
170        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
171
172        let scanner = EventScannerBuilder::sync()
173            .from_block(BlockNumberOrTag::Earliest)
174            .block_confirmations(0)
175            .connect(provider)
176            .await?;
177
178        assert_eq!(scanner.config.block_confirmations, 0);
179
180        Ok(())
181    }
182
183    #[test]
184    fn sync_scanner_builder_last_call_wins() {
185        let builder = EventScannerBuilder::sync()
186            .from_block(2)
187            .max_block_range(25)
188            .max_block_range(55)
189            .max_block_range(105)
190            .block_confirmations(5)
191            .block_confirmations(7)
192            .max_concurrent_fetches(10)
193            .max_concurrent_fetches(20)
194            .buffer_capacity(20)
195            .buffer_capacity(40);
196
197        assert_eq!(builder.block_range_scanner.max_block_range, 105);
198        assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(2).into());
199        assert_eq!(builder.config.block_confirmations, 7);
200        assert_eq!(builder.config.max_concurrent_fetches, 20);
201        assert_eq!(builder.block_range_scanner.buffer_capacity, 40);
202    }
203
204    #[tokio::test]
205    async fn returns_error_with_zero_max_concurrent_fetches() {
206        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
207        let result = EventScannerBuilder::sync()
208            .from_block(0)
209            .max_concurrent_fetches(0)
210            .connect(provider)
211            .await;
212
213        assert!(matches!(result, Err(ScannerError::InvalidMaxConcurrentFetches)));
214    }
215
216    #[tokio::test]
217    async fn test_sync_from_block_returns_error_with_zero_max_block_range() {
218        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
219        let result =
220            EventScannerBuilder::sync().from_block(100).max_block_range(0).connect(provider).await;
221
222        match result {
223            Err(ScannerError::InvalidMaxBlockRange) => {}
224            _ => panic!("Expected InvalidMaxBlockRange error"),
225        }
226    }
227
228    #[tokio::test]
229    async fn returns_error_with_zero_buffer_capacity() {
230        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
231        let result =
232            EventScannerBuilder::sync().from_block(100).buffer_capacity(0).connect(provider).await;
233
234        assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
235    }
236
237    #[tokio::test]
238    async fn test_sync_from_block_scanner_with_valid_from_hash() {
239        let anvil = Anvil::new().try_spawn().unwrap();
240        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
241
242        provider.anvil_mine(Some(5), None).await.unwrap();
243
244        let block_5_hash =
245            provider.get_block_by_number(5.into()).await.unwrap().unwrap().header.hash;
246
247        let result =
248            EventScannerBuilder::sync().from_block(block_5_hash).connect(provider.clone()).await;
249
250        assert!(result.is_ok());
251    }
252
253    #[tokio::test]
254    async fn test_sync_from_block_scanner_with_invalid_from_hash() {
255        let anvil = Anvil::new().try_spawn().unwrap();
256        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
257
258        let random_hash = keccak256("Invalid Hash");
259        let result = EventScannerBuilder::sync().from_block(random_hash).connect(provider).await;
260
261        assert!(matches!(result, Err(ScannerError::BlockNotFound)));
262    }
263}