use crate::observer::{
observer_complete_proxy_impl, observer_error_proxy_impl,
};
use crate::prelude::*;
use ops::SharedOp;
use std::marker::PhantomData;
pub trait Map<T> {
fn map<B, F>(self, f: F) -> MapOp<Self, F, B>
where
Self: Sized,
F: Fn(B) -> T,
{
MapOp {
source: self,
func: f,
_p: PhantomData,
}
}
}
impl<O, Item> Map<Item> for O {}
pub struct MapOp<S, M, B> {
source: S,
func: M,
_p: PhantomData<B>,
}
impl<Item, B, O, U, S, M> RawSubscribable<Subscriber<O, U>> for MapOp<S, M, B>
where
S: RawSubscribable<Subscriber<MapObserver<O, M>, U>>,
M: FnMut(B) -> Item,
{
type Unsub = S::Unsub;
fn raw_subscribe(self, subscriber: Subscriber<O, U>) -> Self::Unsub {
let map = self.func;
self.source.raw_subscribe(Subscriber {
observer: MapObserver {
observer: subscriber.observer,
map,
},
subscription: subscriber.subscription,
})
}
}
pub struct MapObserver<S, M> {
observer: S,
map: M,
}
impl<Item, S, M, B> ObserverNext<Item> for MapObserver<S, M>
where
S: ObserverNext<B>,
M: FnMut(Item) -> B,
{
fn next(&mut self, value: Item) { self.observer.next((self.map)(value)) }
}
observer_complete_proxy_impl!(MapObserver<O, M>, O, observer, <O, M>);
observer_error_proxy_impl!(MapObserver<O, M>, O, observer, <O, M>);
impl<S, M, B> Fork for MapOp<S, M, B>
where
S: Fork,
M: Clone,
{
type Output = MapOp<S::Output, M, B>;
fn fork(&self) -> Self::Output {
MapOp {
source: self.source.fork(),
func: self.func.clone(),
_p: PhantomData,
}
}
}
impl<S, M> IntoShared for MapObserver<S, M>
where
S: IntoShared,
M: Send + Sync + 'static,
{
type Shared = MapObserver<S::Shared, M>;
fn to_shared(self) -> Self::Shared {
MapObserver {
observer: self.observer.to_shared(),
map: self.map,
}
}
}
impl<S, M, B> IntoShared for MapOp<S, M, B>
where
S: IntoShared,
M: Send + Sync + 'static,
B: Send + Sync + 'static,
{
type Shared = SharedOp<MapOp<S::Shared, M, B>>;
fn to_shared(self) -> Self::Shared {
SharedOp(MapOp {
source: self.source.to_shared(),
func: self.func,
_p: PhantomData,
})
}
}
#[cfg(test)]
mod test {
use crate::{ops::Map, prelude::*};
#[test]
fn primitive_type() {
let mut i = 0;
observable::from_iter(100..101)
.map(|v| v * 2)
.subscribe(|v| i += v);
assert_eq!(i, 200);
}
#[test]
fn reference_lifetime_should_work() {
let mut i = 0;
observable::of(100).map(|v| v).subscribe(|v| i += v);
assert_eq!(i, 100);
}
#[test]
fn fork_and_shared() {
let m = observable::from_iter(0..100).map(|v| v);
m.fork()
.map(|v| v)
.fork()
.to_shared()
.fork()
.to_shared()
.subscribe(|_| {});
let m = observable::from_iter(vec!['a', 'b', 'c']).map(|_v| 1);
m.fork()
.map(|v| v as f32)
.fork()
.to_shared()
.fork()
.to_shared()
.subscribe(|_| {});
let m = observable::of(&1).map(|v| v);
m.fork()
.map(|v| v)
.fork()
.to_shared()
.fork()
.to_shared()
.subscribe(|_| {});
}
#[test]
fn map_types_mixed() {
let mut i = 0;
observable::from_iter(vec!['a', 'b', 'c'])
.map(|_v| 1)
.subscribe(|v| i += v);
assert_eq!(i, 3);
}
}