another_rxrust/operators/
sum_and_count.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  marker::PhantomData,
5  ops::Add,
6  sync::{Arc, RwLock},
7};
8
9#[derive(Clone)]
10pub struct SumAndCount<Item> {
11  _item: PhantomData<Item>,
12}
13
14impl<'a, Item> SumAndCount<Item>
15where
16  Item: Clone + Send + Sync + Add<Output = Item>,
17{
18  pub fn new() -> SumAndCount<Item> {
19    SumAndCount { _item: PhantomData }
20  }
21  pub fn execute(
22    &self,
23    source: Observable<'a, Item>,
24  ) -> Observable<'a, (Item, usize)> {
25    Observable::create(move |s| {
26      let n = Arc::new(RwLock::new(0usize));
27      let sum = Arc::new(RwLock::new(None::<Item>));
28
29      let sctl = StreamController::new(s);
30      let sctl_error = sctl.clone();
31      let sctl_complete = sctl.clone();
32
33      let sum_next = Arc::clone(&sum);
34      let n_next = Arc::clone(&n);
35
36      source.inner_subscribe(sctl.new_observer(
37        move |_, x| {
38          *n_next.write().unwrap() += 1;
39          let mut sum = sum_next.write().unwrap();
40          if let Some(latest) = &*sum {
41            *sum = Some(latest.clone() + x);
42          } else {
43            *sum = Some(x);
44          }
45        },
46        move |_, e| {
47          sctl_error.sink_error(e);
48        },
49        move |serial| {
50          if let Some(latest) = &*sum.read().unwrap() {
51            sctl_complete.sink_next((latest.clone(), *n.read().unwrap()));
52          }
53          sctl_complete.sink_complete(&serial);
54        },
55      ));
56    })
57  }
58}
59
60impl<'a, Item> Observable<'a, Item>
61where
62  Item: Clone + Send + Sync + Add<Output = Item>,
63{
64  pub fn sum_and_count(&self) -> Observable<'a, (Item, usize)> {
65    SumAndCount::new().execute(self.clone())
66  }
67}
68
69#[cfg(test)]
70mod test {
71  use crate::prelude::*;
72
73  #[test]
74  fn basic() {
75    observables::from_iter(1..10).sum_and_count().subscribe(
76      print_next_fmt!("{:?}"),
77      print_error!(),
78      print_complete!(),
79    );
80  }
81
82  #[test]
83  fn empty() {
84    observables::empty::<i32>().sum_and_count().subscribe(
85      print_next_fmt!("{:?}"),
86      print_error!(),
87      print_complete!(),
88    );
89  }
90
91  #[test]
92  fn error() {
93    Observable::create(|s| {
94      s.next(1);
95      s.error(RxError::from_error("ERR!"))
96    })
97    .sum_and_count()
98    .subscribe(
99      print_next_fmt!("{:?}"),
100      print_error_as!(&str),
101      print_complete!(),
102    );
103  }
104}