event_scanner/
types.rs

1use std::{error::Error, fmt::Debug};
2
3use tokio::sync::mpsc;
4use tracing::{info, warn};
5
6#[derive(Copy, Debug, Clone)]
7pub enum ScannerMessage<T: Clone, E: Error + Clone> {
8    Data(T),
9    Error(E),
10    Status(ScannerStatus),
11}
12
13#[derive(Copy, Debug, Clone, PartialEq)]
14pub enum ScannerStatus {
15    SwitchingToLive,
16    ReorgDetected,
17}
18
19impl<T: Clone, E: Error + Clone> From<ScannerStatus> for ScannerMessage<T, E> {
20    fn from(value: ScannerStatus) -> Self {
21        ScannerMessage::Status(value)
22    }
23}
24
25impl<T: Clone, E: Error + Clone> PartialEq<ScannerStatus> for ScannerMessage<T, E> {
26    fn eq(&self, other: &ScannerStatus) -> bool {
27        if let ScannerMessage::Status(status) = self { status == other } else { false }
28    }
29}
30
31pub(crate) trait TryStream<T: Clone, E: Error + Clone> {
32    async fn try_stream<M: Into<ScannerMessage<T, E>>>(&self, msg: M) -> bool;
33}
34
35impl<T: Clone + Debug, E: Error + Clone> TryStream<T, E> for mpsc::Sender<ScannerMessage<T, E>> {
36    async fn try_stream<M: Into<ScannerMessage<T, E>>>(&self, msg: M) -> bool {
37        let msg = msg.into();
38        info!(msg = ?msg, "Sending message");
39        if let Err(err) = self.send(msg).await {
40            warn!(error = %err, "Downstream channel closed, stopping stream");
41            return false;
42        }
43        true
44    }
45}