rx_rust/operators/transforming/
window.rs1use crate::utils::types::{Mutable, NecessarySend, Shared};
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6 subject::{publish_subject::PublishSubject, subject_observable::SubjectObservable},
7 utils::unsub_after_termination::subscribe_unsub_after_termination,
8};
9use crate::{safe_lock, safe_lock_observer, safe_lock_option_observer};
10use educe::Educe;
11
12#[derive(Educe)]
76#[educe(Debug, Clone)]
77pub struct Window<OE, OE1> {
78 source: OE,
79 boundary: OE1,
80}
81
82impl<OE, OE1> Window<OE, OE1> {
83 pub fn new<'or, 'sub, T, E>(source: OE, boundary: OE1) -> Self
84 where
85 OE: Observable<'or, 'sub, T, E>,
86 OE1: Observable<'or, 'sub, (), E>,
87 {
88 Self { source, boundary }
89 }
90}
91
92impl<'or, 'sub, T, E, OE, OE1>
93 Observable<'or, 'sub, SubjectObservable<PublishSubject<'or, T, E>>, E> for Window<OE, OE1>
94where
95 T: Clone + NecessarySend + 'or,
96 E: Clone + NecessarySend + 'or,
97 OE: Observable<'or, 'sub, T, E>,
98 OE1: Observable<'or, 'sub, (), E>,
99 'sub: 'or,
100{
101 fn subscribe(
102 self,
103 observer: impl Observer<SubjectObservable<PublishSubject<'or, T, E>>, E> + NecessarySend + 'or,
104 ) -> Subscription<'sub> {
105 subscribe_unsub_after_termination(observer, |mut observer| {
106 let subject = PublishSubject::default();
107 observer.on_next(SubjectObservable::new(subject.clone()));
108
109 let subject = Shared::new(Mutable::new(subject));
110 let observer = Shared::new(Mutable::new(Some(observer)));
111 let window_observer = WindowObserver {
112 observer: observer.clone(),
113 subject: subject.clone(),
114 };
115 let boundary_observer = BoundaryObserver { observer, subject };
116 let subscription_1 = self.boundary.subscribe(boundary_observer);
117 let subscription_2 = self.source.subscribe(window_observer);
118 subscription_1 + subscription_2
119 })
120 }
121}
122
123struct WindowObserver<'or, T, E, OR> {
124 observer: Shared<Mutable<Option<OR>>>,
125 subject: Shared<Mutable<PublishSubject<'or, T, E>>>,
126}
127
128impl<'or, T, E, OR> Observer<T, E> for WindowObserver<'or, T, E, OR>
129where
130 T: Clone,
131 E: Clone,
132 OR: Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>,
133{
134 fn on_next(&mut self, value: T) {
135 safe_lock_observer!(on_next: self.subject, value);
136 }
137
138 fn on_termination(self, termination: Termination<E>) {
139 safe_lock!(clone: self.subject).on_termination(termination.clone());
140 safe_lock_option_observer!(on_termination: self.observer, termination);
141 }
142}
143
144struct BoundaryObserver<'or, T, E, OR> {
145 observer: Shared<Mutable<Option<OR>>>,
146 subject: Shared<Mutable<PublishSubject<'or, T, E>>>,
147}
148
149impl<'or, T, E, OR> Observer<(), E> for BoundaryObserver<'or, T, E, OR>
150where
151 T: Clone,
152 E: Clone,
153 OR: Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>,
154{
155 fn on_next(&mut self, _: ()) {
156 let new_subject = PublishSubject::default();
157 let old_subject = safe_lock!(mem_replace: self.subject, new_subject.clone());
158 old_subject.on_termination(Termination::Completed);
159 safe_lock_option_observer!(on_next: self.observer, SubjectObservable::new(new_subject));
160 }
161
162 fn on_termination(self, termination: Termination<E>) {
163 safe_lock!(clone: self.subject).on_termination(termination.clone());
164 safe_lock_option_observer!(on_termination: self.observer, termination);
165 }
166}