use crate::prelude::*;
use ops::last::{Last, LastOrOp};
use ops::scan::{Scan, ScanOp};
pub type ReduceOp<Source, BinaryOp, InputItem, OutputItem> =
LastOrOp<ScanOp<Source, BinaryOp, InputItem, OutputItem>, OutputItem>;
pub trait Reduce<OutputItem> {
fn reduce_initial<InputItem, BinaryOp>(
self,
initial: OutputItem,
binary_op: BinaryOp,
) -> ReduceOp<Self, BinaryOp, InputItem, OutputItem>
where
Self: Sized,
BinaryOp: Fn(OutputItem, InputItem) -> OutputItem,
OutputItem: Clone,
{
self
.scan_initial(initial.clone(), binary_op)
.last_or(initial)
}
fn reduce<InputItem, BinaryOp>(
self,
binary_op: BinaryOp,
) -> LastOrOp<ScanOp<Self, BinaryOp, InputItem, OutputItem>, OutputItem>
where
Self: Sized,
BinaryOp: Fn(OutputItem, InputItem) -> OutputItem,
OutputItem: Default + Clone,
{
self.reduce_initial(OutputItem::default(), binary_op)
}
}
impl<O, OutputItem> Reduce<OutputItem> for O {}
#[cfg(test)]
mod test {
use crate::{ops::Reduce, prelude::*};
#[test]
fn reduce_initial() {
let mut emitted = 0;
observable::from_iter(vec![1, 1, 1, 1, 1])
.reduce_initial(100, |acc, v| acc + v)
.subscribe(|v| emitted = v);
assert_eq!(105, emitted);
}
#[test]
fn reduce_initial_on_empty_observable() {
let mut emitted = 0;
observable::empty()
.reduce_initial(100, |acc, v: i32| acc + v)
.subscribe(|v| emitted = v);
assert_eq!(100, emitted);
}
#[test]
fn reduce() {
let mut emitted = 0;
observable::from_iter(vec![1, 1, 1, 1, 1])
.reduce(|acc, v| acc + v)
.subscribe(|v| emitted = v);
assert_eq!(5, emitted);
}
#[test]
fn reduce_on_empty_observable() {
let mut emitted = 0;
observable::empty()
.reduce(|acc, v: i32| acc + v)
.subscribe(|v| emitted = v);
assert_eq!(0, emitted);
}
#[test]
fn reduce_mixed_types() {
let mut emitted = 0u32;
observable::from_iter(vec![1i32, 2i32, 3i32, 4i32])
.reduce(|acc, v: i32| acc + (v as u32))
.subscribe(|v| emitted = v);
assert_eq!(10u32, emitted);
}
#[test]
fn reduce_for_counting_total_length() {
let mut emitted = 0;
observable::from_iter(vec![String::from("foo"), String::from("bar")])
.reduce(|acc, v: String| acc + v.len())
.subscribe(|v| emitted = v);
assert_eq!(6, emitted);
}
#[test]
fn reduce_fork_and_shared() {
let m = observable::from_iter(0..100).reduce(|acc: i32, v| acc + v);
m.fork()
.reduce(|acc: i32, v| acc + v)
.fork()
.to_shared()
.fork()
.to_shared()
.subscribe(|_| {});
}
}