rx_rust/operators/transforming/
map.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6 utils::types::MarkerType,
7};
8use educe::Educe;
9use std::marker::PhantomData;
10
11#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Map<T0, OE, F> {
40 source: OE,
41 callback: F,
42 _marker: MarkerType<T0>,
43}
44
45impl<T0, OE, F> Map<T0, OE, F> {
46 pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
47 where
48 OE: Observable<'or, 'sub, T0, E>,
49 F: FnMut(T0) -> T,
50 {
51 Self {
52 source,
53 callback,
54 _marker: PhantomData,
55 }
56 }
57}
58
59impl<'or, 'sub, T0, T, E, OE, F> Observable<'or, 'sub, T, E> for Map<T0, OE, F>
60where
61 OE: Observable<'or, 'sub, T0, E>,
62 F: FnMut(T0) -> T + NecessarySendSync + 'or,
63{
64 fn subscribe(
65 self,
66 observer: impl Observer<T, E> + NecessarySendSync + 'or,
67 ) -> Subscription<'sub> {
68 let observer = MapObserver {
69 observer,
70 callback: self.callback,
71 };
72 self.source.subscribe(observer)
73 }
74}
75
76struct MapObserver<OR, F> {
77 observer: OR,
78 callback: F,
79}
80
81impl<T0, T, E, OR, F> Observer<T0, E> for MapObserver<OR, F>
82where
83 OR: Observer<T, E>,
84 F: FnMut(T0) -> T,
85{
86 fn on_next(&mut self, value: T0) {
87 self.observer.on_next((self.callback)(value))
88 }
89
90 fn on_termination(self, termination: Termination<E>) {
91 self.observer.on_termination(termination)
92 }
93}