1use std::fmt::Debug;
2
3use tokio::sync::mpsc;
4
5use crate::ScannerError;
6
7#[derive(Debug, Clone, Copy)]
12pub(crate) enum ChannelState {
13 Open,
15 Closed,
17}
18
19impl ChannelState {
20 #[must_use]
22 pub(crate) fn is_closed(self) -> bool {
23 matches!(self, ChannelState::Closed)
24 }
25}
26
27#[derive(Copy, Debug, Clone)]
31pub enum ScannerMessage<T: Clone> {
32 Data(T),
34
35 Notification(Notification),
37}
38
39#[derive(Copy, Debug, Clone, PartialEq)]
41pub enum Notification {
42 SwitchingToLive,
45
46 ReorgDetected {
71 common_ancestor: u64,
73 },
74
75 NoPastLogsFound,
78}
79
80impl<T: Clone> From<Notification> for ScannerMessage<T> {
81 fn from(value: Notification) -> Self {
82 ScannerMessage::Notification(value)
83 }
84}
85
86impl<T: Clone> PartialEq<Notification> for ScannerMessage<T> {
87 fn eq(&self, other: &Notification) -> bool {
88 if let ScannerMessage::Notification(notification) = self {
89 notification == other
90 } else {
91 false
92 }
93 }
94}
95
96pub type ScannerResult<T> = Result<ScannerMessage<T>, ScannerError>;
100
101pub trait IntoScannerResult<T: Clone> {
103 fn into_scanner_message_result(self) -> ScannerResult<T>;
104}
105
106impl<T: Clone> IntoScannerResult<T> for ScannerResult<T> {
107 fn into_scanner_message_result(self) -> ScannerResult<T> {
108 self
109 }
110}
111
112impl<T: Clone> IntoScannerResult<T> for ScannerMessage<T> {
113 fn into_scanner_message_result(self) -> ScannerResult<T> {
114 Ok(self)
115 }
116}
117
118impl<T: Clone, E: Into<ScannerError>> IntoScannerResult<T> for E {
119 fn into_scanner_message_result(self) -> ScannerResult<T> {
120 Err(self.into())
121 }
122}
123
124impl<T: Clone> IntoScannerResult<T> for Notification {
125 fn into_scanner_message_result(self) -> ScannerResult<T> {
126 Ok(ScannerMessage::Notification(self))
127 }
128}
129
130pub(crate) trait TryStream<T: Clone> {
132 async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> ChannelState;
133}
134
135impl<T: Clone + Debug> TryStream<T> for mpsc::Sender<ScannerResult<T>> {
136 async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> ChannelState {
137 let item = msg.into_scanner_message_result();
138 if self.send(item).await.is_err() {
139 return ChannelState::Closed;
140 }
141 ChannelState::Open
142 }
143}
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148 use std::ops::RangeInclusive;
149
150 use crate::ScannerError;
151
152 type TestResult = Result<ScannerMessage<RangeInclusive<u64>>, ScannerError>;
154
155 mod channel_state_enum {
156 use super::*;
157
158 #[test]
159 fn is_closed_returns_false_for_open_state() {
160 assert!(!ChannelState::Open.is_closed());
161 }
162
163 #[test]
164 fn is_closed_returns_true_for_closed_state() {
165 assert!(ChannelState::Closed.is_closed());
166 }
167
168 #[test]
169 fn channel_state_is_copy() {
170 let state = ChannelState::Open;
171 let copied = state; assert!(!state.is_closed()); assert!(!copied.is_closed());
174 }
175
176 #[test]
177 fn channel_state_debug_format() {
178 assert_eq!(format!("{:?}", ChannelState::Open), "Open");
179 assert_eq!(format!("{:?}", ChannelState::Closed), "Closed");
180 }
181 }
182
183 mod try_stream {
184 use super::*;
185
186 #[tokio::test]
187 async fn try_stream_returns_open_when_receiver_exists() {
188 let (tx, _rx) = mpsc::channel::<TestResult>(10);
189
190 let result = tx.try_stream(Notification::SwitchingToLive).await;
191
192 assert!(!result.is_closed());
193 }
194
195 #[tokio::test]
196 async fn try_stream_returns_closed_when_receiver_dropped() {
197 let (tx, rx) = mpsc::channel::<TestResult>(10);
198 drop(rx); let result = tx.try_stream(Notification::SwitchingToLive).await;
201
202 assert!(result.is_closed());
203 }
204
205 #[tokio::test]
206 async fn try_stream_sends_message_successfully() {
207 let (tx, mut rx) = mpsc::channel::<TestResult>(10);
208
209 let result = tx.try_stream(Notification::SwitchingToLive).await;
210
211 assert!(!result.is_closed());
212
213 let received = rx.recv().await.unwrap();
215 assert!(matches!(
216 received,
217 Ok(ScannerMessage::Notification(Notification::SwitchingToLive))
218 ));
219 }
220 }
221}