1use std::{error::Error, fmt::Debug};
2
3use tokio::sync::mpsc;
4use tracing::{info, warn};
5
6#[derive(Copy, Debug, Clone)]
7pub enum ScannerMessage<T: Clone, E: Error + Clone> {
8 Data(T),
9 Error(E),
10 Status(ScannerStatus),
11}
12
/// Status notifications that can be carried by `ScannerMessage::Status`.
///
/// The enum is fieldless, so equality is total; it therefore derives `Eq`
/// in addition to `PartialEq`, which lets it be used wherever full
/// equivalence is required.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScannerStatus {
    /// NOTE(review): presumably signals a transition to live mode; confirm
    /// against the code that emits it.
    SwitchingToLive,
    /// NOTE(review): presumably signals that a chain reorganisation was
    /// observed; confirm against the code that emits it.
    ReorgDetected,
}
18
19impl<T: Clone, E: Error + Clone> From<ScannerStatus> for ScannerMessage<T, E> {
20 fn from(value: ScannerStatus) -> Self {
21 ScannerMessage::Status(value)
22 }
23}
24
25impl<T: Clone, E: Error + Clone> PartialEq<ScannerStatus> for ScannerMessage<T, E> {
26 fn eq(&self, other: &ScannerStatus) -> bool {
27 if let ScannerMessage::Status(status) = self { status == other } else { false }
28 }
29}
30
31pub(crate) trait TryStream<T: Clone, E: Error + Clone> {
32 async fn try_stream<M: Into<ScannerMessage<T, E>>>(&self, msg: M) -> bool;
33}
34
35impl<T: Clone + Debug, E: Error + Clone> TryStream<T, E> for mpsc::Sender<ScannerMessage<T, E>> {
36 async fn try_stream<M: Into<ScannerMessage<T, E>>>(&self, msg: M) -> bool {
37 let msg = msg.into();
38 info!(msg = ?msg, "Sending message");
39 if let Err(err) = self.send(msg).await {
40 warn!(error = %err, "Downstream channel closed, stopping stream");
41 return false;
42 }
43 true
44 }
45}