event_scanner/
types.rs

1use std::fmt::Debug;
2
3use tokio::sync::mpsc;
4use tracing::{info, warn};
5
6use crate::ScannerError;
7
8#[derive(Debug, Clone)]
9pub enum ScannerMessage<T: Clone> {
10    Data(T),
11    Notification(Notification),
12}
13
14#[derive(Copy, Debug, Clone, PartialEq)]
15pub enum Notification {
16    SwitchingToLive,
17    ReorgDetected,
18}
19
20impl<T: Clone> From<Notification> for ScannerMessage<T> {
21    fn from(value: Notification) -> Self {
22        ScannerMessage::Notification(value)
23    }
24}
25
26impl<T: Clone> PartialEq<Notification> for ScannerMessage<T> {
27    fn eq(&self, other: &Notification) -> bool {
28        if let ScannerMessage::Notification(notification) = self {
29            notification == other
30        } else {
31            false
32        }
33    }
34}
35
36pub type ScannerResult<T> = Result<ScannerMessage<T>, ScannerError>;
37
38pub trait IntoScannerResult<T: Clone> {
39    fn into_scanner_message_result(self) -> ScannerResult<T>;
40}
41
42impl<T: Clone> IntoScannerResult<T> for ScannerResult<T> {
43    fn into_scanner_message_result(self) -> ScannerResult<T> {
44        self
45    }
46}
47
48impl<T: Clone> IntoScannerResult<T> for ScannerMessage<T> {
49    fn into_scanner_message_result(self) -> ScannerResult<T> {
50        Ok(self)
51    }
52}
53
54impl<T: Clone, E: Into<ScannerError>> IntoScannerResult<T> for E {
55    fn into_scanner_message_result(self) -> ScannerResult<T> {
56        Err(self.into())
57    }
58}
59
60impl<T: Clone> IntoScannerResult<T> for Notification {
61    fn into_scanner_message_result(self) -> ScannerResult<T> {
62        Ok(ScannerMessage::Notification(self))
63    }
64}
65
66pub(crate) trait TryStream<T: Clone> {
67    async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> bool;
68}
69
70impl<T: Clone + Debug> TryStream<T> for mpsc::Sender<ScannerResult<T>> {
71    async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> bool {
72        let item = msg.into_scanner_message_result();
73        match &item {
74            Ok(msg) => info!(item = ?msg, "Sending message"),
75            Err(err) => info!(error = ?err, "Sending error"),
76        }
77        if let Err(err) = self.send(item).await {
78            warn!(error = %err, "Downstream channel closed, stopping stream");
79            return false;
80        }
81        true
82    }
83}