event_scanner/
types.rs

1use std::fmt::Debug;
2
3use tokio::sync::mpsc;
4use tracing::{info, warn};
5
6use crate::ScannerError;
7
8/// Messages streamed by the scanner to subscribers.
9///
10/// Each message represents either data or a notification about the scanner's state or behavior.
11#[derive(Copy, Debug, Clone)]
12pub enum ScannerMessage<T: Clone> {
13    /// Data streamed to the subscriber.
14    Data(T),
15
16    /// Notification about scanner state changes or important events.
17    Notification(Notification),
18}
19
/// Notifications emitted by the scanner to signal state changes or important events.
///
/// Equality is total (`Eq`): every variant is either a unit variant or carries only a `u64`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Notification {
    /// Emitted when transitioning from the latest events phase to live streaming mode
    /// in sync scanners.
    SwitchingToLive,

    /// Emitted when a chain reorganization (reorg) has been detected.
    ///
    /// When a reorg occurs, the scanner adjusts its position to re-stream events from the
    /// canonical chain state. The specific behavior depends on the scanning mode (see individual
    /// scanner mode documentation for details).
    ///
    /// # Redundant Notifications
    ///
    /// Due to the asynchronous nature of block scanning and log fetching, you may occasionally
    /// receive this notification even after the reorg has already been accounted for. This happens
    /// when:
    ///
    /// 1. `BlockRangeScanner` validates and emits a block range
    /// 2. A reorg occurs on the chain
    /// 3. `EventScanner` fetches logs for that range, but the RPC provider returns logs from the
    ///    post-reorg chain state (the provider's view has already updated)
    /// 4. `BlockRangeScanner` detects the reorg on its next check and emits
    ///    `Notification::ReorgDetected` with a new range starting from the first reorged block
    /// 5. `EventScanner` re-fetches logs for this range, which may return duplicate logs already
    ///    delivered in step 3 (the new range might also extend beyond the original range)
    ///
    /// **How to handle**: This is a benign race condition. Your application should be designed to
    /// handle duplicate logs idempotently (e.g., using transaction hashes or log indices as
    /// deduplication keys). The scanner prioritizes correctness by ensuring all logs from the
    /// canonical chain are delivered, even if it means occasional duplicates during reorgs.
    ///
    /// The `common_ancestor` field contains the block number of the last block
    /// that is still valid on the canonical chain.
    ReorgDetected { common_ancestor: u64 },

    /// Emitted during the latest events phase when no matching logs are found in the
    /// scanned range.
    NoPastLogsFound,
}
59
60impl<T: Clone> From<Notification> for ScannerMessage<T> {
61    fn from(value: Notification) -> Self {
62        ScannerMessage::Notification(value)
63    }
64}
65
66impl<T: Clone> PartialEq<Notification> for ScannerMessage<T> {
67    fn eq(&self, other: &Notification) -> bool {
68        if let ScannerMessage::Notification(notification) = self {
69            notification == other
70        } else {
71            false
72        }
73    }
74}
75
76pub type ScannerResult<T> = Result<ScannerMessage<T>, ScannerError>;
77
78pub trait IntoScannerResult<T: Clone> {
79    fn into_scanner_message_result(self) -> ScannerResult<T>;
80}
81
82impl<T: Clone> IntoScannerResult<T> for ScannerResult<T> {
83    fn into_scanner_message_result(self) -> ScannerResult<T> {
84        self
85    }
86}
87
88impl<T: Clone> IntoScannerResult<T> for ScannerMessage<T> {
89    fn into_scanner_message_result(self) -> ScannerResult<T> {
90        Ok(self)
91    }
92}
93
94impl<T: Clone, E: Into<ScannerError>> IntoScannerResult<T> for E {
95    fn into_scanner_message_result(self) -> ScannerResult<T> {
96        Err(self.into())
97    }
98}
99
100impl<T: Clone> IntoScannerResult<T> for Notification {
101    fn into_scanner_message_result(self) -> ScannerResult<T> {
102        Ok(ScannerMessage::Notification(self))
103    }
104}
105
106pub(crate) trait TryStream<T: Clone> {
107    async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> bool;
108}
109
110impl<T: Clone + Debug> TryStream<T> for mpsc::Sender<ScannerResult<T>> {
111    async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> bool {
112        let item = msg.into_scanner_message_result();
113        match &item {
114            Ok(msg) => info!(item = ?msg, "Sending message"),
115            Err(err) => info!(error = ?err, "Sending error"),
116        }
117        if let Err(err) = self.send(item).await {
118            warn!(error = %err, "Downstream channel closed, stopping stream");
119            return false;
120        }
121        true
122    }
123}