rx_rust/operators/utility/
timeout.rs1use crate::{
2 disposable::{
3 Disposable, boxed_disposal::BoxedDisposal, callback_disposal::CallbackDisposal,
4 subscription::Subscription,
5 },
6 observable::Observable,
7 observer::{Observer, Termination},
8 safe_lock, safe_lock_option_observer,
9 scheduler::Scheduler,
10 utils::{
11 types::{Mutable, MutableHelper, NecessarySendSync, Shared},
12 unsub_after_termination::subscribe_unsub_after_termination,
13 },
14};
15use educe::Educe;
16use std::time::Duration;
17
18#[derive(Educe)]
19#[educe(Debug, Clone, PartialEq, Eq)]
20pub enum Error<E> {
21 Timeout,
22 SourceError(E),
23}
24
25#[derive(Educe)]
74#[educe(Debug, Clone)]
75pub struct Timeout<OE, S> {
76 source: OE,
77 duration: Duration,
78 scheduler: S,
79}
80
81impl<OE, S> Timeout<OE, S> {
82 pub fn new(source: OE, duration: Duration, scheduler: S) -> Self {
83 Self {
84 source,
85 duration,
86 scheduler,
87 }
88 }
89}
90
91impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, T, Error<E>> for Timeout<OE, S>
92where
93 OE: Observable<'or, 'static, T, E>,
94 S: Scheduler,
95{
96 fn subscribe(
97 self,
98 observer: impl Observer<T, Error<E>> + NecessarySendSync + 'static,
99 ) -> Subscription<'static> {
100 subscribe_unsub_after_termination(observer, |observer| {
101 let context = Shared::new(Mutable::new(TimeoutContext {
102 timer_state: TimerState::Initialized,
103 version: 0,
104 }));
105 let observer = Shared::new(Mutable::new(Some(observer)));
106 let timeout_observer = TimeoutObserver {
107 observer: observer.clone(),
108 duration: self.duration,
109 scheduler: self.scheduler.clone(),
110 context: context.clone(),
111 };
112
113 let sub = self.source.subscribe(timeout_observer);
114 let timer = create_timer(
115 0,
116 observer.clone(),
117 self.duration,
118 self.scheduler.clone(),
119 context.clone(),
120 );
121 let timer_state =
122 safe_lock!(mem_replace: context, timer_state, TimerState::Scheduled(timer));
123 match timer_state {
124 TimerState::Initialized => {} TimerState::Scheduled(_) => unreachable!(),
126 TimerState::DidTimeout => {} TimerState::Disposed => unreachable!(),
128 }
129 sub + context
130 })
131 }
132}
133
134enum TimerState {
135 Initialized,
136 Scheduled(BoxedDisposal<'static>),
137 DidTimeout,
138 Disposed,
139}
140
141struct TimeoutContext {
142 timer_state: TimerState,
143 version: usize,
144}
145
146impl Disposable for Shared<Mutable<TimeoutContext>> {
147 fn dispose(self) {
148 let timer_state = safe_lock!(mem_replace: self, timer_state, TimerState::Disposed);
149 match timer_state {
150 TimerState::Initialized => unreachable!(),
151 TimerState::Scheduled(disposal) => disposal.dispose(), TimerState::DidTimeout => {} TimerState::Disposed => unreachable!(),
154 }
155 }
156}
157
158struct TimeoutObserver<OR, S> {
159 observer: Shared<Mutable<Option<OR>>>,
160 duration: Duration,
161 scheduler: S,
162 context: Shared<Mutable<TimeoutContext>>,
163}
164
165impl<T, E, OR, S> Observer<T, E> for TimeoutObserver<OR, S>
166where
167 OR: Observer<T, Error<E>> + NecessarySendSync + 'static,
168 S: Scheduler,
169{
170 fn on_next(&mut self, value: T) {
171 self.context
172 .lock_mut(|mut lock| match &mut lock.timer_state {
173 TimerState::Initialized => {
174 drop(lock);
175 safe_lock_option_observer!(on_next: self.observer, value);
176 }
177 TimerState::Scheduled(disposal) => {
178 let disposal = std::mem::replace(
180 disposal,
181 BoxedDisposal::new(CallbackDisposal::new(|| {})), );
183 disposal.dispose();
184
185 lock.version += 1;
187 let timer = create_timer(
188 lock.version,
189 self.observer.clone(),
190 self.duration,
191 self.scheduler.clone(),
192 self.context.clone(),
193 );
194 lock.timer_state = TimerState::Scheduled(timer);
195
196 drop(lock);
198 safe_lock_option_observer!(on_next: self.observer, value);
199 }
200 TimerState::DidTimeout => {}
201 TimerState::Disposed => {}
202 });
203 }
204
205 fn on_termination(self, termination: Termination<E>) {
206 match termination {
207 Termination::Completed => {
208 safe_lock_option_observer!(on_termination: self.observer, Termination::Completed);
209 }
210 Termination::Error(error) => {
211 safe_lock_option_observer!(on_termination: self.observer, Termination::Error(Error::SourceError(error)));
212 }
213 }
214 }
215}
216
217fn create_timer<T, E, OR, S>(
218 version: usize,
219 observer: Shared<Mutable<Option<OR>>>,
220 duration: Duration,
221 scheduler: S,
222 context: Shared<Mutable<TimeoutContext>>,
223) -> BoxedDisposal<'static>
224where
225 OR: Observer<T, Error<E>> + NecessarySendSync + 'static,
226 S: Scheduler,
227{
228 BoxedDisposal::new(scheduler.schedule(
229 move || {
230 context.lock_mut(|mut lock| {
231 if lock.version != version {
232 return;
234 }
235 let timer_state = std::mem::replace(&mut lock.timer_state, TimerState::DidTimeout);
237 drop(lock);
238 safe_lock_option_observer!(on_termination: observer, Termination::Error(Error::Timeout));
239 match timer_state {
240 TimerState::Initialized => {} TimerState::Scheduled(disposal) => disposal.dispose(), TimerState::DidTimeout => unreachable!(),
243 TimerState::Disposed => {},
244 }
245 });
246 },
247 Some(duration),
248 ))
249}