event_scanner/
types.rs

1use std::fmt::Debug;
2
3use tokio::sync::mpsc;
4
5use crate::ScannerError;
6
7/// Messages streamed by the scanner to subscribers.
8///
9/// Each message represents either data or a notification about the scanner's state or behavior.
10#[derive(Copy, Debug, Clone)]
11pub enum ScannerMessage<T: Clone> {
12    /// Data streamed to the subscriber.
13    Data(T),
14
15    /// Notification about scanner state changes or important events.
16    Notification(Notification),
17}
18
/// Notifications emitted by the scanner to signal state changes or important events.
///
/// The type is `Copy` and implements `Eq` and `Hash`, so notifications can be compared
/// directly and used as keys in hashed collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Notification {
    /// Emitted when transitioning from the latest events phase to live streaming mode
    /// in sync scanners.
    SwitchingToLive,

    /// Emitted when a chain reorganization (reorg) has been detected.
    ///
    /// When a reorg occurs, the scanner adjusts its position to re-stream events from the
    /// canonical chain state. The specific behavior depends on the scanning mode (see individual
    /// scanner mode documentation for details).
    ///
    /// # Redundant Notifications
    ///
    /// Due to the asynchronous nature of block scanning and log fetching, you may occasionally
    /// receive this notification even after the reorg has already been accounted for. This happens
    /// when:
    ///
    /// 1. `BlockRangeScanner` validates and emits a block range
    /// 2. A reorg occurs on the chain
    /// 3. `EventScanner` fetches logs for that range, but the RPC provider returns logs from the
    ///    post-reorg chain state (the provider's view has already updated)
    /// 4. `BlockRangeScanner` detects the reorg on its next check and emits
    ///    `Notification::ReorgDetected` with a new range starting from the first reorged block
    /// 5. `EventScanner` re-fetches logs for this range, which may return duplicate logs already
    ///    delivered in step 3 (the new range might also extend beyond the original range)
    ///
    /// **How to handle**: This is a benign race condition. Your application should be designed to
    /// handle duplicate logs idempotently (e.g., using transaction hashes or log indices as
    /// deduplication keys). Depending on your application semantics, you may also treat this
    /// notification as a signal to roll back application state derived from blocks after the
    /// reported common ancestor.
    ReorgDetected {
        /// The block number of the last block that is still valid on the canonical chain.
        common_ancestor: u64,
    },

    /// Emitted during the latest events phase when no matching logs are found in the
    /// scanned range.
    NoPastLogsFound,
}
59
60impl<T: Clone> From<Notification> for ScannerMessage<T> {
61    fn from(value: Notification) -> Self {
62        ScannerMessage::Notification(value)
63    }
64}
65
66impl<T: Clone> PartialEq<Notification> for ScannerMessage<T> {
67    fn eq(&self, other: &Notification) -> bool {
68        if let ScannerMessage::Notification(notification) = self {
69            notification == other
70        } else {
71            false
72        }
73    }
74}
75
76/// A convenience `Result` type for scanner streams.
77///
78/// Successful items are [`ScannerMessage`] values; failures are [`ScannerError`].
79pub type ScannerResult<T> = Result<ScannerMessage<T>, ScannerError>;
80
81/// Conversion helper for streaming either data, notifications, or errors.
82pub trait IntoScannerResult<T: Clone> {
83    fn into_scanner_message_result(self) -> ScannerResult<T>;
84}
85
86impl<T: Clone> IntoScannerResult<T> for ScannerResult<T> {
87    fn into_scanner_message_result(self) -> ScannerResult<T> {
88        self
89    }
90}
91
92impl<T: Clone> IntoScannerResult<T> for ScannerMessage<T> {
93    fn into_scanner_message_result(self) -> ScannerResult<T> {
94        Ok(self)
95    }
96}
97
98impl<T: Clone, E: Into<ScannerError>> IntoScannerResult<T> for E {
99    fn into_scanner_message_result(self) -> ScannerResult<T> {
100        Err(self.into())
101    }
102}
103
104impl<T: Clone> IntoScannerResult<T> for Notification {
105    fn into_scanner_message_result(self) -> ScannerResult<T> {
106        Ok(ScannerMessage::Notification(self))
107    }
108}
109
110/// Internal helper for attempting to forward a stream item through an `mpsc` channel.
111pub(crate) trait TryStream<T: Clone> {
112    async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> bool;
113}
114
115impl<T: Clone + Debug> TryStream<T> for mpsc::Sender<ScannerResult<T>> {
116    async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> bool {
117        let item = msg.into_scanner_message_result();
118        self.send(item).await.is_ok()
119    }
120}