use crate::prelude::*;
use ops::last::{Last, LastOrOp};
use ops::map::{Map, MapOp};
use ops::scan::{Scan, ScanOp};
use std::ops::Add;
use std::ops::Mul;
/// Running state for the average: the sum of the items seen so far paired
/// with how many items have been added to it.
type Accum<T> = (T, usize);
/// Converts a `(sum, count)` accumulator into the mean value.
///
/// `T` only has to support multiplication by an `f64`, so the mean is
/// computed as `sum * (1 / count)` instead of a division. A count of zero
/// makes that scale factor infinite; no check is performed here.
fn average_floats<T>(acc: Accum<T>) -> T
where
    T: Default + Copy + Send + Mul<f64, Output = T>,
{
    let (sum, count) = acc;
    let reciprocal = 1.0 / (count as f64);
    sum * reciprocal
}
fn accumulate_item<T>(acc: Accum<T>, v: T) -> Accum<T>
where
T: Copy + Add<T, Output = T>,
{
let newacc = acc.0 + v;
let newcount = acc.1 + 1;
(newacc, newcount)
}
/// Concrete operator type returned by [`Average::average`].
///
/// It is a `MapOp` wrapped around a `LastOrOp` wrapped around a `ScanOp`
/// over `Source`. The scan and last stages carry `Accum<Item>` values
/// (a sum paired with a count), and the map stage applies a
/// `fn(Accum<Item>) -> Item` pointer to turn that pair back into an `Item`.
pub type AverageOp<Source, Item> = MapOp<
LastOrOp<
ScanOp<Source, fn(Accum<Item>, Item) -> Accum<Item>, Item, Accum<Item>>,
Accum<Item>,
>,
fn(Accum<Item>) -> Item,
Accum<Item>,
>;
/// Adds an `average` operator to a source of `Item` values.
///
/// The tests in this module show the behaviour for float sources: the
/// arithmetic mean is delivered as a single value, and an empty source
/// delivers no value at all.
pub trait Average<Item>: Sized {
    /// Consumes `self` and returns the composed operator that computes the
    /// average of the source's items.
    fn average(self) -> AverageOp<Self, Item>;
}
/// Blanket implementation: any `O` gets `average` for item types that can be
/// summed (`Add`) and scaled by an `f64` (`Mul<f64>`).
impl<O, Item> Average<Item> for O
where
    Self: Sized,
    Item: Copy + Send + Default + Add<Item, Output = Item> + Mul<f64, Output = Item>,
{
    /// Builds the pipeline `scan_initial -> last -> map`: accumulate a
    /// `(sum, count)` pair starting from `(Item::default(), 0)`, keep only
    /// the final pair, then convert it into the mean.
    fn average(self) -> AverageOp<Self, Item> {
        // The operator type alias is expressed in terms of plain `fn`
        // pointers, so the generic helpers are coerced to pointers here.
        let step: fn(Accum<Item>, Item) -> Accum<Item> = accumulate_item;
        let finish: fn(Accum<Item>) -> Item = average_floats;
        let seed: Accum<Item> = (Item::default(), 0);

        self.scan_initial(seed, step).last().map(finish)
    }
}
#[cfg(test)]
mod test {
    use crate::ops::Average;
    use crate::prelude::*;
    use float_cmp::*;

    /// Five floats yield exactly one value (their mean), no error, and a
    /// single completion notification.
    #[test]
    fn average_of_floats() {
        let mut mean = 0.0;
        let mut next_calls = 0;
        let mut error_calls = 0;
        let mut complete_calls = 0;

        observable::from_iter(vec![3., 4., 5., 6., 7.])
            .average()
            .subscribe_all(
                |value| {
                    next_calls += 1;
                    mean = value;
                },
                |_| error_calls += 1,
                || complete_calls += 1,
            );

        assert!(approx_eq!(f64, 5.0, mean));
        assert_eq!(1, next_calls);
        assert_eq!(0, error_calls);
        assert_eq!(1, complete_calls);
    }

    /// A single item is its own average and is delivered once.
    #[test]
    fn average_on_single_float_item() {
        let mut mean = 0.0;
        let mut next_calls = 0;

        observable::of(123.0).average().subscribe(|value| {
            next_calls += 1;
            mean = value;
        });

        assert!(approx_eq!(f64, 123.0, mean));
        assert_eq!(1, next_calls);
    }

    /// An empty source must not deliver any value.
    #[test]
    fn average_on_empty_observable() {
        let mut received: Option<f64> = None;

        observable::empty()
            .average()
            .subscribe(|value| received = Some(value));

        assert_eq!(None, received);
    }

    /// Compile-time check that the averaged stream can be forked and shared.
    #[test]
    fn average_fork_and_shared() {
        let averaged = observable::from_iter(vec![1., 2.]).average();
        averaged
            .fork()
            .to_shared()
            .fork()
            .to_shared()
            .subscribe(|_| {});
    }
}