rx_rust/operators/transforming/
window.rs1use crate::utils::types::{Mutable, NecessarySendSync, Shared};
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6 subject::{publish_subject::PublishSubject, subject_observable::SubjectObservable},
7 utils::unsub_after_termination::subscribe_unsub_after_termination,
8};
9use crate::{safe_lock, safe_lock_observer, safe_lock_option_observer};
10use educe::Educe;
11
12#[derive(Educe)]
76#[educe(Debug, Clone)]
77pub struct Window<OE, OE1> {
78 source: OE,
79 boundary: OE1,
80}
81
82impl<OE, OE1> Window<OE, OE1> {
83 pub fn new<'or, 'sub, T, E>(source: OE, boundary: OE1) -> Self
84 where
85 OE: Observable<'or, 'sub, T, E>,
86 OE1: Observable<'or, 'sub, (), E>,
87 {
88 Self { source, boundary }
89 }
90}
91
92impl<'or, 'sub, T, E, OE, OE1>
93 Observable<'or, 'sub, SubjectObservable<PublishSubject<'or, T, E>>, E> for Window<OE, OE1>
94where
95 T: Clone + NecessarySendSync + 'or,
96 E: Clone + NecessarySendSync + 'or,
97 OE: Observable<'or, 'sub, T, E>,
98 OE1: Observable<'or, 'sub, (), E>,
99 'sub: 'or,
100{
101 fn subscribe(
102 self,
103 observer: impl Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>
104 + NecessarySendSync
105 + 'or,
106 ) -> Subscription<'sub> {
107 subscribe_unsub_after_termination(observer, |mut observer| {
108 let subject = PublishSubject::default();
109 observer.on_next(SubjectObservable::new(subject.clone()));
110
111 let subject = Shared::new(Mutable::new(subject));
112 let observer = Shared::new(Mutable::new(Some(observer)));
113 let window_observer = WindowObserver {
114 observer: observer.clone(),
115 subject: subject.clone(),
116 };
117 let boundary_observer = BoundaryObserver { observer, subject };
118 let subscription_1 = self.boundary.subscribe(boundary_observer);
119 let subscription_2 = self.source.subscribe(window_observer);
120 subscription_1 + subscription_2
121 })
122 }
123}
124
125struct WindowObserver<'or, T, E, OR> {
126 observer: Shared<Mutable<Option<OR>>>,
127 subject: Shared<Mutable<PublishSubject<'or, T, E>>>,
128}
129
130impl<'or, T, E, OR> Observer<T, E> for WindowObserver<'or, T, E, OR>
131where
132 T: Clone,
133 E: Clone,
134 OR: Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>,
135{
136 fn on_next(&mut self, value: T) {
137 safe_lock_observer!(on_next: self.subject, value);
138 }
139
140 fn on_termination(self, termination: Termination<E>) {
141 safe_lock!(clone: self.subject).on_termination(termination.clone());
142 safe_lock_option_observer!(on_termination: self.observer, termination);
143 }
144}
145
146struct BoundaryObserver<'or, T, E, OR> {
147 observer: Shared<Mutable<Option<OR>>>,
148 subject: Shared<Mutable<PublishSubject<'or, T, E>>>,
149}
150
151impl<'or, T, E, OR> Observer<(), E> for BoundaryObserver<'or, T, E, OR>
152where
153 T: Clone,
154 E: Clone,
155 OR: Observer<SubjectObservable<PublishSubject<'or, T, E>>, E>,
156{
157 fn on_next(&mut self, _: ()) {
158 let new_subject = PublishSubject::default();
159 let old_subject = safe_lock!(mem_replace: self.subject, new_subject.clone());
160 old_subject.on_termination(Termination::Completed);
161 safe_lock_option_observer!(on_next: self.observer, SubjectObservable::new(new_subject));
162 }
163
164 fn on_termination(self, termination: Termination<E>) {
165 safe_lock!(clone: self.subject).on_termination(termination.clone());
166 safe_lock_option_observer!(on_termination: self.observer, termination);
167 }
168}