another_rxrust/operators/
group_by.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use std::collections::HashMap;
4use std::hash::Hash;
5use std::sync::{Arc, RwLock};
6
7#[derive(Clone)]
8pub struct GroupBy<'a, Item, Key>
9where
10  Item: Clone + Send + Sync,
11  Key: Clone + Send + Sync + Hash + Eq,
12{
13  key_f: FunctionWrapper<'a, Item, Key>,
14}
15
16impl<'a, Item, Key> GroupBy<'a, Item, Key>
17where
18  Item: Clone + Send + Sync,
19  Key: Clone + Send + Sync + Hash + Eq,
20{
21  pub fn new<F>(f: F) -> GroupBy<'a, Item, Key>
22  where
23    F: Fn(Item) -> Key + Send + Sync + 'a,
24  {
25    GroupBy { key_f: FunctionWrapper::new(f) }
26  }
27  pub fn execute(
28    &self,
29    source: Observable<'a, Item>,
30  ) -> Observable<'a, Observable<'a, Item>> {
31    let f = self.key_f.clone();
32
33    Observable::create(move |s| {
34      let f = f.clone();
35
36      let sbjmap = Arc::new(RwLock::new(HashMap::<
37        Key,
38        subjects::Subject<Item>,
39      >::new()));
40
41      let sctl = StreamController::new(s);
42      let sctl_next = sctl.clone();
43      let sctl_error = sctl.clone();
44      let sctl_complete = sctl.clone();
45
46      let sbjmap_next = Arc::clone(&sbjmap);
47      let sbjmap_error = Arc::clone(&sbjmap);
48      let sbjmap_complete = Arc::clone(&sbjmap);
49
50      source.inner_subscribe(sctl.new_observer(
51        move |_, x: Item| {
52          let key = f.call(x.clone()); // Umm, can I use it as a reference?
53          let sbj = {
54            let mut sbjmap = sbjmap_next.write().unwrap();
55            if let Some(sbj) = sbjmap.get(&key) {
56              sbj.clone()
57            } else {
58              let sbj = subjects::Subject::<Item>::new();
59              sbjmap.insert(key, sbj.clone());
60              sctl_next.sink_next(sbj.observable());
61              sbj
62            }
63          };
64          sbj.next(x);
65        },
66        move |_, e| {
67          sbjmap_error.read().unwrap().iter().for_each(|x| {
68            x.1.error(e.clone());
69          });
70          sctl_error.sink_error(e);
71        },
72        move |serial| {
73          sbjmap_complete.read().unwrap().iter().for_each(|x| {
74            x.1.complete();
75          });
76          sctl_complete.sink_complete(&serial);
77        },
78      ));
79    })
80  }
81}
82
83impl<'a, Item> Observable<'a, Item>
84where
85  Item: Clone + Send + Sync,
86{
87  pub fn group_by<Key, F>(&self, f: F) -> Observable<'a, Observable<'a, Item>>
88  where
89    F: Fn(Item) -> Key + Send + Sync + 'a,
90    Key: Clone + Send + Sync + Hash + Eq + 'a,
91  {
92    GroupBy::new(f).execute(self.clone())
93  }
94}
95
96#[cfg(test)]
97mod test {
98  use crate::prelude::*;
99  use std::sync::{Arc, RwLock};
100
101  #[test]
102  fn basic() {
103    let n = Arc::new(RwLock::new(0));
104    observables::from_iter(0..10).group_by(|x| x % 3).subscribe(
105      move |x| {
106        let nn = *n.read().unwrap();
107        *n.write().unwrap() += 1;
108        x.subscribe(
109          move |y| println!("next ({}) - {}", nn, y),
110          move |e| println!("error ({}) - {:?}", nn, e),
111          move || println!("complete ({})", nn),
112        );
113      },
114      print_error!(),
115      print_complete!(),
116    );
117  }
118
119  #[test]
120  fn error() {
121    let n = Arc::new(RwLock::new(0));
122    observables::from_iter(0..10)
123      .flat_map(|x| {
124        if x == 7 {
125          observables::error(RxError::from_error("it's 7!!"))
126        } else {
127          observables::just(x)
128        }
129      })
130      .group_by(|x| x % 3)
131      .subscribe(
132        move |x| {
133          let nn = *n.read().unwrap();
134          *n.write().unwrap() += 1;
135          x.subscribe(
136            move |y| println!("next ({}) - {}", nn, y),
137            move |e| println!("error ({}) - {:?}", nn, e),
138            move || println!("complete ({})", nn),
139          );
140        },
141        print_error!(),
142        print_complete!(),
143      );
144  }
145}