use crate::observer::{
complete_proxy_impl, error_proxy_impl, is_stopped_proxy_impl,
};
use crate::prelude::*;
/// Observable adapter that bundles a source observable with a binary
/// accumulation function and an initial accumulator value.
///
/// When subscribed, the three parts are moved out: the function and the
/// initial value go into a [`ScanObserver`] wrapped around the downstream
/// observer, and that observer is subscribed to `source_observable`.
#[derive(Clone)]
pub struct ScanOp<Source, BinaryOp, OutputItem> {
/// Upstream observable that receives the wrapping observer on subscribe.
pub(crate) source_observable: Source,
/// Accumulation function, handed over to the observer on subscribe.
pub(crate) binary_op: BinaryOp,
/// Seed that becomes the observer's accumulator on subscribe.
pub(crate) initial_value: OutputItem,
}
/// Observer that folds every incoming item into a running accumulator and
/// forwards the updated accumulator to the wrapped observer.
pub struct ScanObserver<Observer, BinaryOp, OutputItem> {
/// Downstream observer that receives each new accumulator value.
target_observer: Observer,
/// Function called with the current accumulator and the incoming item; its
/// result becomes the new accumulator.
binary_op: BinaryOp,
/// Current accumulator; it is cloned on every item, once as input to
/// `binary_op` and once for the downstream emission.
acc: OutputItem,
}
// Generates the `actual_subscribe` method shared by the local and shared
// observable impls of `ScanOp`.
//
// * `$subscription` - subscription type carried by the incoming `Subscriber`.
// * `$marker +`*    - extra marker bounds required of the observer.
// * `$lf`           - lifetime bound required of the observer.
#[doc(hidden)]
macro observable_impl($subscription:ty, $($marker:ident +)* $lf: lifetime) {
  fn actual_subscribe<O: Observer<Self::Item, Self::Err> + $($marker +)* $lf>(
    self,
    subscriber: Subscriber<O, $subscription>,
  ) -> Self::Unsub {
    // Wrap the downstream observer so each source item is folded into the
    // accumulator before being passed on.
    let scan_observer = ScanObserver {
      target_observer: subscriber.observer,
      binary_op: self.binary_op,
      acc: self.initial_value,
    };
    // Keep the caller's subscription and swap in the wrapping observer.
    let upstream_subscriber = Subscriber {
      observer: scan_observer,
      subscription: subscriber.subscription,
    };
    self.source_observable.actual_subscribe(upstream_subscriber)
  }
}
// `ScanOp` emits the accumulator type rather than the source's item type;
// the error type is taken unchanged from the source.
impl<OutputItem, Source, BinaryOp> Observable
for ScanOp<Source, BinaryOp, OutputItem>
where
Source: Observable,
OutputItem: Clone,
BinaryOp: FnMut(OutputItem, Source::Item) -> OutputItem,
{
// Items delivered downstream are accumulator values.
type Item = OutputItem;
// Same error type as the source.
type Err = Source::Err;
}
// Local flavour: the accumulator and the accumulation function only need to
// outlive `'a`; no `Send`/`Sync` bounds are required here.
impl<'a, OutputItem, Source, BinaryOp> LocalObservable<'a>
for ScanOp<Source, BinaryOp, OutputItem>
where
Source: LocalObservable<'a>,
OutputItem: Clone + 'a,
BinaryOp: FnMut(OutputItem, Source::Item) -> OutputItem + 'a,
{
// Subscribing returns whatever the source's `actual_subscribe` returns.
type Unsub = Source::Unsub;
// Expands to `actual_subscribe` taking a `Subscriber<O, LocalSubscription>`
// whose observer is bounded by `'a`.
observable_impl!(LocalSubscription, 'a);
}
// Shared flavour: the accumulator and the accumulation function must be
// `Send + Sync + 'static`, matching the bounds placed on the observer.
impl<OutputItem, Source, BinaryOp> SharedObservable
for ScanOp<Source, BinaryOp, OutputItem>
where
Source: SharedObservable,
OutputItem: Clone + Send + Sync + 'static,
BinaryOp:
FnMut(OutputItem, Source::Item) -> OutputItem + Send + Sync + 'static,
{
// Subscribing returns whatever the source's `actual_subscribe` returns.
type Unsub = Source::Unsub;
// Expands to `actual_subscribe` taking a `Subscriber<O, SharedSubscription>`
// whose observer is bounded by `Send + Sync + 'static`.
observable_impl!(SharedSubscription, Send + Sync + 'static);
}
// `ScanObserver` consumes `InputItem`s and hands `OutputItem` accumulator
// values to the wrapped `Target` observer.
impl<InputItem, Err, Target, BinaryOp, OutputItem> Observer<InputItem, Err>
  for ScanObserver<Target, BinaryOp, OutputItem>
where
  Target: Observer<OutputItem, Err>,
  OutputItem: Clone,
  BinaryOp: FnMut(OutputItem, InputItem) -> OutputItem,
{
  /// Folds `value` into the accumulator, then emits a clone of the updated
  /// accumulator to the target observer.
  fn next(&mut self, value: InputItem) {
    // `binary_op` takes the accumulator by value, so it gets a clone while
    // the stored one stays in place until the result is ready.
    let folded = (self.binary_op)(self.acc.clone(), value);
    self.acc = folded;
    // The stored accumulator is kept for the next item; downstream gets a
    // clone of it.
    let emission = self.acc.clone();
    self.target_observer.next(emission)
  }

  // The remaining observer methods are generated by the proxy macros, which
  // are given the `target_observer` field.
  error_proxy_impl!(Err, target_observer);
  complete_proxy_impl!(target_observer);
  is_stopped_proxy_impl!(target_observer);
}
#[cfg(test)]
mod test {
  use crate::prelude::*;

  // With an explicit seed, every emission is the seed plus the running sum.
  #[test]
  fn scan_initial() {
    let mut seen: Vec<i32> = Vec::new();
    observable::from_iter(vec![1, 1, 1, 1, 1])
      .scan_initial(100, |sum, item| sum + item)
      .subscribe(|out| seen.push(out));
    assert_eq!(vec![101, 102, 103, 104, 105], seen);
  }

  // An empty source produces no emissions; the seed itself is never emitted.
  #[test]
  fn scan_initial_on_empty_observable() {
    let mut seen: Vec<i32> = Vec::new();
    observable::empty()
      .scan_initial(100, |sum, item: i32| sum + item)
      .subscribe(|out| seen.push(out));
    assert_eq!(Vec::<i32>::new(), seen);
  }

  // The accumulator type (`i32`) may differ from the source item type
  // (`char`).
  #[test]
  fn scan_initial_mixed_types() {
    let mut seen: Vec<i32> = Vec::new();
    observable::from_iter(vec!['a', 'b', 'c', 'd', 'e'])
      .scan_initial(100, |count, _item| count + 1)
      .subscribe(|out| seen.push(out));
    assert_eq!(vec![101, 102, 103, 104, 105], seen);
  }

  // Without an explicit seed; the expected output implies the `i32`
  // accumulator starts from zero.
  #[test]
  fn scan_with_default() {
    let mut seen: Vec<i32> = Vec::new();
    observable::from_iter(vec![1, 1, 1, 1, 1])
      .scan(|sum, item| sum + item)
      .subscribe(|out| seen.push(out));
    assert_eq!(vec![1, 2, 3, 4, 5], seen);
  }

  // Chained scans with differing item types must still be convertible to a
  // shared observable and subscribable (a compile/run smoke test).
  #[test]
  fn scan_fork_and_shared_mixed_types() {
    let first_scan =
      observable::from_iter(vec!['a', 'b', 'c']).scan(|_acc, _v| 1i32);
    first_scan
      .scan(|_acc, v| v as f32)
      .to_shared()
      .to_shared()
      .subscribe(|_| {});
  }

  // Same smoke test with a single item type throughout the chain.
  #[test]
  fn scan_fork_and_shared() {
    let first_scan =
      observable::from_iter(0..100).scan(|acc: i32, v| acc + v);
    first_scan
      .scan(|acc: i32, v| acc + v)
      .to_shared()
      .to_shared()
      .subscribe(|_| {});
  }
}