rx_rust/operators/transforming/
group_by.rs

1use crate::utils::types::NecessarySend;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6    subject::{
7        publish_subject::PublishSubject, subject_ext::SubjectExt,
8        subject_observable::SubjectObservable,
9    },
10    utils::types::MarkerType,
11};
12use educe::Educe;
13use std::{collections::HashMap, hash::Hash, marker::PhantomData};
14
15/// Divides an Observable into a set of Observables, each of which emits a different group of items from the original Observable, organized by key.
16/// See <https://reactivex.io/documentation/operators/groupby.html>
17///
18/// # Examples
19/// ```rust
20/// use rx_rust::{
21///     disposable::subscription::Subscription,
22///     observable::observable_ext::ObservableExt,
23///     observer::Termination,
24///     operators::{
25///         creating::from_iter::FromIter,
26///         transforming::group_by::GroupBy,
27///     },
28/// };
29/// use std::sync::{Arc, Mutex};
30///
31/// let groups = Arc::new(Mutex::new(Vec::<Vec<i32>>::new()));
32/// let terminations = Arc::new(Mutex::new(Vec::new()));
33/// let inner_subscriptions = Arc::new(Mutex::new(Vec::<Subscription>::new()));
34/// let groups_observer = Arc::clone(&groups);
35/// let terminations_observer = Arc::clone(&terminations);
36/// let inner_subscriptions_observer = Arc::clone(&inner_subscriptions);
37///
38/// let subscription = GroupBy::new(FromIter::new(vec![1, 2, 3, 4]), |value| value % 2)
39///     .subscribe_with_callback(
40///         move |group| {
41///             let index = {
42///                 let mut groups = groups_observer.lock().unwrap();
43///                 groups.push(Vec::new());
44///                 groups.len() - 1
45///             };
46///             let groups_for_values = Arc::clone(&groups_observer);
47///             let sub = group.subscribe_with_callback(
48///                 move |value| {
49///                     groups_for_values.lock().unwrap()[index].push(value);
50///                 },
51///                 |_| {},
52///             );
53///             inner_subscriptions_observer.lock().unwrap().push(sub);
54///         },
55///         move |termination| terminations_observer
56///             .lock()
57///             .unwrap()
58///             .push(termination),
59///     );
60///
61/// drop(subscription);
62/// inner_subscriptions
63///     .lock()
64///     .unwrap()
65///     .drain(..)
66///     .for_each(drop);
67///
68/// let mut grouped = groups.lock().unwrap().clone();
69/// grouped.iter_mut().for_each(|values| values.sort());
70/// grouped.sort();
71/// assert_eq!(grouped, vec![vec![1, 3], vec![2, 4]]);
72/// assert_eq!(
73///     &*terminations.lock().unwrap(),
74///     &[Termination::Completed]
75/// );
76/// ```
77#[derive(Educe)]
78#[educe(Debug, Clone)]
79pub struct GroupBy<OE, F, K> {
80    source: OE,
81    callback: F,
82    _marker: MarkerType<K>,
83}
84
85impl<OE, F, K> GroupBy<OE, F, K> {
86    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
87    where
88        OE: Observable<'or, 'sub, T, E>,
89        F: FnMut(T) -> K,
90    {
91        Self {
92            source,
93            callback,
94            _marker: PhantomData,
95        }
96    }
97}
98
99impl<'or, 'sub, T, E, OE, F, K>
100    Observable<'or, 'sub, SubjectObservable<PublishSubject<'or, T, E>>, E> for GroupBy<OE, F, K>
101where
102    T: Clone + NecessarySend + 'or,
103    E: Clone + NecessarySend + 'or,
104    OE: Observable<'or, 'sub, T, E>,
105    F: FnMut(T) -> K + NecessarySend + 'or,
106    K: Eq + Hash + NecessarySend + 'or,
107{
108    fn subscribe(
109        self,
110        observer: impl Observer<SubjectObservable<PublishSubject<'or, T, E>>, E> + NecessarySend + 'or,
111    ) -> Subscription<'sub> {
112        let observer = GroupByObserver {
113            observer,
114            callback: self.callback,
115            subjects: HashMap::default(),
116        };
117        self.source.subscribe(observer)
118    }
119}
120
121struct GroupByObserver<'or, T, E, OR, F, K> {
122    observer: OR,
123    callback: F,
124    subjects: HashMap<K, PublishSubject<'or, T, E>>,
125}
126
127impl<'or, T, E, OR, F, K> Observer<T, E> for GroupByObserver<'or, T, E, OR, F, K>
128where
129    T: Clone + NecessarySend,
130    E: Clone + NecessarySend,
131    OR: Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>,
132    F: FnMut(T) -> K,
133    K: Eq + Hash,
134{
135    fn on_next(&mut self, value: T) {
136        let key = (self.callback)(value.clone());
137        let mut subject = self
138            .subjects
139            .entry(key)
140            .or_insert_with(|| {
141                let subject = PublishSubject::new();
142                self.observer.on_next(subject.clone().into_observable());
143                subject
144            })
145            .clone();
146        subject.on_next(value);
147    }
148
149    fn on_termination(self, termination: Termination<E>) {
150        self.subjects
151            .into_values()
152            .for_each(|subject| subject.on_termination(termination.clone()));
153        self.observer.on_termination(termination);
154    }
155}