rx_rust/operators/transforming/window_with_count.rs
1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6 subject::{publish_subject::PublishSubject, subject_observable::SubjectObservable},
7};
8use educe::Educe;
9use std::{cmp::Ordering, num::NonZeroUsize};
10
11/// Periodically subdivides items from an Observable into Observable windows, each containing a specified number of items.
12/// See <https://reactivex.io/documentation/operators/window.html>
13///
14/// # Examples
15/// ```rust
16/// use rx_rust::{
17/// disposable::subscription::Subscription,
18/// observable::observable_ext::ObservableExt,
19/// observer::Termination,
20/// operators::{
21/// creating::from_iter::FromIter,
22/// transforming::window_with_count::WindowWithCount,
23/// },
24/// };
25/// use std::{num::NonZeroUsize, sync::{Arc, Mutex}};
26///
27/// let windows = Arc::new(Mutex::new(Vec::<Vec<i32>>::new()));
28/// let terminations = Arc::new(Mutex::new(Vec::new()));
29/// let inner_subscriptions = Arc::new(Mutex::new(Vec::<Subscription>::new()));
30/// let windows_observer = Arc::clone(&windows);
31/// let terminations_observer = Arc::clone(&terminations);
32/// let inner_subscriptions_observer = Arc::clone(&inner_subscriptions);
33///
34/// let subscription = WindowWithCount::new(
35/// FromIter::new(vec![1, 2, 3, 4]),
36/// NonZeroUsize::new(2).unwrap(),
37/// )
38/// .subscribe_with_callback(
39/// move |window| {
40/// let index = {
41/// let mut windows = windows_observer.lock().unwrap();
42/// windows.push(Vec::new());
43/// windows.len() - 1
44/// };
45/// let windows_for_values = Arc::clone(&windows_observer);
46/// let sub = window.subscribe_with_callback(
47/// move |value| {
48/// windows_for_values.lock().unwrap()[index].push(value);
49/// },
50/// |_| {},
51/// );
52/// inner_subscriptions_observer.lock().unwrap().push(sub);
53/// },
54/// move |termination| terminations_observer
55/// .lock()
56/// .unwrap()
57/// .push(termination),
58/// );
59///
60/// drop(subscription);
61/// inner_subscriptions.lock().unwrap().drain(..).for_each(drop);
62///
63/// assert_eq!(
64/// &*windows.lock().unwrap(),
65/// &[vec![1, 2], vec![3, 4], vec![]]
66/// );
67/// assert_eq!(
68/// &*terminations.lock().unwrap(),
69/// &[Termination::Completed]
70/// );
71/// ```
72#[derive(Educe)]
73#[educe(Debug, Clone)]
74pub struct WindowWithCount<OE> {
75 source: OE,
76 count: NonZeroUsize,
77}
78
79impl<OE> WindowWithCount<OE> {
80 pub fn new(source: OE, count: NonZeroUsize) -> Self {
81 Self { source, count }
82 }
83}
84
85impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, SubjectObservable<PublishSubject<'or, T, E>>, E>
86 for WindowWithCount<OE>
87where
88 T: Clone + NecessarySendSync + 'or,
89 E: Clone + NecessarySendSync + 'or,
90 OE: Observable<'or, 'sub, T, E>,
91{
92 fn subscribe(
93 self,
94 mut observer: impl Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>
95 + NecessarySendSync
96 + 'or,
97 ) -> Subscription<'sub> {
98 let subject = PublishSubject::default();
99 observer.on_next(SubjectObservable::new(subject.clone()));
100
101 let observer = WindowWithCountObserver {
102 observer,
103 subject,
104 count: self.count,
105 sent_count: 0,
106 };
107 self.source.subscribe(observer)
108 }
109}
110
111struct WindowWithCountObserver<'or, T, E, OR> {
112 observer: OR,
113 subject: PublishSubject<'or, T, E>,
114 count: NonZeroUsize,
115 sent_count: usize,
116}
117
118impl<'or, T, E, OR> Observer<T, E> for WindowWithCountObserver<'or, T, E, OR>
119where
120 T: Clone,
121 E: Clone,
122 OR: Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>,
123{
124 fn on_next(&mut self, value: T) {
125 match (self.sent_count + 1).cmp(&self.count.get()) {
126 Ordering::Less => {
127 self.subject.on_next(value);
128 self.sent_count += 1;
129 }
130 Ordering::Equal => {
131 let new_subject = PublishSubject::default();
132 let mut old_subject = std::mem::replace(&mut self.subject, new_subject.clone());
133 old_subject.on_next(value);
134 old_subject.on_termination(Termination::Completed);
135 self.observer.on_next(SubjectObservable::new(new_subject));
136 self.sent_count = 0;
137 }
138 Ordering::Greater => unreachable!(),
139 }
140 }
141
142 fn on_termination(self, termination: Termination<E>) {
143 self.subject.on_termination(termination.clone());
144 self.observer.on_termination(termination);
145 }
146}