use thiserror::Error;
use crate::{Never, ObserverNotification, RxObserver, Signal, SubscriberNotification};
/// A notification restricted to the terminal signals of an observer.
///
/// Only two signals are representable: `Error`, which carries the error
/// value, and `Complete`, which carries nothing. The error type defaults to
/// `Never` when it is not spelled out.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ObserverTerminalNotification<InError = Never>
where
    InError: Signal,
{
    /// The error signal together with its error value.
    Error(InError),
    /// The completion signal; it has no payload.
    Complete,
}
/// Reason why an `ObserverNotification` could not be turned into an
/// `ObserverTerminalNotification`.
#[derive(Error, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ObserverToObserverTerminalNotificationConversionError {
    /// The source notification was a `Next`, which is not a terminal signal.
    #[error("'Next' is not a terminal signal!")]
    CannotReceiveNext,
}
impl<In, InError> TryFrom<ObserverNotification<In, InError>>
for ObserverTerminalNotification<InError>
where
In: Signal,
InError: Signal,
{
type Error = ObserverToObserverTerminalNotificationConversionError;
fn try_from(
value: ObserverNotification<In, InError>,
) -> Result<Self, <ObserverTerminalNotification<InError> as TryFrom<ObserverNotification<In, InError>>>::Error>{
match value {
ObserverNotification::Next(_) => {
Err(ObserverToObserverTerminalNotificationConversionError::CannotReceiveNext)
}
ObserverNotification::Error(error) => Ok(ObserverTerminalNotification::Error(error)),
ObserverNotification::Complete => Ok(ObserverTerminalNotification::Complete),
}
}
}
/// Reason why a `SubscriberNotification` could not be turned into an
/// `ObserverTerminalNotification`.
#[derive(Error, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SubscriberToObserverTerminalNotificationConversionError {
    /// The source notification was a `Next`, which is not a terminal signal.
    #[error("'Next' is not a terminal signal!")]
    CannotReceiveNext,
    /// The source notification was an `Unsubscribe`, which is not a terminal
    /// signal.
    #[error("'Unsubscribe' is not a terminal signal!")]
    CannotReceiveUnsubscribe,
}
impl<In, InError> TryFrom<SubscriberNotification<In, InError>>
for ObserverTerminalNotification<InError>
where
In: Signal,
InError: Signal,
{
type Error = SubscriberToObserverTerminalNotificationConversionError;
fn try_from(
value: SubscriberNotification<In, InError>,
) -> Result<Self, <ObserverTerminalNotification<InError> as TryFrom<SubscriberNotification<In, InError>>>::Error>{
match value {
SubscriberNotification::Next(_) => {
Err(SubscriberToObserverTerminalNotificationConversionError::CannotReceiveNext)
}
SubscriberNotification::Error(error) => Ok(ObserverTerminalNotification::Error(error)),
SubscriberNotification::Complete => Ok(ObserverTerminalNotification::Complete),
SubscriberNotification::Unsubscribe => Err(
SubscriberToObserverTerminalNotificationConversionError::CannotReceiveUnsubscribe,
),
}
}
}
/// Extension of `RxObserver` that accepts a terminal notification as a single
/// value instead of requiring a direct call to the matching observer method.
pub trait ObserverPushObserverTerminalNotificationExtention: RxObserver {
    /// Pushes `notification` into this observer.
    ///
    /// Anything convertible into an
    /// `ObserverTerminalNotification<Self::InError>` is accepted.
    fn push(&mut self, notification: impl Into<ObserverTerminalNotification<Self::InError>>);
}
/// Blanket implementation: every `RxObserver` gets `push` for terminal
/// notifications.
impl<T> ObserverPushObserverTerminalNotificationExtention for T
where
    T: RxObserver,
{
    /// Converts `notification` and forwards it to the observer: an `Error`
    /// goes to `error`, a `Complete` goes to `complete`.
    fn push(&mut self, notification: impl Into<ObserverTerminalNotification<Self::InError>>) {
        let terminal: ObserverTerminalNotification<Self::InError> = notification.into();
        match terminal {
            ObserverTerminalNotification::Complete => self.complete(),
            ObserverTerminalNotification::Error(error) => self.error(error),
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Tests for pushing terminal notifications into an observer.
    mod push {
        use derive_where::derive_where;
        use rx_core_macro_observer_derive::RxObserver;

        use crate::{PhantomInvariant, RxObserver};

        use super::*;

        /// Observer that records which terminal signal it has received.
        #[derive_where(Default)]
        #[derive(RxObserver)]
        #[_rx_core_common_crate(crate)]
        #[rx_in(In)]
        #[rx_in_error(InError)]
        struct MockObserver<In, InError>
        where
            In: Signal,
            InError: Signal,
        {
            /// The last error passed to `error`, if any.
            error: Option<InError>,
            /// Set once `complete` has been called.
            complete: bool,
            _phantom_data: PhantomInvariant<In>,
        }

        impl<In, InError> RxObserver for MockObserver<In, InError>
        where
            In: Signal,
            InError: Signal,
        {
            fn next(&mut self, _next: Self::In) {
                unreachable!("not used for this test")
            }

            fn error(&mut self, error: Self::InError) {
                self.error.replace(error);
            }

            fn complete(&mut self) {
                self.complete = true;
            }
        }

        #[test]
        fn should_call_error_when_pushing_an_error_notification() {
            let mut observer = MockObserver::<usize, &'static str>::default();
            let error = "error";
            observer.push(ObserverTerminalNotification::Error(error));
            assert_eq!(observer.error, Some(error));
        }

        #[test]
        fn should_call_complete_when_pushing_a_complete_notification() {
            let mut observer = MockObserver::<usize, &'static str>::default();
            observer.push(ObserverTerminalNotification::Complete);
            assert!(observer.complete);
        }
    }

    /// Tests for `ObserverNotification` -> `ObserverTerminalNotification`.
    mod observer_notification_conversion {
        use super::*;

        // Rejection path: `Next` has no terminal counterpart, so the
        // conversion must fail with `CannotReceiveNext`.
        #[test]
        fn should_not_convert_next() {
            let observer_terminal_notification: Result<ObserverTerminalNotification, _> =
                ObserverNotification::<usize, _>::Next(1).try_into();
            assert_eq!(
                observer_terminal_notification,
                Err(ObserverToObserverTerminalNotificationConversionError::CannotReceiveNext)
            );
        }

        #[test]
        fn should_convert_error() {
            let error = "error";
            let observer_terminal_notification: Result<
                ObserverTerminalNotification<&'static str>,
                _,
            > = ObserverNotification::<usize, _>::Error(error).try_into();
            assert_eq!(
                observer_terminal_notification,
                Ok(ObserverTerminalNotification::Error(error))
            );
        }

        #[test]
        fn should_convert_complete() {
            let observer_terminal_notification: Result<
                ObserverTerminalNotification<&'static str>,
                _,
            > = ObserverNotification::<usize, _>::Complete.try_into();
            assert_eq!(
                observer_terminal_notification,
                Ok(ObserverTerminalNotification::Complete)
            );
        }
    }

    /// Tests for `SubscriberNotification` -> `ObserverTerminalNotification`.
    mod subscriber_notification_conversion {
        use super::*;

        #[test]
        fn should_not_convert_next() {
            let observer_terminal_notification: Result<ObserverTerminalNotification, _> =
                SubscriberNotification::Next(1).try_into();
            assert_eq!(
                observer_terminal_notification,
                Err(SubscriberToObserverTerminalNotificationConversionError::CannotReceiveNext)
            );
        }

        #[test]
        fn should_convert_error() {
            let error = "error";
            let observer_terminal_notification: Result<
                ObserverTerminalNotification<&'static str>,
                _,
            > = SubscriberNotification::<usize, _>::Error(error).try_into();
            assert_eq!(
                observer_terminal_notification,
                Ok(ObserverTerminalNotification::Error(error))
            );
        }

        #[test]
        fn should_convert_complete() {
            let observer_terminal_notification: Result<ObserverTerminalNotification, _> =
                SubscriberNotification::<usize, _>::Complete.try_into();
            assert_eq!(
                observer_terminal_notification,
                Ok(ObserverTerminalNotification::Complete)
            );
        }

        #[test]
        fn should_not_convert_unsubscribe() {
            let observer_terminal_notification: Result<ObserverTerminalNotification, _> =
                SubscriberNotification::<usize, _>::Unsubscribe.try_into();
            assert_eq!(
                observer_terminal_notification,
                Err(SubscriberToObserverTerminalNotificationConversionError::CannotReceiveUnsubscribe)
            );
        }
    }
}