rx_rust/operators/utility/
observe_on.rs1use crate::{
2 disposable::{Disposable, boxed_disposal::BoxedDisposal, subscription::Subscription},
3 observable::Observable,
4 observer::{Observer, Termination},
5 safe_lock_option_disposable, safe_lock_option_observer,
6 scheduler::{RecursionAction, Scheduler},
7 utils::types::{MutGuard, Mutable, MutableHelper, NecessarySendSync, Shared},
8};
9use educe::Educe;
10
11#[derive(Educe)]
60#[educe(Debug, Clone)]
61pub struct ObserveOn<OE, S> {
62 source: OE,
63 scheduler: S,
64}
65
66impl<OE, S> ObserveOn<OE, S> {
67 pub fn new(source: OE, scheduler: S) -> Self {
68 Self { source, scheduler }
69 }
70}
71
72impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, T, E> for ObserveOn<OE, S>
73where
74 T: NecessarySendSync + 'static,
75 E: NecessarySendSync + 'static,
76 OE: Observable<'or, 'sub, T, E>,
77 S: Scheduler,
78{
79 fn subscribe(
80 self,
81 observer: impl Observer<T, E> + NecessarySendSync + 'static,
82 ) -> Subscription<'sub> {
83 let context = Shared::new(Mutable::new(ObserveOnContext {
84 values: Vec::new(),
85 termination: None,
86 disposal: None,
87 }));
88 let observer = ObserveOnObserver {
89 context: context.clone(),
90 observer: Shared::new(Mutable::new(Some(observer))),
91 scheduler: self.scheduler,
92 };
93 self.source.subscribe(observer) + context
94 }
95}
96
97struct ObserveOnContext<T, E> {
98 values: Vec<T>,
99 termination: Option<Termination<E>>,
100 disposal: Option<BoxedDisposal<'static>>,
101}
102
103impl<T, E> Disposable for Shared<Mutable<ObserveOnContext<T, E>>> {
104 fn dispose(self) {
105 safe_lock_option_disposable!(dispose: self, disposal);
106 }
107}
108
109struct ObserveOnObserver<T, E, OR, S> {
110 context: Shared<Mutable<ObserveOnContext<T, E>>>,
111 observer: Shared<Mutable<Option<OR>>>,
112 scheduler: S,
113}
114
115impl<T, E, OR, S> ObserveOnObserver<T, E, OR, S> {
116 fn setup_scheduler_if_needed(&self, mut lock: MutGuard<'_, ObserveOnContext<T, E>>)
117 where
118 T: NecessarySendSync + 'static,
119 E: NecessarySendSync + 'static,
120 OR: Observer<T, E> + NecessarySendSync + 'static,
121 S: Scheduler,
122 {
123 if lock.disposal.is_some() {
124 return;
125 }
126 let context = self.context.clone();
127 let observer = self.observer.clone();
128 lock.disposal
129 .replace(BoxedDisposal::new(self.scheduler.schedule_recursively(
130 move |_| {
131 context.lock_mut(|mut lock| {
132 let termination = lock.termination.take();
133 let values = std::mem::take(&mut lock.values);
134
135 match (termination, values.is_empty()) {
136 (None, true) => {
137 if let Some(disposal) = lock.disposal.take() {
139 disposal.dispose();
140 }
141 RecursionAction::Stop
142 }
143 (None, false) => {
144 drop(lock);
145 safe_lock_option_observer!(on_next: observer, values: values);
146 RecursionAction::ContinueImmediately
147 }
148 (Some(termination), true) => {
149 drop(lock);
150 safe_lock_option_observer!(on_termination: observer, termination);
151 RecursionAction::Stop
152 }
153 (Some(termination), false) => {
154 drop(lock);
155 match termination {
156 Termination::Completed => {
157 safe_lock_option_observer!(on_next_and_termination: observer, values: values, Termination::Completed);
158 }
159 Termination::Error(_) => {
160 safe_lock_option_observer!(on_termination: observer, termination);
161 }
162 }
163 RecursionAction::Stop
164 }
165 }
166 })
167 },
168 None,
169 )));
170 }
171}
172
173impl<T, E, OR, S> Observer<T, E> for ObserveOnObserver<T, E, OR, S>
174where
175 T: NecessarySendSync + 'static,
176 E: NecessarySendSync + 'static,
177 OR: Observer<T, E> + NecessarySendSync + 'static,
178 S: Scheduler,
179{
180 fn on_next(&mut self, value: T) {
181 self.context.lock_mut(|mut lock| {
182 lock.values.push(value);
183 self.setup_scheduler_if_needed(lock);
184 });
185 }
186
187 fn on_termination(self, termination: Termination<E>) {
188 self.context.lock_mut(|mut lock| {
189 lock.termination.replace(termination);
190 self.setup_scheduler_if_needed(lock);
191 });
192 }
193}