another_rxrust/operators/
reduce.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct Reduce<'a, Item>
7where
8 Item: Clone + Send + Sync,
9{
10 reduce_f: FunctionWrapper<'a, (Item, Item), Item>,
11}
12
13impl<'a, Item> Reduce<'a, Item>
14where
15 Item: Clone + Send + Sync,
16{
17 pub fn new<F>(f: F) -> Reduce<'a, Item>
18 where
19 F: Fn((Item, Item)) -> Item + Send + Sync + 'a,
20 {
21 Reduce { reduce_f: FunctionWrapper::new(f) }
22 }
23 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
24 let f = self.reduce_f.clone();
25
26 Observable::<Item>::create(move |s| {
27 let f = f.clone();
28 let result = Arc::new(RwLock::new(None::<Item>));
29
30 let sctl = StreamController::new(s);
31 let sctl_error = sctl.clone();
32 let sctl_complete = sctl.clone();
33
34 let result_next = Arc::clone(&result);
35
36 source.inner_subscribe(sctl.new_observer(
37 move |_, x| {
38 let mut r = result_next.write().unwrap();
39 if let Some(xx) = &*r {
40 *r = Some(f.call((xx.clone(), x)));
41 } else {
42 *r = Some(x);
43 }
44 },
45 move |_, e| {
46 sctl_error.sink_error(e);
47 },
48 move |serial| {
49 if let Some(x) = &*result.read().unwrap() {
50 sctl_complete.sink_next(x.clone());
51 }
52 sctl_complete.sink_complete(&serial);
53 },
54 ));
55 })
56 }
57}
58
59impl<'a, Item> Observable<'a, Item>
60where
61 Item: Clone + Send + Sync,
62{
63 pub fn reduce<F>(&self, f: F) -> Observable<'a, Item>
64 where
65 F: Fn((Item, Item)) -> Item + Send + Sync + 'a,
66 {
67 Reduce::new(f).execute(self.clone())
68 }
69}
70
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Reduces `range(1, 10)` by addition, observing with the `print_*` macros.
    #[test]
    fn basic() {
        let source = observables::range(1, 10);
        let summed = source.reduce(|(acc, x)| acc + x);
        summed.subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
    }

    /// Reduces three owned strings by joining them with `" - "`.
    #[test]
    fn string() {
        let letters = [String::from("a"), String::from("b"), String::from("c")];
        let joined = observables::from_iter(letters.into_iter())
            .reduce(|(acc, x)| format!("{} - {}", acc, x));
        joined.subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
    }

    /// Reduces a source built with `just(1)`.
    #[test]
    fn single() {
        let source = observables::just(1);
        let reduced = source.reduce(|(acc, x)| acc + x);
        reduced.subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
    }

    /// Reduces a source built with `empty::<i32>()`.
    #[test]
    fn empty() {
        let source = observables::empty::<i32>();
        let reduced = source.reduce(|(acc, x)| acc + x);
        reduced.subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
    }

    /// Reduces a source that emits one item and then signals an error.
    #[test]
    fn error() {
        let failing = Observable::create(|s| {
            s.next(1);
            s.error(RxError::from_error("ERR!"))
        });
        let reduced = failing.reduce(|(acc, x)| acc + x);
        reduced.subscribe(
            print_next_fmt!("{}"),
            print_error_as!(&str),
            print_complete!(),
        );
    }
}