rx_rust/operators/others/
map_infallible_to_error.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6    utils::types::MarkerType,
7};
8use educe::Educe;
9use std::{convert::Infallible, marker::PhantomData};
10
11/// Maps an Observable with an `Infallible` error type to an Observable with a concrete error type.
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16///     observable::observable_ext::ObservableExt,
17///     observer::Termination,
18///     operators::{
19///         creating::from_iter::FromIter,
20///         others::map_infallible_to_error::MapInfallibleToError,
21///     },
22/// };
23///
24/// let mut values = Vec::new();
25/// let mut terminations = Vec::new();
26///
27/// MapInfallibleToError::<String, _>::new(FromIter::new(vec![1, 2]))
28///     .subscribe_with_callback(
29///         |value| values.push(value),
30///         |termination| terminations.push(termination),
31///     );
32///
33/// assert_eq!(values, vec![1, 2]);
34/// assert_eq!(terminations, vec![Termination::Completed]);
35/// ```
36#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct MapInfallibleToError<E, OE> {
39    source: OE,
40    _marker: MarkerType<E>,
41}
42
43impl<E, OE> MapInfallibleToError<E, OE> {
44    pub fn new(source: OE) -> Self {
45        Self {
46            source,
47            _marker: PhantomData,
48        }
49    }
50}
51
52impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for MapInfallibleToError<E, OE>
53where
54    E: 'or,
55    OE: Observable<'or, 'sub, T, Infallible>,
56{
57    fn subscribe(
58        self,
59        observer: impl Observer<T, E> + NecessarySendSync + 'or,
60    ) -> Subscription<'sub> {
61        let observer = MapInfallibleToErrorObserver {
62            observer,
63            _marker: PhantomData,
64        };
65        self.source.subscribe(observer)
66    }
67}
68
69struct MapInfallibleToErrorObserver<E, OR> {
70    observer: OR,
71    _marker: MarkerType<E>,
72}
73
74impl<T, E, OR> Observer<T, Infallible> for MapInfallibleToErrorObserver<E, OR>
75where
76    OR: Observer<T, E>,
77{
78    fn on_next(&mut self, value: T) {
79        self.observer.on_next(value);
80    }
81
82    fn on_termination(self, termination: Termination<Infallible>) {
83        match termination {
84            Termination::Completed => self.observer.on_termination(Termination::Completed),
85            Termination::Error(_) => unreachable!(),
86        }
87    }
88}