rx_rust/operators/transforming/
window_with_count.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6    subject::{publish_subject::PublishSubject, subject_observable::SubjectObservable},
7};
8use educe::Educe;
9use std::{cmp::Ordering, num::NonZeroUsize};
10
11/// Periodically subdivides items from an Observable into Observable windows, each containing a specified number of items.
12/// See <https://reactivex.io/documentation/operators/window.html>
13///
14/// # Examples
15/// ```rust
16/// use rx_rust::{
17///     disposable::subscription::Subscription,
18///     observable::observable_ext::ObservableExt,
19///     observer::Termination,
20///     operators::{
21///         creating::from_iter::FromIter,
22///         transforming::window_with_count::WindowWithCount,
23///     },
24/// };
25/// use std::{num::NonZeroUsize, sync::{Arc, Mutex}};
26///
27/// let windows = Arc::new(Mutex::new(Vec::<Vec<i32>>::new()));
28/// let terminations = Arc::new(Mutex::new(Vec::new()));
29/// let inner_subscriptions = Arc::new(Mutex::new(Vec::<Subscription>::new()));
30/// let windows_observer = Arc::clone(&windows);
31/// let terminations_observer = Arc::clone(&terminations);
32/// let inner_subscriptions_observer = Arc::clone(&inner_subscriptions);
33///
34/// let subscription = WindowWithCount::new(
35///     FromIter::new(vec![1, 2, 3, 4]),
36///     NonZeroUsize::new(2).unwrap(),
37/// )
38/// .subscribe_with_callback(
39///     move |window| {
40///         let index = {
41///             let mut windows = windows_observer.lock().unwrap();
42///             windows.push(Vec::new());
43///             windows.len() - 1
44///         };
45///         let windows_for_values = Arc::clone(&windows_observer);
46///         let sub = window.subscribe_with_callback(
47///             move |value| {
48///                 windows_for_values.lock().unwrap()[index].push(value);
49///             },
50///             |_| {},
51///         );
52///         inner_subscriptions_observer.lock().unwrap().push(sub);
53///     },
54///     move |termination| terminations_observer
55///         .lock()
56///         .unwrap()
57///         .push(termination),
58/// );
59///
60/// drop(subscription);
61/// inner_subscriptions.lock().unwrap().drain(..).for_each(drop);
62///
63/// assert_eq!(
64///     &*windows.lock().unwrap(),
65///     &[vec![1, 2], vec![3, 4], vec![]]
66/// );
67/// assert_eq!(
68///     &*terminations.lock().unwrap(),
69///     &[Termination::Completed]
70/// );
71/// ```
72#[derive(Educe)]
73#[educe(Debug, Clone)]
74pub struct WindowWithCount<OE> {
75    source: OE,
76    count: NonZeroUsize,
77}
78
79impl<OE> WindowWithCount<OE> {
80    pub fn new(source: OE, count: NonZeroUsize) -> Self {
81        Self { source, count }
82    }
83}
84
85impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, SubjectObservable<PublishSubject<'or, T, E>>, E>
86    for WindowWithCount<OE>
87where
88    T: Clone + NecessarySendSync + 'or,
89    E: Clone + NecessarySendSync + 'or,
90    OE: Observable<'or, 'sub, T, E>,
91{
92    fn subscribe(
93        self,
94        mut observer: impl Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>
95        + NecessarySendSync
96        + 'or,
97    ) -> Subscription<'sub> {
98        let subject = PublishSubject::default();
99        observer.on_next(SubjectObservable::new(subject.clone()));
100
101        let observer = WindowWithCountObserver {
102            observer,
103            subject,
104            count: self.count,
105            sent_count: 0,
106        };
107        self.source.subscribe(observer)
108    }
109}
110
111struct WindowWithCountObserver<'or, T, E, OR> {
112    observer: OR,
113    subject: PublishSubject<'or, T, E>,
114    count: NonZeroUsize,
115    sent_count: usize,
116}
117
118impl<'or, T, E, OR> Observer<T, E> for WindowWithCountObserver<'or, T, E, OR>
119where
120    T: Clone,
121    E: Clone,
122    OR: Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>,
123{
124    fn on_next(&mut self, value: T) {
125        match (self.sent_count + 1).cmp(&self.count.get()) {
126            Ordering::Less => {
127                self.subject.on_next(value);
128                self.sent_count += 1;
129            }
130            Ordering::Equal => {
131                let new_subject = PublishSubject::default();
132                let mut old_subject = std::mem::replace(&mut self.subject, new_subject.clone());
133                old_subject.on_next(value);
134                old_subject.on_termination(Termination::Completed);
135                self.observer.on_next(SubjectObservable::new(new_subject));
136                self.sent_count = 0;
137            }
138            Ordering::Greater => unreachable!(),
139        }
140    }
141
142    fn on_termination(self, termination: Termination<E>) {
143        self.subject.on_termination(termination.clone());
144        self.observer.on_termination(termination);
145    }
146}