event_scanner/
types.rs

1use std::fmt::Debug;
2
3use tokio::sync::mpsc;
4use tracing::{info, warn};
5
6use crate::ScannerError;
7
8/// Messages streamed by the scanner to subscribers.
9///
10/// Each message represents either data or a notification about the scanner's state or behavior.
11#[derive(Copy, Debug, Clone)]
12pub enum ScannerMessage<T: Clone> {
13    /// Data streamed to the subscriber.
14    Data(T),
15
16    /// Notification about scanner state changes or important events.
17    Notification(Notification),
18}
19
20/// Notifications emitted by the scanner to signal state changes or important events.
21#[derive(Copy, Debug, Clone, PartialEq)]
22pub enum Notification {
23    /// Emitted when transitioning from the latest events phase to live streaming mode
24    /// in sync scanners.
25    SwitchingToLive,
26
27    /// Emitted when a blockchain reorganization is detected during scanning.
28    ReorgDetected,
29
30    /// Emitted during the latest events phase when no matching logs are found in the
31    /// scanned range.
32    NoPastLogsFound,
33}
34
35impl<T: Clone> From<Notification> for ScannerMessage<T> {
36    fn from(value: Notification) -> Self {
37        ScannerMessage::Notification(value)
38    }
39}
40
41impl<T: Clone> PartialEq<Notification> for ScannerMessage<T> {
42    fn eq(&self, other: &Notification) -> bool {
43        if let ScannerMessage::Notification(notification) = self {
44            notification == other
45        } else {
46            false
47        }
48    }
49}
50
51pub type ScannerResult<T> = Result<ScannerMessage<T>, ScannerError>;
52
53pub trait IntoScannerResult<T: Clone> {
54    fn into_scanner_message_result(self) -> ScannerResult<T>;
55}
56
57impl<T: Clone> IntoScannerResult<T> for ScannerResult<T> {
58    fn into_scanner_message_result(self) -> ScannerResult<T> {
59        self
60    }
61}
62
63impl<T: Clone> IntoScannerResult<T> for ScannerMessage<T> {
64    fn into_scanner_message_result(self) -> ScannerResult<T> {
65        Ok(self)
66    }
67}
68
69impl<T: Clone, E: Into<ScannerError>> IntoScannerResult<T> for E {
70    fn into_scanner_message_result(self) -> ScannerResult<T> {
71        Err(self.into())
72    }
73}
74
75impl<T: Clone> IntoScannerResult<T> for Notification {
76    fn into_scanner_message_result(self) -> ScannerResult<T> {
77        Ok(ScannerMessage::Notification(self))
78    }
79}
80
81pub(crate) trait TryStream<T: Clone> {
82    async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> bool;
83}
84
85impl<T: Clone + Debug> TryStream<T> for mpsc::Sender<ScannerResult<T>> {
86    async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> bool {
87        let item = msg.into_scanner_message_result();
88        match &item {
89            Ok(msg) => info!(item = ?msg, "Sending message"),
90            Err(err) => info!(error = ?err, "Sending error"),
91        }
92        if let Err(err) = self.send(item).await {
93            warn!(error = %err, "Downstream channel closed, stopping stream");
94            return false;
95        }
96        true
97    }
98}