event_scanner/event_scanner/scanner.rs
//! Core scanner types and subscription management.
//!
//! This module defines [`EventScanner`], [`EventSubscription`], and [`StartProof`], which
//! together provide the main interface for subscribing to and streaming blockchain events.
5
6use alloy::network::{Ethereum, Network};
7use tokio::sync::mpsc;
8use tokio_stream::wrappers::ReceiverStream;
9
10use crate::{
11 block_range_scanner::BlockRangeScanner,
12 event_scanner::{
13 EventScannerResult, Unspecified, filter::EventFilter, listener::EventListener,
14 },
15};
16
17/// An event scanner configured in mode `Mode` and bound to network `N`.
18///
19/// Create an instance via [`EventScannerBuilder`](crate::EventScannerBuilder), register
20/// subscriptions with [`EventScanner::subscribe`], then start the scanner with the mode-specific
21/// `start()` method.
22///
23/// # Starting the scanner
24///
25/// All scanner modes follow the same general startup pattern:
26///
27/// - **Register subscriptions first**: call [`EventScanner::subscribe`] before starting the scanner
28/// with `start()`. The scanner sends events only to subscriptions that have already been
29/// registered.
30/// - **Non-blocking start**: `start()` returns immediately after spawning background tasks.
31/// Subscription streams yield events asynchronously.
32/// - **Errors after startup**: most runtime failures are delivered through subscription streams as
33/// [`ScannerError`](crate::ScannerError) items, rather than being returned from `start()`.
34#[derive(Debug)]
35pub struct EventScanner<Mode = Unspecified, N: Network = Ethereum> {
36 pub(crate) config: Mode,
37 pub(crate) block_range_scanner: BlockRangeScanner<N>,
38 pub(crate) listeners: Vec<EventListener>,
39}
40
41impl<Mode, N: Network> EventScanner<Mode, N> {
42 pub fn new(config: Mode, block_range_scanner: BlockRangeScanner<N>) -> Self {
43 Self { config, block_range_scanner, listeners: Vec::new() }
44 }
45}
46
47/// A subscription to scanner events that requires proof the scanner has started.
48///
49/// Created by [`EventScanner::subscribe()`](crate::EventScanner::subscribe), this type holds the
50/// underlying stream but prevents access until [`stream()`](EventSubscription::stream) is called
51/// with a valid [`StartProof`].
52///
53/// This pattern ensures at compile time that `EventScanner::start()` is called before attempting to
54/// read from the event stream.
55///
56/// # Example
57///
58/// ```no_run
59/// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
60/// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
61/// # use robust_provider::RobustProviderBuilder;
62/// # use tokio_stream::StreamExt;
63/// #
64/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
65/// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045");
66/// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
67/// # let provider = RobustProviderBuilder::new(provider).build().await?;
68/// let mut scanner = EventScannerBuilder::live().connect(provider).await?;
69///
70/// let filter = EventFilter::new().contract_address(contract_address);
71///
72/// // Create subscription (cannot access stream yet)
73/// let subscription = scanner.subscribe(filter);
74///
75/// // Start scanner and get proof of it starting
76/// let proof = scanner.start().await?;
77///
78/// // Now access the stream with the proof
79/// let mut stream = subscription.stream(&proof);
80///
81/// while let Some(msg) = stream.next().await {
82/// // process events
83/// }
84/// # Ok(())
85/// # }
86/// ```
87pub struct EventSubscription {
88 inner: ReceiverStream<EventScannerResult>,
89}
90
91impl EventSubscription {
92 /// Creates a new subscription wrapping the given stream.
93 pub(crate) fn new(inner: ReceiverStream<EventScannerResult>) -> Self {
94 Self { inner }
95 }
96
97 /// Access the event stream.
98 ///
99 /// Requires a reference to a [`StartProof`] as proof that the scanner
100 /// has been started. The proof is obtained by calling
101 /// `EventScanner::start()`.
102 ///
103 /// # Arguments
104 ///
105 /// * `_proof` - Proof that the scanner has been started
106 #[must_use]
107 pub fn stream(self, _proof: &StartProof) -> ReceiverStream<EventScannerResult> {
108 self.inner
109 }
110}
111
112impl<Mode, N: Network> EventScanner<Mode, N> {
113 /// Returns the configured stream buffer capacity.
114 #[must_use]
115 pub fn buffer_capacity(&self) -> usize {
116 self.block_range_scanner.buffer_capacity()
117 }
118
119 /// Registers an event subscription.
120 ///
121 /// Each call creates a separate subscription with its own buffer.
122 ///
123 /// # Ordering
124 ///
125 /// Ordering is guaranteed only within a single returned subscription. There is no ordering
126 /// guarantee across subscriptions created by multiple calls to this method.
127 #[must_use]
128 pub fn subscribe(&mut self, filter: EventFilter) -> EventSubscription {
129 let (sender, receiver) =
130 mpsc::channel::<EventScannerResult>(self.block_range_scanner.buffer_capacity());
131 self.listeners.push(EventListener { filter, sender });
132 EventSubscription::new(ReceiverStream::new(receiver))
133 }
134}
135
/// Zero-sized token proving that the scanner has been started.
///
/// A `StartProof` is returned by `EventScanner::start()` and has to be handed to
/// [`EventSubscription::stream()`] before the event stream becomes accessible. Because the
/// token cannot be built by downstream code, the compiler enforces that the scanner is started
/// before any events are read.
///
/// # Examples
///
/// ```no_run
/// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
/// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
/// # use robust_provider::RobustProviderBuilder;
/// # use tokio_stream::StreamExt;
/// #
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045");
/// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
/// # let provider = RobustProviderBuilder::new(provider).build().await?;
/// let mut scanner = EventScannerBuilder::sync().from_block(0).connect(provider).await?;
///
/// let filter = EventFilter::new().contract_address(contract_address);
///
/// // Create subscription (cannot access stream yet)
/// let subscription = scanner.subscribe(filter);
///
/// // Start scanner and get proof of it starting
/// let proof = scanner.start().await?;
///
/// // Now access the stream with the proof
/// let mut stream = subscription.stream(&proof);
///
/// while let Some(msg) = stream.next().await {
///     // process events
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct StartProof {
    /// Private unit field: blocks struct-literal construction outside this module, so the only
    /// way to obtain a proof is the crate-internal [`StartProof::new`] (or cloning one).
    _private: (),
}

impl StartProof {
    /// Mints a new proof token. Restricted to this crate.
    #[must_use]
    pub(crate) fn new() -> Self {
        StartProof { _private: () }
    }
}
186
#[cfg(test)]
mod tests {
    use alloy::{
        providers::{RootProvider, mock::Asserter},
        rpc::client::RpcClient,
    };

    use crate::{BlockRangeScannerBuilder, Historic};

    use super::*;

    /// Builds a root provider on top of a mocked RPC client with a fresh asserter.
    fn mocked_provider() -> RootProvider<Ethereum> {
        RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()))
    }

    /// Each `subscribe` call must append exactly one listener to the scanner.
    #[tokio::test]
    async fn test_historic_event_stream_listeners_vector_updates() -> anyhow::Result<()> {
        let brs = BlockRangeScannerBuilder::new().connect(mocked_provider()).await?;
        let mut scanner = EventScanner::new(Historic::default(), brs);
        assert!(scanner.listeners.is_empty());

        let _first = scanner.subscribe(EventFilter::new());
        assert_eq!(scanner.listeners.len(), 1);

        let _second = scanner.subscribe(EventFilter::new());
        let _third = scanner.subscribe(EventFilter::new());
        assert_eq!(scanner.listeners.len(), 3);

        Ok(())
    }

    /// The subscription channel must be sized from the block-range scanner's buffer capacity,
    /// both for the builder's default and for an explicitly configured value.
    #[tokio::test]
    async fn test_historic_event_stream_channel_capacity() -> anyhow::Result<()> {
        let provider = mocked_provider();

        // Builder default capacity.
        let default_brs = BlockRangeScannerBuilder::new().connect(provider.clone()).await?;
        let mut scanner = EventScanner::new(Historic::default(), default_brs);
        let _ = scanner.subscribe(EventFilter::new());
        assert_eq!(
            scanner.listeners[0].sender.capacity(),
            scanner.block_range_scanner.buffer_capacity()
        );

        // Explicitly configured capacity.
        let custom_capacity = 1000;
        let custom_brs = BlockRangeScannerBuilder::new()
            .buffer_capacity(custom_capacity)
            .connect(provider)
            .await?;
        let mut scanner = EventScanner::new(Historic::default(), custom_brs);
        let _ = scanner.subscribe(EventFilter::new());
        assert_eq!(scanner.listeners[0].sender.capacity(), custom_capacity);

        Ok(())
    }
}