use std::fmt::Debug;
use tokio::sync::mpsc;
use crate::ScannerError;
#[derive(Debug, Clone, Copy)]
pub(crate) enum ChannelState {
Open,
Closed,
}
impl ChannelState {
#[must_use]
pub(crate) fn is_closed(self) -> bool {
matches!(self, ChannelState::Closed)
}
}
#[derive(Copy, Debug, Clone)]
pub enum ScannerMessage<T: Clone> {
Data(T),
Notification(Notification),
}
#[derive(Copy, Debug, Clone, PartialEq)]
pub enum Notification {
SwitchingToLive,
ReorgDetected {
common_ancestor: u64,
},
NoPastLogsFound,
}
impl<T: Clone> From<Notification> for ScannerMessage<T> {
fn from(value: Notification) -> Self {
ScannerMessage::Notification(value)
}
}
impl<T: Clone> PartialEq<Notification> for ScannerMessage<T> {
fn eq(&self, other: &Notification) -> bool {
if let ScannerMessage::Notification(notification) = self {
notification == other
} else {
false
}
}
}
pub type ScannerResult<T> = Result<ScannerMessage<T>, ScannerError>;
pub trait IntoScannerResult<T: Clone> {
fn into_scanner_message_result(self) -> ScannerResult<T>;
}
impl<T: Clone> IntoScannerResult<T> for ScannerResult<T> {
fn into_scanner_message_result(self) -> ScannerResult<T> {
self
}
}
impl<T: Clone> IntoScannerResult<T> for ScannerMessage<T> {
fn into_scanner_message_result(self) -> ScannerResult<T> {
Ok(self)
}
}
impl<T: Clone, E: Into<ScannerError>> IntoScannerResult<T> for E {
fn into_scanner_message_result(self) -> ScannerResult<T> {
Err(self.into())
}
}
impl<T: Clone> IntoScannerResult<T> for Notification {
fn into_scanner_message_result(self) -> ScannerResult<T> {
Ok(ScannerMessage::Notification(self))
}
}
pub(crate) trait TryStream<T: Clone> {
async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> ChannelState;
}
impl<T: Clone + Debug> TryStream<T> for mpsc::Sender<ScannerResult<T>> {
async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> ChannelState {
let item = msg.into_scanner_message_result();
if self.send(item).await.is_err() {
return ChannelState::Closed;
}
ChannelState::Open
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::ops::RangeInclusive;
use crate::ScannerError;
type TestResult = Result<ScannerMessage<RangeInclusive<u64>>, ScannerError>;
mod channel_state_enum {
use super::*;
#[test]
fn is_closed_returns_false_for_open_state() {
assert!(!ChannelState::Open.is_closed());
}
#[test]
fn is_closed_returns_true_for_closed_state() {
assert!(ChannelState::Closed.is_closed());
}
#[test]
fn channel_state_is_copy() {
let state = ChannelState::Open;
let copied = state; assert!(!state.is_closed()); assert!(!copied.is_closed());
}
#[test]
fn channel_state_debug_format() {
assert_eq!(format!("{:?}", ChannelState::Open), "Open");
assert_eq!(format!("{:?}", ChannelState::Closed), "Closed");
}
}
mod try_stream {
use super::*;
#[tokio::test]
async fn try_stream_returns_open_when_receiver_exists() {
let (tx, _rx) = mpsc::channel::<TestResult>(10);
let result = tx.try_stream(Notification::SwitchingToLive).await;
assert!(!result.is_closed());
}
#[tokio::test]
async fn try_stream_returns_closed_when_receiver_dropped() {
let (tx, rx) = mpsc::channel::<TestResult>(10);
drop(rx);
let result = tx.try_stream(Notification::SwitchingToLive).await;
assert!(result.is_closed());
}
#[tokio::test]
async fn try_stream_sends_message_successfully() {
let (tx, mut rx) = mpsc::channel::<TestResult>(10);
let result = tx.try_stream(Notification::SwitchingToLive).await;
assert!(!result.is_closed());
let received = rx.recv().await.unwrap();
assert!(matches!(
received,
Ok(ScannerMessage::Notification(Notification::SwitchingToLive))
));
}
}
}