Skip to main content

event_scanner/
types.rs

1use std::fmt::Debug;
2
3use tokio::sync::mpsc;
4
5use crate::ScannerError;
6
/// Outcome of trying to push a message into a channel.
///
/// Gives send operations explicit semantics: callers can tell whether the downstream
/// receiver is still listening or has already been dropped.
#[derive(Clone, Copy, Debug)]
pub(crate) enum ChannelState {
    /// The message was sent successfully and the channel is still open.
    Open,
    /// The receiver was dropped; the channel is closed and accepts no further messages.
    Closed,
}
18
19impl ChannelState {
20    /// Returns `true` if the channel is closed.
21    #[must_use]
22    pub(crate) fn is_closed(self) -> bool {
23        matches!(self, ChannelState::Closed)
24    }
25}
26
27/// Messages streamed by the scanner to subscribers.
28///
29/// Each message represents either data or a notification about the scanner's state or behavior.
30#[derive(Copy, Debug, Clone)]
31pub enum ScannerMessage<T: Clone> {
32    /// Data streamed to the subscriber.
33    Data(T),
34
35    /// Notification about scanner state changes or important events.
36    Notification(Notification),
37}
38
/// Notifications emitted by the scanner to signal state changes or important events.
///
/// All variants carry plain integer data at most, so equality is a full equivalence
/// relation; the type therefore derives [`Eq`] alongside [`PartialEq`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Notification {
    /// Emitted when transitioning from the latest events phase to live streaming mode
    /// in sync scanners.
    SwitchingToLive,

    /// Emitted when a chain reorganization (reorg) has been detected.
    ///
    /// When a reorg occurs, the scanner adjusts its position to re-stream events from the
    /// canonical chain state. The specific behavior depends on the scanning mode (see individual
    /// scanner mode documentation for details).
    ///
    /// # Redundant Notifications
    ///
    /// Due to the asynchronous nature of block scanning and log fetching, you may occasionally
    /// receive this notification even after the reorg has already been accounted for. This happens
    /// when:
    ///
    /// 1. `BlockRangeScanner` validates and emits a block range
    /// 2. A reorg occurs on the chain
    /// 3. `EventScanner` fetches logs for that range, but the RPC provider returns logs from the
    ///    post-reorg chain state (the provider's view has already updated)
    /// 4. `BlockRangeScanner` detects the reorg on its next check and emits
    ///    `Notification::ReorgDetected` with a new range starting from the first reorged block
    /// 5. `EventScanner` re-fetches logs for this range, which may return duplicate logs already
    ///    delivered in step 3 (the new range might also extend beyond the original range)
    ///
    /// **How to handle**: This is a benign race condition. Your application should be designed to
    /// handle duplicate logs idempotently (e.g., using transaction hashes or log indices as
    /// deduplication keys). Depending on your application semantics, you may also treat this
    /// notification as a signal to roll back application state derived from blocks after the
    /// reported common ancestor.
    ReorgDetected {
        /// The block number of the last block that is still valid on the canonical chain.
        common_ancestor: u64,
    },

    /// Emitted during the latest events phase when no matching logs are found in the
    /// scanned range.
    NoPastLogsFound,
}
79
80impl<T: Clone> From<Notification> for ScannerMessage<T> {
81    fn from(value: Notification) -> Self {
82        ScannerMessage::Notification(value)
83    }
84}
85
86impl<T: Clone> PartialEq<Notification> for ScannerMessage<T> {
87    fn eq(&self, other: &Notification) -> bool {
88        if let ScannerMessage::Notification(notification) = self {
89            notification == other
90        } else {
91            false
92        }
93    }
94}
95
96/// A convenience `Result` type for scanner streams.
97///
98/// Successful items are [`ScannerMessage`] values; failures are [`ScannerError`].
99pub type ScannerResult<T> = Result<ScannerMessage<T>, ScannerError>;
100
101/// Conversion helper for streaming either data, notifications, or errors.
102pub trait IntoScannerResult<T: Clone> {
103    fn into_scanner_message_result(self) -> ScannerResult<T>;
104}
105
106impl<T: Clone> IntoScannerResult<T> for ScannerResult<T> {
107    fn into_scanner_message_result(self) -> ScannerResult<T> {
108        self
109    }
110}
111
112impl<T: Clone> IntoScannerResult<T> for ScannerMessage<T> {
113    fn into_scanner_message_result(self) -> ScannerResult<T> {
114        Ok(self)
115    }
116}
117
118impl<T: Clone, E: Into<ScannerError>> IntoScannerResult<T> for E {
119    fn into_scanner_message_result(self) -> ScannerResult<T> {
120        Err(self.into())
121    }
122}
123
124impl<T: Clone> IntoScannerResult<T> for Notification {
125    fn into_scanner_message_result(self) -> ScannerResult<T> {
126        Ok(ScannerMessage::Notification(self))
127    }
128}
129
130/// Internal helper for attempting to forward a stream item through an `mpsc` channel.
131pub(crate) trait TryStream<T: Clone> {
132    async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> ChannelState;
133}
134
135impl<T: Clone + Debug> TryStream<T> for mpsc::Sender<ScannerResult<T>> {
136    async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> ChannelState {
137        let item = msg.into_scanner_message_result();
138        if self.send(item).await.is_err() {
139            return ChannelState::Closed;
140        }
141        ChannelState::Open
142    }
143}
144
#[cfg(test)]
mod tests {
    use std::ops::RangeInclusive;

    use super::*;

    /// Channel item type shared by the tests in this module.
    type TestResult = ScannerResult<RangeInclusive<u64>>;

    mod channel_state_enum {
        use super::*;

        #[test]
        fn is_closed_returns_false_for_open_state() {
            let open = ChannelState::Open;
            assert!(!open.is_closed());
        }

        #[test]
        fn is_closed_returns_true_for_closed_state() {
            let closed = ChannelState::Closed;
            assert!(closed.is_closed());
        }

        #[test]
        fn channel_state_is_copy() {
            let original = ChannelState::Open;
            // Binding by value copies the state, so `original` remains usable below.
            let duplicate = original;
            assert!(!original.is_closed());
            assert!(!duplicate.is_closed());
        }

        #[test]
        fn channel_state_debug_format() {
            let cases = [(ChannelState::Open, "Open"), (ChannelState::Closed, "Closed")];
            for (state, expected) in cases {
                assert_eq!(format!("{state:?}"), expected);
            }
        }
    }

    mod try_stream {
        use super::*;

        #[tokio::test]
        async fn try_stream_returns_open_when_receiver_exists() {
            // `_rx` stays in scope so the receiving half is alive during the send.
            let (tx, _rx) = mpsc::channel::<TestResult>(10);

            let state = tx.try_stream(Notification::SwitchingToLive).await;

            assert!(!state.is_closed());
        }

        #[tokio::test]
        async fn try_stream_returns_closed_when_receiver_dropped() {
            let (tx, rx) = mpsc::channel::<TestResult>(10);
            // Dropping the receiver closes the channel before anything is sent.
            drop(rx);

            let state = tx.try_stream(Notification::SwitchingToLive).await;

            assert!(state.is_closed());
        }

        #[tokio::test]
        async fn try_stream_sends_message_successfully() {
            let (tx, mut rx) = mpsc::channel::<TestResult>(10);

            let state = tx.try_stream(Notification::SwitchingToLive).await;
            assert!(!state.is_closed());

            // The receiver must observe exactly the notification that was streamed.
            let received = rx.recv().await.unwrap();
            let is_switching_to_live = matches!(
                received,
                Ok(ScannerMessage::Notification(Notification::SwitchingToLive))
            );
            assert!(is_switching_to_live);
        }
    }
}