another_rxrust/operators/
group_by.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use std::collections::HashMap;
4use std::hash::Hash;
5use std::sync::{Arc, RwLock};
6
7#[derive(Clone)]
8pub struct GroupBy<'a, Item, Key>
9where
10 Item: Clone + Send + Sync,
11 Key: Clone + Send + Sync + Hash + Eq,
12{
13 key_f: FunctionWrapper<'a, Item, Key>,
14}
15
16impl<'a, Item, Key> GroupBy<'a, Item, Key>
17where
18 Item: Clone + Send + Sync,
19 Key: Clone + Send + Sync + Hash + Eq,
20{
21 pub fn new<F>(f: F) -> GroupBy<'a, Item, Key>
22 where
23 F: Fn(Item) -> Key + Send + Sync + 'a,
24 {
25 GroupBy { key_f: FunctionWrapper::new(f) }
26 }
27 pub fn execute(
28 &self,
29 source: Observable<'a, Item>,
30 ) -> Observable<'a, Observable<'a, Item>> {
31 let f = self.key_f.clone();
32
33 Observable::create(move |s| {
34 let f = f.clone();
35
36 let sbjmap = Arc::new(RwLock::new(HashMap::<
37 Key,
38 subjects::Subject<Item>,
39 >::new()));
40
41 let sctl = StreamController::new(s);
42 let sctl_next = sctl.clone();
43 let sctl_error = sctl.clone();
44 let sctl_complete = sctl.clone();
45
46 let sbjmap_next = Arc::clone(&sbjmap);
47 let sbjmap_error = Arc::clone(&sbjmap);
48 let sbjmap_complete = Arc::clone(&sbjmap);
49
50 source.inner_subscribe(sctl.new_observer(
51 move |_, x: Item| {
52 let key = f.call(x.clone()); let sbj = {
54 let mut sbjmap = sbjmap_next.write().unwrap();
55 if let Some(sbj) = sbjmap.get(&key) {
56 sbj.clone()
57 } else {
58 let sbj = subjects::Subject::<Item>::new();
59 sbjmap.insert(key, sbj.clone());
60 sctl_next.sink_next(sbj.observable());
61 sbj
62 }
63 };
64 sbj.next(x);
65 },
66 move |_, e| {
67 sbjmap_error.read().unwrap().iter().for_each(|x| {
68 x.1.error(e.clone());
69 });
70 sctl_error.sink_error(e);
71 },
72 move |serial| {
73 sbjmap_complete.read().unwrap().iter().for_each(|x| {
74 x.1.complete();
75 });
76 sctl_complete.sink_complete(&serial);
77 },
78 ));
79 })
80 }
81}
82
83impl<'a, Item> Observable<'a, Item>
84where
85 Item: Clone + Send + Sync,
86{
87 pub fn group_by<Key, F>(&self, f: F) -> Observable<'a, Observable<'a, Item>>
88 where
89 F: Fn(Item) -> Key + Send + Sync + 'a,
90 Key: Clone + Send + Sync + Hash + Eq + 'a,
91 {
92 GroupBy::new(f).execute(self.clone())
93 }
94}
95
96#[cfg(test)]
97mod test {
98 use crate::prelude::*;
99 use std::sync::{Arc, RwLock};
100
101 #[test]
102 fn basic() {
103 let n = Arc::new(RwLock::new(0));
104 observables::from_iter(0..10).group_by(|x| x % 3).subscribe(
105 move |x| {
106 let nn = *n.read().unwrap();
107 *n.write().unwrap() += 1;
108 x.subscribe(
109 move |y| println!("next ({}) - {}", nn, y),
110 move |e| println!("error ({}) - {:?}", nn, e),
111 move || println!("complete ({})", nn),
112 );
113 },
114 print_error!(),
115 print_complete!(),
116 );
117 }
118
119 #[test]
120 fn error() {
121 let n = Arc::new(RwLock::new(0));
122 observables::from_iter(0..10)
123 .flat_map(|x| {
124 if x == 7 {
125 observables::error(RxError::from_error("it's 7!!"))
126 } else {
127 observables::just(x)
128 }
129 })
130 .group_by(|x| x % 3)
131 .subscribe(
132 move |x| {
133 let nn = *n.read().unwrap();
134 *n.write().unwrap() += 1;
135 x.subscribe(
136 move |y| println!("next ({}) - {}", nn, y),
137 move |e| println!("error ({}) - {:?}", nn, e),
138 move || println!("complete ({})", nn),
139 );
140 },
141 print_error!(),
142 print_complete!(),
143 );
144 }
145}