rx_rust/operators/transforming/group_by.rs
1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6 subject::{
7 publish_subject::PublishSubject, subject_ext::SubjectExt,
8 subject_observable::SubjectObservable,
9 },
10 utils::types::MarkerType,
11};
12use educe::Educe;
13use std::{collections::HashMap, hash::Hash, marker::PhantomData};
14
15/// Divides an Observable into a set of Observables, each of which emits a different group of items from the original Observable, organized by key.
16/// See <https://reactivex.io/documentation/operators/groupby.html>
17///
18/// # Examples
19/// ```rust
20/// use rx_rust::{
21/// disposable::subscription::Subscription,
22/// observable::observable_ext::ObservableExt,
23/// observer::Termination,
24/// operators::{
25/// creating::from_iter::FromIter,
26/// transforming::group_by::GroupBy,
27/// },
28/// };
29/// use std::sync::{Arc, Mutex};
30///
31/// let groups = Arc::new(Mutex::new(Vec::<Vec<i32>>::new()));
32/// let terminations = Arc::new(Mutex::new(Vec::new()));
33/// let inner_subscriptions = Arc::new(Mutex::new(Vec::<Subscription>::new()));
34/// let groups_observer = Arc::clone(&groups);
35/// let terminations_observer = Arc::clone(&terminations);
36/// let inner_subscriptions_observer = Arc::clone(&inner_subscriptions);
37///
38/// let subscription = GroupBy::new(FromIter::new(vec![1, 2, 3, 4]), |value| value % 2)
39/// .subscribe_with_callback(
40/// move |group| {
41/// let index = {
42/// let mut groups = groups_observer.lock().unwrap();
43/// groups.push(Vec::new());
44/// groups.len() - 1
45/// };
46/// let groups_for_values = Arc::clone(&groups_observer);
47/// let sub = group.subscribe_with_callback(
48/// move |value| {
49/// groups_for_values.lock().unwrap()[index].push(value);
50/// },
51/// |_| {},
52/// );
53/// inner_subscriptions_observer.lock().unwrap().push(sub);
54/// },
55/// move |termination| terminations_observer
56/// .lock()
57/// .unwrap()
58/// .push(termination),
59/// );
60///
61/// drop(subscription);
62/// inner_subscriptions
63/// .lock()
64/// .unwrap()
65/// .drain(..)
66/// .for_each(drop);
67///
68/// let mut grouped = groups.lock().unwrap().clone();
69/// grouped.iter_mut().for_each(|values| values.sort());
70/// grouped.sort();
71/// assert_eq!(grouped, vec![vec![1, 3], vec![2, 4]]);
72/// assert_eq!(
73/// &*terminations.lock().unwrap(),
74/// &[Termination::Completed]
75/// );
76/// ```
77#[derive(Educe)]
78#[educe(Debug, Clone)]
79pub struct GroupBy<OE, F, K> {
80 source: OE,
81 callback: F,
82 _marker: MarkerType<K>,
83}
84
85impl<OE, F, K> GroupBy<OE, F, K> {
86 pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
87 where
88 OE: Observable<'or, 'sub, T, E>,
89 F: FnMut(T) -> K,
90 {
91 Self {
92 source,
93 callback,
94 _marker: PhantomData,
95 }
96 }
97}
98
99impl<'or, 'sub, T, E, OE, F, K>
100 Observable<'or, 'sub, SubjectObservable<PublishSubject<'or, T, E>>, E> for GroupBy<OE, F, K>
101where
102 T: Clone + NecessarySendSync + 'or,
103 E: Clone + NecessarySendSync + 'or,
104 OE: Observable<'or, 'sub, T, E>,
105 F: FnMut(T) -> K + NecessarySendSync + 'or,
106 K: Eq + Hash + NecessarySendSync + 'or,
107{
108 fn subscribe(
109 self,
110 observer: impl Observer<SubjectObservable<PublishSubject<'or, T, E>>, E> + NecessarySendSync + 'or,
111 ) -> Subscription<'sub> {
112 let observer = GroupByObserver {
113 observer,
114 callback: self.callback,
115 subjects: HashMap::default(),
116 };
117 self.source.subscribe(observer)
118 }
119}
120
121struct GroupByObserver<'or, T, E, OR, F, K> {
122 observer: OR,
123 callback: F,
124 subjects: HashMap<K, PublishSubject<'or, T, E>>,
125}
126
127impl<'or, T, E, OR, F, K> Observer<T, E> for GroupByObserver<'or, T, E, OR, F, K>
128where
129 T: Clone + NecessarySendSync,
130 E: Clone + NecessarySendSync,
131 OR: Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>,
132 F: FnMut(T) -> K,
133 K: Eq + Hash,
134{
135 fn on_next(&mut self, value: T) {
136 let key = (self.callback)(value.clone());
137 let mut subject = self
138 .subjects
139 .entry(key)
140 .or_insert_with(|| {
141 let subject = PublishSubject::new();
142 self.observer.on_next(subject.clone().into_observable());
143 subject
144 })
145 .clone();
146 subject.on_next(value);
147 }
148
149 fn on_termination(self, termination: Termination<E>) {
150 self.subjects
151 .into_values()
152 .for_each(|subject| subject.on_termination(termination.clone()));
153 self.observer.on_termination(termination);
154 }
155}