another_rxrust/operators/
sum_and_count.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 marker::PhantomData,
5 ops::Add,
6 sync::{Arc, RwLock},
7};
8
9#[derive(Clone)]
10pub struct SumAndCount<Item> {
11 _item: PhantomData<Item>,
12}
13
14impl<'a, Item> SumAndCount<Item>
15where
16 Item: Clone + Send + Sync + Add<Output = Item>,
17{
18 pub fn new() -> SumAndCount<Item> {
19 SumAndCount { _item: PhantomData }
20 }
21 pub fn execute(
22 &self,
23 source: Observable<'a, Item>,
24 ) -> Observable<'a, (Item, usize)> {
25 Observable::create(move |s| {
26 let n = Arc::new(RwLock::new(0usize));
27 let sum = Arc::new(RwLock::new(None::<Item>));
28
29 let sctl = StreamController::new(s);
30 let sctl_error = sctl.clone();
31 let sctl_complete = sctl.clone();
32
33 let sum_next = Arc::clone(&sum);
34 let n_next = Arc::clone(&n);
35
36 source.inner_subscribe(sctl.new_observer(
37 move |_, x| {
38 *n_next.write().unwrap() += 1;
39 let mut sum = sum_next.write().unwrap();
40 if let Some(latest) = &*sum {
41 *sum = Some(latest.clone() + x);
42 } else {
43 *sum = Some(x);
44 }
45 },
46 move |_, e| {
47 sctl_error.sink_error(e);
48 },
49 move |serial| {
50 if let Some(latest) = &*sum.read().unwrap() {
51 sctl_complete.sink_next((latest.clone(), *n.read().unwrap()));
52 }
53 sctl_complete.sink_complete(&serial);
54 },
55 ));
56 })
57 }
58}
59
60impl<'a, Item> Observable<'a, Item>
61where
62 Item: Clone + Send + Sync + Add<Output = Item>,
63{
64 pub fn sum_and_count(&self) -> Observable<'a, (Item, usize)> {
65 SumAndCount::new().execute(self.clone())
66 }
67}
68
69#[cfg(test)]
70mod test {
71 use crate::prelude::*;
72
73 #[test]
74 fn basic() {
75 observables::from_iter(1..10).sum_and_count().subscribe(
76 print_next_fmt!("{:?}"),
77 print_error!(),
78 print_complete!(),
79 );
80 }
81
82 #[test]
83 fn empty() {
84 observables::empty::<i32>().sum_and_count().subscribe(
85 print_next_fmt!("{:?}"),
86 print_error!(),
87 print_complete!(),
88 );
89 }
90
91 #[test]
92 fn error() {
93 Observable::create(|s| {
94 s.next(1);
95 s.error(RxError::from_error("ERR!"))
96 })
97 .sum_and_count()
98 .subscribe(
99 print_next_fmt!("{:?}"),
100 print_error_as!(&str),
101 print_complete!(),
102 );
103 }
104}