another_rxrust/operators/
reduce.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct Reduce<'a, Item>
7where
8  Item: Clone + Send + Sync,
9{
10  reduce_f: FunctionWrapper<'a, (Item, Item), Item>,
11}
12
13impl<'a, Item> Reduce<'a, Item>
14where
15  Item: Clone + Send + Sync,
16{
17  pub fn new<F>(f: F) -> Reduce<'a, Item>
18  where
19    F: Fn((Item, Item)) -> Item + Send + Sync + 'a,
20  {
21    Reduce { reduce_f: FunctionWrapper::new(f) }
22  }
23  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
24    let f = self.reduce_f.clone();
25
26    Observable::<Item>::create(move |s| {
27      let f = f.clone();
28      let result = Arc::new(RwLock::new(None::<Item>));
29
30      let sctl = StreamController::new(s);
31      let sctl_error = sctl.clone();
32      let sctl_complete = sctl.clone();
33
34      let result_next = Arc::clone(&result);
35
36      source.inner_subscribe(sctl.new_observer(
37        move |_, x| {
38          let mut r = result_next.write().unwrap();
39          if let Some(xx) = &*r {
40            *r = Some(f.call((xx.clone(), x)));
41          } else {
42            *r = Some(x);
43          }
44        },
45        move |_, e| {
46          sctl_error.sink_error(e);
47        },
48        move |serial| {
49          if let Some(x) = &*result.read().unwrap() {
50            sctl_complete.sink_next(x.clone());
51          }
52          sctl_complete.sink_complete(&serial);
53        },
54      ));
55    })
56  }
57}
58
59impl<'a, Item> Observable<'a, Item>
60where
61  Item: Clone + Send + Sync,
62{
63  pub fn reduce<F>(&self, f: F) -> Observable<'a, Item>
64  where
65    F: Fn((Item, Item)) -> Item + Send + Sync + 'a,
66  {
67    Reduce::new(f).execute(self.clone())
68  }
69}
70
71#[cfg(test)]
72mod test {
73  use crate::prelude::*;
74
75  #[test]
76  fn basic() {
77    observables::range(1, 10).reduce(|(a, b)| a + b).subscribe(
78      print_next_fmt!("{}"),
79      print_error!(),
80      print_complete!(),
81    );
82  }
83
84  #[test]
85  fn string() {
86    observables::from_iter(
87      ["a".to_owned(), "b".to_owned(), "c".to_owned()].into_iter(),
88    )
89    .reduce(|(a, b)| format!("{} - {}", a, b))
90    .subscribe(
91      print_next_fmt!("{}"),
92      print_error!(),
93      print_complete!(),
94    );
95  }
96
97  #[test]
98  fn single() {
99    observables::just(1).reduce(|(a, b)| a + b).subscribe(
100      print_next_fmt!("{}"),
101      print_error!(),
102      print_complete!(),
103    );
104  }
105
106  #[test]
107  fn empty() {
108    observables::empty::<i32>()
109      .reduce(|(a, b)| a + b)
110      .subscribe(
111        print_next_fmt!("{}"),
112        print_error!(),
113        print_complete!(),
114      );
115  }
116
117  #[test]
118  fn error() {
119    Observable::create(|s| {
120      s.next(1);
121      s.error(RxError::from_error("ERR!"))
122    })
123    .reduce(|(a, b)| a + b)
124    .subscribe(
125      print_next_fmt!("{}"),
126      print_error_as!(&str),
127      print_complete!(),
128    );
129  }
130}