use crate::observer::{
observer_complete_proxy_impl, observer_error_proxy_impl,
};
use crate::prelude::*;
use ops::SharedOp;
use std::marker::PhantomData;
pub trait Scan<OutputItem> {
fn scan_initial<InputItem, BinaryOp>(
self,
initial_value: OutputItem,
binary_op: BinaryOp,
) -> ScanOp<Self, BinaryOp, InputItem, OutputItem>
where
Self: Sized,
BinaryOp: Fn(OutputItem, InputItem) -> OutputItem,
{
ScanOp {
source_observable: self,
binary_op,
initial_value,
_p: PhantomData,
}
}
fn scan<InputItem, BinaryOp>(
self,
binary_op: BinaryOp,
) -> ScanOp<Self, BinaryOp, InputItem, OutputItem>
where
Self: Sized,
BinaryOp: Fn(OutputItem, InputItem) -> OutputItem,
OutputItem: Default,
{
self.scan_initial(OutputItem::default(), binary_op)
}
}
impl<O, OutputItem> Scan<OutputItem> for O {}
pub struct ScanOp<Source, BinaryOp, InputItem, OutputItem> {
source_observable: Source,
binary_op: BinaryOp,
initial_value: OutputItem,
_p: PhantomData<InputItem>,
}
pub struct ScanObserver<Observer, BinaryOp, OutputItem> {
target_observer: Observer,
binary_op: BinaryOp,
acc: OutputItem,
}
impl<OutputItem, InputItem, Observer, Subscription, Source, BinaryOp>
RawSubscribable<Subscriber<Observer, Subscription>>
for ScanOp<Source, BinaryOp, InputItem, OutputItem>
where
Source: RawSubscribable<
Subscriber<ScanObserver<Observer, BinaryOp, OutputItem>, Subscription>,
>,
BinaryOp: FnMut(OutputItem, InputItem) -> OutputItem,
{
type Unsub = Source::Unsub;
fn raw_subscribe(
self,
subscriber: Subscriber<Observer, Subscription>,
) -> Self::Unsub {
self.source_observable.raw_subscribe(Subscriber {
observer: ScanObserver {
target_observer: subscriber.observer,
binary_op: self.binary_op,
acc: self.initial_value,
},
subscription: subscriber.subscription,
})
}
}
impl<InputItem, Source, BinaryOp, OutputItem> ObserverNext<InputItem>
for ScanObserver<Source, BinaryOp, OutputItem>
where
Source: ObserverNext<OutputItem>,
BinaryOp: FnMut(OutputItem, InputItem) -> OutputItem,
OutputItem: Clone,
{
fn next(&mut self, value: InputItem) {
self.acc = (self.binary_op)(self.acc.clone(), value);
self.target_observer.next(self.acc.clone())
}
}
observer_error_proxy_impl!(ScanObserver<Source, BinaryOp, OutputItem>,
Source, target_observer, <Source, BinaryOp, OutputItem>);
observer_complete_proxy_impl!(ScanObserver<Source, BinaryOp, OutputItem>,
Source, target_observer, <Source, BinaryOp, OutputItem>);
impl<Source, BinaryOp, InputItem, OutputItem> Fork
for ScanOp<Source, BinaryOp, InputItem, OutputItem>
where
Source: Fork,
BinaryOp: Clone,
OutputItem: Clone,
{
type Output = ScanOp<Source::Output, BinaryOp, InputItem, OutputItem>;
fn fork(&self) -> Self::Output {
ScanOp {
source_observable: self.source_observable.fork(),
binary_op: self.binary_op.clone(),
initial_value: self.initial_value.clone(),
_p: self._p,
}
}
}
impl<Source, BinaryOp, OutputItem> IntoShared
for ScanObserver<Source, BinaryOp, OutputItem>
where
Source: IntoShared,
BinaryOp: Send + Sync + 'static,
OutputItem: Send + Sync + 'static,
{
type Shared = ScanObserver<Source::Shared, BinaryOp, OutputItem>;
fn to_shared(self) -> Self::Shared {
ScanObserver {
target_observer: self.target_observer.to_shared(),
binary_op: self.binary_op,
acc: self.acc,
}
}
}
impl<Source, BinaryOp, InputItem, OutputItem> IntoShared
for ScanOp<Source, BinaryOp, InputItem, OutputItem>
where
Source: IntoShared,
BinaryOp: Send + Sync + 'static,
InputItem: Send + Sync + 'static,
OutputItem: Send + Sync + 'static,
{
type Shared =
SharedOp<ScanOp<Source::Shared, BinaryOp, InputItem, OutputItem>>;
fn to_shared(self) -> Self::Shared {
SharedOp(ScanOp {
source_observable: self.source_observable.to_shared(),
binary_op: self.binary_op,
initial_value: self.initial_value,
_p: self._p,
})
}
}
#[cfg(test)]
mod test {
use crate::{ops::Scan, prelude::*};
#[test]
fn scan_initial() {
let mut emitted = Vec::<i32>::new();
observable::from_iter(vec![1, 1, 1, 1, 1])
.scan_initial(100, |acc, v| acc + v)
.subscribe(|v| emitted.push(v));
assert_eq!(vec!(101, 102, 103, 104, 105), emitted);
}
#[test]
fn scan_initial_on_empty_observable() {
let mut emitted = Vec::<i32>::new();
observable::empty()
.scan_initial(100, |acc, v: i32| acc + v)
.subscribe(|v| emitted.push(v));
assert_eq!(Vec::<i32>::new(), emitted);
}
#[test]
fn scan_initial_mixed_types() {
let mut emitted = Vec::<i32>::new();
observable::from_iter(vec!['a', 'b', 'c', 'd', 'e'])
.scan_initial(100, |acc, _v| acc + 1)
.subscribe(|v| emitted.push(v));
assert_eq!(vec!(101, 102, 103, 104, 105), emitted);
}
#[test]
fn scan_with_default() {
let mut emitted = Vec::<i32>::new();
observable::from_iter(vec![1, 1, 1, 1, 1])
.scan(|acc, v| acc + v)
.subscribe(|v| emitted.push(v));
assert_eq!(vec!(1, 2, 3, 4, 5), emitted);
}
#[test]
fn scan_fork_and_shared_mixed_types() {
let m = observable::from_iter(vec!['a', 'b', 'c']).scan(|_acc, _v| 1i32);
m.fork()
.scan(|_acc, v| v as f32)
.fork()
.to_shared()
.fork()
.to_shared()
.subscribe(|_| {});
}
#[test]
fn scan_fork_and_shared() {
let m = observable::from_iter(0..100).scan(|acc: i32, v| acc + v);
m.fork()
.scan(|acc: i32, v| acc + v)
.fork()
.to_shared()
.fork()
.to_shared()
.subscribe(|_| {});
}
}