Struct EventScannerBuilder 

Source
pub struct EventScannerBuilder<M> { /* private fields */ }

Implementations§

Source§

impl EventScannerBuilder<Historic>

Source

pub fn from_block(self, block_id: impl Into<BlockId>) -> Self

Source

pub fn to_block(self, block_id: impl Into<BlockId>) -> Self

Source

pub async fn connect<N: Network>( self, provider: impl IntoRobustProvider<N>, ) -> Result<EventScanner<Historic, N>, ScannerError>

Connects to an existing provider with block range validation.

Validates that the maximum of from_block and to_block does not exceed the latest block on the chain.

§Errors

Returns an error if:

  • The provider connection fails
  • The specified block range exceeds the latest block on the chain
  • The max block range is zero
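
The two range-related conditions above can be sketched as a standalone check. This is only an illustration: `RangeError`, `validate_range`, and the order of the checks are assumptions made for this sketch, not the crate's actual `ScannerError` variants or internals, and block ids are simplified to plain block numbers.

```rust
/// Hypothetical error type for this sketch; the real `ScannerError` variants are not shown here.
#[derive(Debug, PartialEq)]
enum RangeError {
    ExceedsLatest { requested: u64, latest: u64 },
    ZeroMaxBlockRange,
}

/// Checks a requested block range against a known chain tip.
/// Assumption: block ids have already been resolved to block numbers.
fn validate_range(
    from_block: u64,
    to_block: u64,
    latest_block: u64,
    max_block_range: u64,
) -> Result<(), RangeError> {
    if max_block_range == 0 {
        return Err(RangeError::ZeroMaxBlockRange);
    }
    // Only the larger of the two bounds needs to be compared with the tip.
    let highest = from_block.max(to_block);
    if highest > latest_block {
        return Err(RangeError::ExceedsLatest { requested: highest, latest: latest_block });
    }
    Ok(())
}

fn main() {
    // The upper bound lies beyond the tip, so this range is rejected.
    println!("{:?}", validate_range(1_000_000, 2_000_000, 1_500_000, 1_000));
}
```

Taking the maximum of the two bounds means the check does not depend on which of from_block and to_block is larger.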
Source§

impl EventScannerBuilder<LatestEvents>

Source

pub fn block_confirmations(self, confirmations: u64) -> Self

Source

pub fn from_block(self, block_id: impl Into<BlockId>) -> Self

Source

pub fn to_block(self, block_id: impl Into<BlockId>) -> Self

Source

pub async fn connect<N: Network>( self, provider: impl IntoRobustProvider<N>, ) -> Result<EventScanner<LatestEvents, N>, ScannerError>

Connects to an existing provider.

§Errors

Returns an error if:

  • The provider connection fails
  • The event count is zero
  • The max block range is zero
Source§

impl EventScannerBuilder<Live>

Source

pub fn block_confirmations(self, confirmations: u64) -> Self

Source

pub async fn connect<N: Network>( self, provider: impl IntoRobustProvider<N>, ) -> Result<EventScanner<Live, N>, ScannerError>

Connects to an existing provider.

§Errors

Returns an error if:

  • The provider connection fails
  • The max block range is zero
Source§

impl EventScannerBuilder<SyncFromBlock>

Source

pub fn block_confirmations(self, confirmations: u64) -> Self

Source

pub async fn connect<N: Network>( self, provider: impl IntoRobustProvider<N>, ) -> Result<EventScanner<SyncFromBlock, N>, ScannerError>

Connects to an existing provider.

§Errors

Returns an error if:

  • The provider connection fails
  • The max block range is zero
Source§

impl EventScannerBuilder<SyncFromLatestEvents>

Source

pub fn block_confirmations(self, confirmations: u64) -> Self

Source

pub async fn connect<N: Network>( self, provider: impl IntoRobustProvider<N>, ) -> Result<EventScanner<SyncFromLatestEvents, N>, ScannerError>

Connects to an existing provider.

§Errors

Returns an error if:

  • The provider connection fails
  • The event count is zero
  • The max block range is zero
Source§

impl EventScannerBuilder<Synchronize>

Source

pub fn from_latest( self, count: usize, ) -> EventScannerBuilder<SyncFromLatestEvents>

Scans the latest count matching events per registered listener, then automatically transitions to live streaming mode.

This method combines two scanning phases, joined by an automatic transition, into a single operation:

  1. Latest events phase: Collects up to count most recent events by scanning backwards from the current chain tip. Events are delivered in chronological order.
  2. Automatic transition: Emits Notification::SwitchingToLive to signal the mode change
  3. Live streaming phase: Continuously monitors and streams new events as they arrive on-chain
§Example
// Fetch the latest 10 events, then stream new events continuously
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
let mut scanner = EventScannerBuilder::sync()
    .from_latest(10)
    .connect(robust_provider)
    .await?;

let filter = EventFilter::new().contract_address(contract_address);
let mut stream = scanner.subscribe(filter);

scanner.start().await?;

while let Some(msg) = stream.next().await {
    match msg {
        Ok(Message::Data(logs)) => {
            println!("Received {} events", logs.len());
        }
        Ok(Message::Notification(notification)) => {
            println!("Notification received: {:?}", notification);
            // You'll see Notification::SwitchingToLive when transitioning
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}
§How it works

Before starting, the scanner captures the latest block number to establish a clear boundary between phases. The “latest events” phase scans backwards from that block towards the genesis block, while the live phase starts from the block after it. This design prevents duplicate events and handles the race condition where new blocks arrive during setup.
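
The boundary described above can be illustrated with a small sketch. The function name `phase_boundary` is hypothetical, and the genesis block is assumed to be block 0; the crate's internal representation may differ.

```rust
use std::ops::RangeInclusive;

/// Splits the chain at a tip that is captured once, before scanning starts.
/// Returns the blocks covered by the latest-events phase and the first block
/// of the live phase. Assumption: genesis is block 0.
fn phase_boundary(latest_block: u64) -> (RangeInclusive<u64>, u64) {
    (0..=latest_block, latest_block + 1)
}

fn main() {
    let (rewind, live_start) = phase_boundary(100);
    // Blocks that arrive during setup (101, 102, ...) fall only in the live phase,
    // so no block is covered by both phases.
    println!("rewind covers {:?}, live starts at {}", rewind, live_start);
}
```

Because the two ranges are disjoint and adjacent, an event can be delivered by at most one phase, and no block is skipped between them.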

§Key behaviors
  • No duplicates: Events are not delivered twice across the phase transition
  • Flexible count: If fewer than count events exist, returns all available events
  • Reorg handling: Both phases handle reorgs appropriately:
    • Latest events phase: resets and rescans on reorg detection
    • Live phase: resets stream to the first post-reorg block that satisfies the configured block confirmations
  • Continuous operation: Live phase continues indefinitely until the scanner is dropped
§Arguments
  • count - Maximum number of recent events to collect per listener before switching to live streaming (must be greater than 0)
§Important notes
  • The live phase continues indefinitely until the scanner is dropped or encounters an error
§Detailed reorg behavior
  • Latest events phase: On detecting a reorg, emits Notification::ReorgDetected, restarts the scan by resetting the rewind start to the new tip, and continues until collectors accumulate count logs. Final delivery to listeners preserves chronological order.
  • Live streaming phase: Starts from latest_block + 1 and respects the configured block confirmations. On reorg, emits Notification::ReorgDetected, adjusts the next confirmed window (possibly re-emitting confirmed portions), and continues streaming.
Source

pub fn from_block( self, block_id: impl Into<BlockId>, ) -> EventScannerBuilder<SyncFromBlock>

Streams events from a specific starting block to the present, then automatically transitions to live streaming mode.

This method combines two scanning phases, joined by an automatic transition, into a single operation:

  1. Historical sync phase: Streams events from from_block up to the current confirmed tip
  2. Automatic transition: Emits Notification::SwitchingToLive to signal the mode change
  3. Live streaming phase: Continuously monitors and streams new events as they arrive on-chain
§Example
// Sync from block 1_000_000 to present, then stream new events
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
let mut scanner = EventScannerBuilder::sync()
    .from_block(1_000_000)
    .connect(robust_provider)
    .await?;

let filter = EventFilter::new().contract_address(contract_address);
let mut stream = scanner.subscribe(filter);

scanner.start().await?;

while let Some(msg) = stream.next().await {
    match msg {
        Ok(Message::Data(logs)) => {
            println!("Received {} events", logs.len());
        }
        Ok(Message::Notification(notification)) => {
            println!("Notification received: {:?}", notification);
            // You'll see Notification::SwitchingToLive when transitioning
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}

Using block tags:

// Sync from genesis block
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
let mut scanner = EventScannerBuilder::sync()
    .from_block(BlockNumberOrTag::Earliest)
    .connect(robust_provider)
    .await?;
§How it works

The scanner first streams all events from the specified starting block up to the current confirmed tip (respecting block_confirmations). Once caught up, it seamlessly transitions to live mode and continues streaming new events as blocks are produced.

§Key behaviors
  • No duplicates: Events are not delivered twice across the phase transition
  • Chronological order: Historical events are delivered oldest to newest
  • Seamless transition: Automatically switches to live mode when caught up
  • Continuous operation: Live phase continues indefinitely until the scanner is dropped
§Arguments
  • block_id - Starting block id
§Important notes
  • The live phase continues indefinitely until the scanner is dropped or encounters an error
§Reorg behavior
  • Historical sync phase: Streams events in chronological order without reorg detection
  • Live streaming phase: Respects the configured block confirmations. On reorg, emits Notification::ReorgDetected, adjusts the next confirmed window (possibly re-emitting confirmed portions), and continues streaming.
Source§

impl EventScannerBuilder<Unspecified>

Source

pub fn historic() -> EventScannerBuilder<Historic>

Streams events from a historical block range.

§Example
// Stream all events from genesis to latest block
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?;

let filter = EventFilter::new().contract_address(contract_address);
let mut stream = scanner.subscribe(filter);

scanner.start().await?;

while let Some(Ok(Message::Data(logs))) = stream.next().await {
    println!("Received {} logs", logs.len());
}

Specifying a custom block range:

// Stream events between blocks [1_000_000, 2_000_000]
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
let mut scanner = EventScannerBuilder::historic()
    .from_block(1_000_000)
    .to_block(2_000_000)
    .connect(robust_provider)
    .await?;
§How it works

The scanner streams events in chronological order (oldest to newest) within the specified block range. Events are delivered in batches as they are fetched from the provider, with batch sizes controlled by the max_block_range configuration.

§Key behaviors
  • Continuous streaming: Events are delivered in multiple messages as they are fetched
  • Chronological order: Events are always delivered oldest to newest
  • Default range: By default, scans from Earliest to Latest block
  • Batch control: Use .max_block_range(n) to control how many blocks are queried per RPC call
  • Completion: The scanner completes when the entire range has been processed
Source

pub fn live() -> EventScannerBuilder<Live>

Streams new events as blocks are produced on-chain.

§Example
// Stream new events as they arrive
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
let mut scanner = EventScannerBuilder::live()
    .block_confirmations(20)
    .connect(robust_provider)
    .await?;

let filter = EventFilter::new().contract_address(contract_address);
let mut stream = scanner.subscribe(filter);

scanner.start().await?;

while let Some(msg) = stream.next().await {
    match msg {
        Ok(Message::Data(logs)) => {
            println!("Received {} new events", logs.len());
        }
        Ok(Message::Notification(notification)) => {
            println!("Notification received: {:?}", notification);
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}
§How it works

The scanner subscribes to new blocks via WebSocket and streams events from confirmed blocks. The block_confirmations setting determines how many blocks to wait before considering a block confirmed, providing protection against chain reorganizations.
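
As a rough illustration of the confirmation rule, the sketch below computes the highest block that counts as confirmed for a given tip. The helper `confirmed_tip` is hypothetical, and it assumes that a block is confirmed once the tip is at least `confirmations` blocks ahead of it; the crate's exact off-by-one convention may differ.

```rust
/// Highest block treated as confirmed for a given chain tip, or `None` when the
/// chain is shorter than the required number of confirmations.
/// Assumption: block `b` is confirmed when `latest_block - b >= confirmations`.
fn confirmed_tip(latest_block: u64, confirmations: u64) -> Option<u64> {
    latest_block.checked_sub(confirmations)
}

fn main() {
    // With a tip at block 100 and 12 confirmations, blocks up to 88 are confirmed.
    println!("{:?}", confirmed_tip(100, 12));
}
```

A larger confirmations value delays delivery but makes it less likely that an emitted event is later removed by a reorg.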

§Key behaviors
  • Real-time streaming: Events are delivered as new blocks are confirmed
  • Reorg protection: Waits for configured confirmations before emitting events
  • Continuous operation: Runs indefinitely until the scanner is dropped or encounters an error
  • Default confirmations: By default, waits for 12 block confirmations
§Reorg behavior

When a reorg is detected:

  1. Emits Notification::ReorgDetected to all listeners
  2. Adjusts the next confirmed range using block_confirmations
  3. Re-emits events from the corrected confirmed block range
  4. Continues streaming from the new chain state
Source

pub fn sync() -> EventScannerBuilder<Synchronize>

Creates a builder for sync mode scanners that combine historical catch-up with live streaming.

This method returns a builder that must be further narrowed down:

// Sync from block mode
EventScannerBuilder::sync().from_block(1_000_000);
// Sync from latest events mode
EventScannerBuilder::sync().from_latest(10);

See from_block and from_latest for details on each mode.
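
The narrowing step can be pictured as a typestate builder, where a marker type parameter decides which methods are available. The sketch below is a simplified stand-in: `Builder`, `SyncFromLatest`, the fields, and `describe` are hypothetical and do not reflect the crate's private fields or real method set.

```rust
use std::marker::PhantomData;

// Hypothetical marker types standing in for the scanner modes.
struct Unspecified;
struct Synchronize;
struct SyncFromBlock;
struct SyncFromLatest;

/// Simplified builder whose type parameter `M` records the selected mode.
struct Builder<M> {
    start_block: Option<u64>,
    count: Option<usize>,
    _mode: PhantomData<M>,
}

impl Builder<Unspecified> {
    /// Entry point: returns a builder that still has to be narrowed.
    fn sync() -> Builder<Synchronize> {
        Builder { start_block: None, count: None, _mode: PhantomData }
    }
}

impl Builder<Synchronize> {
    fn from_block(self, block: u64) -> Builder<SyncFromBlock> {
        Builder { start_block: Some(block), count: None, _mode: PhantomData }
    }

    fn from_latest(self, count: usize) -> Builder<SyncFromLatest> {
        Builder { start_block: None, count: Some(count), _mode: PhantomData }
    }
}

// `describe` exists only on the narrowed modes, so calling it directly on
// `Builder::sync()` would be rejected at compile time.
impl Builder<SyncFromBlock> {
    fn describe(&self) -> String {
        format!("sync from block {}", self.start_block.unwrap_or(0))
    }
}

impl Builder<SyncFromLatest> {
    fn describe(&self) -> String {
        format!("sync from latest {} events", self.count.unwrap_or(0))
    }
}

fn main() {
    println!("{}", Builder::sync().from_block(1_000_000).describe());
    println!("{}", Builder::sync().from_latest(10).describe());
}
```

Encoding the mode in the type means an incompletely configured builder cannot reach the final step: the missing choice is a compile error rather than a runtime failure.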

Source

pub fn latest(count: usize) -> EventScannerBuilder<LatestEvents>

Streams the latest count matching events per registered listener.

§Example
// Collect the latest 10 events across Earliest..=Latest
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
let mut scanner = EventScannerBuilder::latest(10).connect(robust_provider).await?;

let filter = EventFilter::new().contract_address(contract_address);
let mut stream = scanner.subscribe(filter);

scanner.start().await?;

// Expect a single message with up to 10 logs, then the stream ends
while let Some(Ok(Message::Data(logs))) = stream.next().await {
    println!("Latest logs: {}", logs.len());
}

Restricting to a specific block range:

// Collect the latest 5 events between blocks [1_000_000, 1_100_000]
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
let mut scanner = EventScannerBuilder::latest(5)
    .from_block(1_000_000)
    .to_block(1_100_000)
    .connect(robust_provider)
    .await?;
§How it works

The scanner performs a reverse-ordered scan (newest to oldest) within the specified block range, collecting up to count events per registered listener. Once the target count is reached or the range is exhausted, it delivers the events in chronological order (oldest to newest) and completes.

When using a custom block range, the scanner automatically normalizes the range boundaries. This means you can specify from_block and to_block in any order: the scanner will always scan from the higher block number down to the lower one, regardless of which parameter holds which value.
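
The normalization and the reverse-then-chronological delivery can be sketched on plain data. In this illustration `normalize` and `latest_events` are hypothetical helpers, and each event is reduced to the number of the block that contains it; the real scanner works on logs fetched from the provider.

```rust
/// Orders the two bounds so the scan can always run from the higher block down to the lower one.
fn normalize(from_block: u64, to_block: u64) -> (u64, u64) {
    (from_block.min(to_block), from_block.max(to_block))
}

/// Collects up to `count` of the newest events inside the range and returns them oldest first.
/// Assumption: `event_blocks` holds one block number per matching event, sorted ascending.
fn latest_events(event_blocks: &[u64], from_block: u64, to_block: u64, count: usize) -> Vec<u64> {
    let (low, high) = normalize(from_block, to_block);
    let mut collected: Vec<u64> = event_blocks
        .iter()
        .rev() // walk from newest to oldest
        .copied()
        .filter(|block| (low..=high).contains(block))
        .take(count) // stop once the target count is reached
        .collect();
    collected.reverse(); // deliver in chronological order
    collected
}

fn main() {
    let events = [5, 10, 15, 20, 25, 30];
    // The bounds are given in reverse order on purpose; the result is unchanged.
    println!("{:?}", latest_events(&events, 25, 10, 2));
}
```

Scanning from the newest block lets the collection stop as soon as count events are found, and the final reversal restores oldest-to-newest order for delivery.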

§Key behaviors
  • Single delivery: Each registered stream receives at most count logs in a single message, chronologically ordered
  • One-shot operation: The scanner completes after delivering messages; it does not continue streaming
  • Flexible count: If fewer than count events exist in the range, returns all available events
  • Default range: By default, scans from Earliest to Latest block
  • Reorg handling: Periodically checks the tip to detect reorgs during the scan
§Arguments
  • count - Maximum number of recent events to collect per listener (must be greater than 0)
§Reorg behavior

During the scan, the scanner periodically checks the tip to detect reorgs. On reorg detection:

  1. Emits Notification::ReorgDetected to all listeners
  2. Resets to the updated tip
  3. Restarts the scan from the new tip
  4. Continues until count events are collected

Final delivery to log listeners preserves chronological order regardless of reorgs.

§Notes

For continuous streaming after collecting latest events, use EventScannerBuilder::sync().from_latest(count) instead.

Source§

impl EventScannerBuilder<LatestEvents>

Source

pub fn new(count: usize) -> Self

Source§

impl EventScannerBuilder<SyncFromLatestEvents>

Source

pub fn new(count: usize) -> Self

Source§

impl EventScannerBuilder<SyncFromBlock>

Source

pub fn new(from_block: impl Into<BlockId>) -> Self

Source§

impl<M> EventScannerBuilder<M>

Source

pub fn max_block_range(self, max_block_range: u64) -> Self

Sets the maximum block range per event batch.

Controls how the scanner splits a large block range into smaller batches for processing. Each batch corresponds to a single RPC call to fetch logs. This prevents timeouts and respects rate limits imposed by node providers.

§Arguments
  • max_block_range - Maximum number of blocks to process per batch (must be greater than 0)
§Example

If scanning events from blocks 1000–1099 (100 blocks total) with max_block_range(30):

  • Batch 1: blocks 1000–1029 (30 blocks)
  • Batch 2: blocks 1030–1059 (30 blocks)
  • Batch 3: blocks 1060–1089 (30 blocks)
  • Batch 4: blocks 1090–1099 (10 blocks)
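
The batching arithmetic above can be reproduced with a short sketch. `split_into_batches` is a hypothetical helper written for this illustration and is not part of the crate's API; it assumes inclusive block bounds.

```rust
/// Splits the inclusive range `from_block..=to_block` into consecutive inclusive
/// batches of at most `max_block_range` blocks each.
fn split_into_batches(from_block: u64, to_block: u64, max_block_range: u64) -> Vec<(u64, u64)> {
    assert!(max_block_range > 0, "max_block_range must be greater than 0");
    let mut batches = Vec::new();
    let mut start = from_block;
    while start <= to_block {
        // The last batch is cut short at `to_block`.
        let end = to_block.min(start.saturating_add(max_block_range - 1));
        batches.push((start, end));
        if end == u64::MAX {
            break; // avoid overflow when computing the next start
        }
        start = end + 1;
    }
    batches
}

fn main() {
    for (start, end) in split_into_batches(1000, 1099, 30) {
        println!("blocks {}-{} ({} blocks)", start, end, end - start + 1);
    }
}
```

Under this sketch each tuple would correspond to one log-fetching RPC call, so a smaller max_block_range means more calls with less data per call.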

Trait Implementations§

Source§

impl<M: Default> Default for EventScannerBuilder<M>

Source§

fn default() -> EventScannerBuilder<M>

Returns the “default value” for a type.

Auto Trait Implementations§

§

impl<M> Freeze for EventScannerBuilder<M>
where M: Freeze,

§

impl<M> RefUnwindSafe for EventScannerBuilder<M>
where M: RefUnwindSafe,

§

impl<M> Send for EventScannerBuilder<M>
where M: Send,

§

impl<M> Sync for EventScannerBuilder<M>
where M: Sync,

§

impl<M> Unpin for EventScannerBuilder<M>
where M: Unpin,

§

impl<M> UnwindSafe for EventScannerBuilder<M>
where M: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.