Skip to main content

event_scanner/event_scanner/
scanner.rs

1//! Core scanner types and subscription management.
2//!
3//! This module defines [`EventScanner`], [`EventSubscription`], and [`StartProof`] which together
4//! provide the main interface for subscribing to and streaming blockchain events.
5
6use alloy::network::{Ethereum, Network};
7use tokio::sync::mpsc;
8use tokio_stream::wrappers::ReceiverStream;
9
10use crate::{
11    block_range_scanner::BlockRangeScanner,
12    event_scanner::{
13        EventScannerResult, Unspecified, filter::EventFilter, listener::EventListener,
14    },
15};
16
17/// An event scanner configured in mode `Mode` and bound to network `N`.
18///
19/// Create an instance via [`EventScannerBuilder`](crate::EventScannerBuilder), register
20/// subscriptions with [`EventScanner::subscribe`], then start the scanner with the mode-specific
21/// `start()` method.
22///
23/// # Starting the scanner
24///
25/// All scanner modes follow the same general startup pattern:
26///
27/// - **Register subscriptions first**: call [`EventScanner::subscribe`] before starting the scanner
28///   with `start()`. The scanner sends events only to subscriptions that have already been
29///   registered.
30/// - **Non-blocking start**: `start()` returns immediately after spawning background tasks.
31///   Subscription streams yield events asynchronously.
32/// - **Errors after startup**: most runtime failures are delivered through subscription streams as
33///   [`ScannerError`](crate::ScannerError) items, rather than being returned from `start()`.
34#[derive(Debug)]
35pub struct EventScanner<Mode = Unspecified, N: Network = Ethereum> {
36    pub(crate) config: Mode,
37    pub(crate) block_range_scanner: BlockRangeScanner<N>,
38    pub(crate) listeners: Vec<EventListener>,
39}
40
41impl<Mode, N: Network> EventScanner<Mode, N> {
42    pub fn new(config: Mode, block_range_scanner: BlockRangeScanner<N>) -> Self {
43        Self { config, block_range_scanner, listeners: Vec::new() }
44    }
45}
46
47/// A subscription to scanner events that requires proof the scanner has started.
48///
49/// Created by [`EventScanner::subscribe()`](crate::EventScanner::subscribe), this type holds the
50/// underlying stream but prevents access until [`stream()`](EventSubscription::stream) is called
51/// with a valid [`StartProof`].
52///
53/// This pattern ensures at compile time that `EventScanner::start()` is called before attempting to
54/// read from the event stream.
55///
56/// # Example
57///
58/// ```no_run
59/// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
60/// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
61/// # use robust_provider::RobustProviderBuilder;
62/// # use tokio_stream::StreamExt;
63/// #
64/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
65/// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045");
66/// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
67/// # let provider = RobustProviderBuilder::new(provider).build().await?;
68/// let mut scanner = EventScannerBuilder::live().connect(provider).await?;
69///
70/// let filter = EventFilter::new().contract_address(contract_address);
71///
72/// // Create subscription (cannot access stream yet)
73/// let subscription = scanner.subscribe(filter);
74///
75/// // Start scanner and get proof of it starting
76/// let proof = scanner.start().await?;
77///
78/// // Now access the stream with the proof
79/// let mut stream = subscription.stream(&proof);
80///
81/// while let Some(msg) = stream.next().await {
82///     // process events
83/// }
84/// # Ok(())
85/// # }
86/// ```
87pub struct EventSubscription {
88    inner: ReceiverStream<EventScannerResult>,
89}
90
91impl EventSubscription {
92    /// Creates a new subscription wrapping the given stream.
93    pub(crate) fn new(inner: ReceiverStream<EventScannerResult>) -> Self {
94        Self { inner }
95    }
96
97    /// Access the event stream.
98    ///
99    /// Requires a reference to a [`StartProof`] as proof that the scanner
100    /// has been started. The proof is obtained by calling
101    /// `EventScanner::start()`.
102    ///
103    /// # Arguments
104    ///
105    /// * `_proof` - Proof that the scanner has been started
106    #[must_use]
107    pub fn stream(self, _proof: &StartProof) -> ReceiverStream<EventScannerResult> {
108        self.inner
109    }
110}
111
112impl<Mode, N: Network> EventScanner<Mode, N> {
113    /// Returns the configured stream buffer capacity.
114    #[must_use]
115    pub fn buffer_capacity(&self) -> usize {
116        self.block_range_scanner.buffer_capacity()
117    }
118
119    /// Registers an event subscription.
120    ///
121    /// Each call creates a separate subscription with its own buffer.
122    ///
123    /// # Ordering
124    ///
125    /// Ordering is guaranteed only within a single returned subscription. There is no ordering
126    /// guarantee across subscriptions created by multiple calls to this method.
127    #[must_use]
128    pub fn subscribe(&mut self, filter: EventFilter) -> EventSubscription {
129        let (sender, receiver) =
130            mpsc::channel::<EventScannerResult>(self.block_range_scanner.buffer_capacity());
131        self.listeners.push(EventListener { filter, sender });
132        EventSubscription::new(ReceiverStream::new(receiver))
133    }
134}
135
/// Token showing that the scanner has been started.
///
/// `EventScanner::start()` returns this value, and [`EventSubscription::stream()`] demands a
/// reference to it before releasing the event stream. As a result, reading events without first
/// starting the scanner is rejected at compile time.
///
/// # Example
///
/// ```no_run
/// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
/// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
/// # use robust_provider::RobustProviderBuilder;
/// # use tokio_stream::StreamExt;
/// #
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045");
/// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
/// # let provider = RobustProviderBuilder::new(provider).build().await?;
/// let mut scanner = EventScannerBuilder::sync().from_block(0).connect(provider).await?;
///
/// let filter = EventFilter::new().contract_address(contract_address);
///
/// // Create subscription (cannot access stream yet)
/// let subscription = scanner.subscribe(filter);
///
/// // Start scanner and get proof of it starting
/// let proof = scanner.start().await?;
///
/// // Now access the stream with the proof
/// let mut stream = subscription.stream(&proof);
///
/// while let Some(msg) = stream.next().await {
///     // process events
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct StartProof {
    /// Non-public unit field: code outside this module cannot build the struct literally and
    /// must go through the crate-visible constructor.
    _private: (),
}
178
179impl StartProof {
180    /// Creates a new start proof.
181    #[must_use]
182    pub(crate) fn new() -> Self {
183        Self { _private: () }
184    }
185}
186
#[cfg(test)]
mod tests {
    use alloy::{
        providers::{RootProvider, mock::Asserter},
        rpc::client::RpcClient,
    };

    use crate::{BlockRangeScannerBuilder, Historic};

    use super::*;

    /// Each `subscribe` call must append exactly one listener to the scanner.
    #[tokio::test]
    async fn test_historic_event_stream_listeners_vector_updates() -> anyhow::Result<()> {
        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
        let brs = BlockRangeScannerBuilder::new().connect(provider).await?;

        let mut scanner = EventScanner::new(Historic::default(), brs);

        assert!(scanner.listeners.is_empty());

        let _stream1 = scanner.subscribe(EventFilter::new());
        assert_eq!(scanner.listeners.len(), 1);

        let _stream2 = scanner.subscribe(EventFilter::new());
        let _stream3 = scanner.subscribe(EventFilter::new());
        assert_eq!(scanner.listeners.len(), 3);

        Ok(())
    }

    /// Subscription channels must be created with the scanner's configured buffer capacity,
    /// both for the builder default and for an explicit override.
    #[tokio::test]
    async fn test_historic_event_stream_channel_capacity() -> anyhow::Result<()> {
        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));

        let brs = BlockRangeScannerBuilder::new().connect(provider.clone()).await?;

        let mut scanner = EventScanner::new(Historic::default(), brs);

        // Hold on to the subscription so the receiving half is not dropped before the assertion.
        let _subscription = scanner.subscribe(EventFilter::new());
        let sender = &scanner.listeners[0].sender;
        // `max_capacity()` is the size the channel was created with, whereas `capacity()` only
        // reports the slots that happen to be free at the moment of the call.
        assert_eq!(sender.max_capacity(), scanner.block_range_scanner.buffer_capacity());

        let custom_capacity = 1000;

        let brs = BlockRangeScannerBuilder::new()
            .buffer_capacity(custom_capacity)
            .connect(provider)
            .await?;

        let mut scanner = EventScanner::new(Historic::default(), brs);

        let _subscription = scanner.subscribe(EventFilter::new());
        let sender = &scanner.listeners[0].sender;
        assert_eq!(sender.max_capacity(), custom_capacity);

        Ok(())
    }
}