use crate::ops::SharedOp;
use crate::prelude::*;
use std::marker::PhantomData;
/// Extension trait providing the `filter_map` operator.
///
/// The only requirement on the implementing type is that it is `Sized`,
/// because `filter_map` takes `self` by value.
pub trait FilterMap: Sized {
    /// Wraps `self` in a [`FilterMapOp`] together with the closure `f`.
    ///
    /// `f` is called with each `SourceItem` and returns an `Option<Item>`.
    /// Once the returned operator is subscribed, values wrapped in `Some`
    /// are passed on to the downstream observer and `None` results are
    /// skipped.
    ///
    /// This call itself performs no work beyond building the wrapper.
    fn filter_map<F, SourceItem, Item>(self, f: F) -> FilterMapOp<Self, F, SourceItem>
    where
        F: FnMut(SourceItem) -> Option<Item>,
    {
        let source = self;
        FilterMapOp {
            source,
            f,
            // Only records the `SourceItem` type; no value is stored.
            _p: PhantomData,
        }
    }
}
// Blanket implementation: every (sized) type `T` gets the default
// `filter_map` method. Whether the resulting `FilterMapOp` can actually be
// subscribed is decided by the bounds on its `RawSubscribable` impl.
impl<T> FilterMap for T {}
/// Operator value produced by `FilterMap::filter_map`.
///
/// Type parameters:
/// - `S`: the wrapped source.
/// - `F`: the filter-and-map closure.
/// - `I`: the item type the closure accepts; tracked only at the type level.
pub struct FilterMapOp<S, F, I> {
// The upstream value this operator wraps.
source: S,
// Closure handed to the `FilterMapObserver` on subscription.
f: F,
// Zero-sized marker that ties `I` to the struct without storing an `I`.
_p: PhantomData<I>,
}
/// Subscription logic for `FilterMapOp`.
///
/// The incoming subscriber's observer is wrapped in a `FilterMapObserver`
/// (which owns the closure `f`), and the wrapped subscriber is then handed to
/// the source. The subscription handle returned by the source is returned
/// unchanged.
impl<Item, Err, SourceItem, S, F, O, U> RawSubscribable<Item, Err, Subscriber<O, U>>
    for FilterMapOp<S, F, SourceItem>
where
    S: RawSubscribable<SourceItem, Err, Subscriber<FilterMapObserver<O, F>, U>>,
    F: FnMut(SourceItem) -> Option<Item>,
{
    type Unsub = S::Unsub;

    fn raw_subscribe(self, subscriber: Subscriber<O, U>) -> Self::Unsub {
        // Take the subscriber apart so its observer can be wrapped while the
        // subscription part is passed through untouched.
        let Subscriber {
            observer: down_observer,
            subscription,
        } = subscriber;

        let observer = FilterMapObserver {
            down_observer,
            f: self.f,
        };

        self.source.raw_subscribe(Subscriber {
            observer,
            subscription,
        })
    }
}
// SAFETY: `FilterMapOp` stores only a `S` and a `F`; the `I` parameter appears
// solely inside `PhantomData<I>`, so no value of type `I` is ever held by the
// struct. Sending the operator to another thread therefore only moves `S` and
// `F`, both of which are required to be `Send` here.
unsafe impl<S: Send, F: Send, I> Send for FilterMapOp<S, F, I> {}
// SAFETY: `FilterMapOp` stores only a `S` and a `F`; the `I` parameter appears
// solely inside `PhantomData<I>`, so no value of type `I` is ever held by the
// struct. Sharing a reference across threads therefore only exposes `S` and
// `F`, both of which are required to be `Sync` here.
unsafe impl<S: Sync, F: Sync, I> Sync for FilterMapOp<S, F, I> {}
/// Forking a `FilterMapOp` forks its source and clones its closure, yielding
/// a new operator over the forked source with the same item marker type.
impl<S, F, I> Fork for FilterMapOp<S, F, I>
where
    S: Fork,
    F: Clone,
{
    type Output = FilterMapOp<S::Output, F, I>;

    fn fork(&self) -> Self::Output {
        // Fork the source first, then duplicate the closure.
        let source = self.source.fork();
        let f = self.f.clone();
        FilterMapOp {
            source,
            f,
            _p: PhantomData,
        }
    }
}
/// Converts the operator into its shared form by converting the wrapped
/// source with `to_shared`, moving the closure across unchanged, and wrapping
/// the rebuilt operator in `SharedOp`.
impl<S, F, I> IntoShared for FilterMapOp<S, F, I>
where
    S: IntoShared,
    F: Send + Sync + 'static,
    I: 'static,
{
    type Shared = SharedOp<FilterMapOp<S::Shared, F, I>>;

    fn to_shared(self) -> Self::Shared {
        let FilterMapOp { source, f, .. } = self;
        let shared_op = FilterMapOp {
            source: source.to_shared(),
            f,
            _p: PhantomData,
        };
        SharedOp(shared_op)
    }
}
/// Observer adapter placed between a source and the downstream observer.
///
/// Its `Observer` implementation runs `f` on every incoming value and only
/// forwards the `Some` results to `down_observer`.
pub struct FilterMapObserver<O, F> {
// The observer that receives the values for which `f` returned `Some`.
down_observer: O,
// The filter-and-map closure applied to each incoming value.
f: F,
}
/// Observer behaviour of the adapter: values go through `f`, while errors and
/// completion are passed straight to the downstream observer.
impl<O, F, Item, Err, OutputItem> Observer<Item, Err> for FilterMapObserver<O, F>
where
    O: Observer<OutputItem, Err>,
    F: FnMut(Item) -> Option<OutputItem>,
{
    /// Applies `f` to `value`; forwards the result downstream when it is
    /// `Some`, and does nothing when it is `None`.
    fn next(&mut self, value: Item) {
        let mapped = (self.f)(value);
        if let Some(item) = mapped {
            self.down_observer.next(item);
        }
    }

    /// Forwards the error to the downstream observer unchanged.
    #[inline(always)]
    fn error(&mut self, err: Err) {
        self.down_observer.error(err);
    }

    /// Forwards the completion signal to the downstream observer.
    #[inline(always)]
    fn complete(&mut self) {
        self.down_observer.complete();
    }
}
/// Converts the adapter into its shared form by converting the downstream
/// observer with `to_shared` and keeping the closure as it is.
impl<O, F> IntoShared for FilterMapObserver<O, F>
where
    O: IntoShared,
    F: Send + Sync + 'static,
{
    type Shared = FilterMapObserver<O::Shared, F>;

    fn to_shared(self) -> Self::Shared {
        let FilterMapObserver { down_observer, f } = self;
        FilterMapObserver {
            down_observer: down_observer.to_shared(),
            f,
        }
    }
}
#[cfg(test)]
mod test {
    use crate::{ops::FilterMap, prelude::*};

    // The closure may produce an item type different from the source's:
    // three chars go in, three integers come out and are summed.
    #[test]
    fn map_types_mixed() {
        let mut total = 0;
        observable::from_iter(vec!['a', 'b', 'c'])
            .filter_map(|_c| Some(1))
            .subscribe(|n| total += n);
        assert_eq!(total, 3);
    }

    // Compile-level check that the operator can be forked and converted to
    // its shared form repeatedly and still be subscribed.
    #[test]
    fn filter_map_shared_and_fork() {
        observable::of(1)
            .filter_map(|_| Some("str"))
            .fork()
            .to_shared()
            .fork()
            .to_shared()
            .subscribe(|_| {});
    }

    // Same fork/share chain as above, but with a reference item passed
    // through `Some` unchanged.
    #[test]
    fn filter_map_return_ref() {
        observable::of(&1)
            .filter_map(Some)
            .fork()
            .to_shared()
            .fork()
            .to_shared()
            .subscribe(|_| {});
    }
}