rx_rust/operators/utility/
delay.rs1use crate::disposable::Disposable;
2use crate::disposable::boxed_disposal::BoxedDisposal;
3use crate::disposable::subscription::Subscription;
4use crate::scheduler::RecursionAction;
5use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
6use crate::{
7 observable::Observable,
8 observer::{Observer, Termination},
9 scheduler::Scheduler,
10};
11use crate::{safe_lock_option, safe_lock_option_disposable, safe_lock_option_observer};
12use educe::Educe;
13use std::{
14 collections::VecDeque,
15 time::{Duration, Instant},
16};
17
18#[derive(Educe)]
74#[educe(Debug, Clone)]
75pub struct Delay<OE, S> {
76 source: OE,
77 delay: Duration,
78 scheduler: S,
79}
80
81impl<OE, S> Delay<OE, S> {
82 pub fn new(source: OE, delay: Duration, scheduler: S) -> Self {
83 Self {
84 source,
85 delay,
86 scheduler,
87 }
88 }
89}
90
91impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, T, E> for Delay<OE, S>
92where
93 T: NecessarySendSync + 'static,
94 OE: Observable<'or, 'sub, T, E>,
95 S: Scheduler,
96{
97 fn subscribe(
98 self,
99 observer: impl Observer<T, E> + NecessarySendSync + 'static,
100 ) -> Subscription<'sub> {
101 let context = Shared::new(Mutable::new(DelayContext {
102 values: VecDeque::new(),
103 timer: None,
104 }));
105 let delay_observer = DelayObserver {
106 delay: self.delay,
107 scheduler: self.scheduler,
108 context: context.clone(),
109 observer: Shared::new(Mutable::new(Some(observer))),
110 };
111 self.source.subscribe(delay_observer) + context
112 }
113}
114
115struct DelayContext<T> {
116 values: VecDeque<(Instant, Option<T>)>, timer: Option<BoxedDisposal<'static>>,
118}
119
120impl<T> Disposable for Shared<Mutable<DelayContext<T>>> {
121 fn dispose(self) {
122 safe_lock_option_disposable!(dispose: self, timer);
123 }
124}
125
126struct DelayObserver<T, OR, S> {
127 delay: Duration,
128 scheduler: S,
129 context: Shared<Mutable<DelayContext<T>>>,
130 observer: Shared<Mutable<Option<OR>>>, }
132
133impl<T, OR, S> DelayObserver<T, OR, S> {
134 fn emit_value_and_setup_timer_if_needed<E>(&self, value: Option<T>)
135 where
136 T: NecessarySendSync + 'static,
137 OR: Observer<T, E> + NecessarySendSync + 'static,
138 S: Scheduler,
139 {
140 self.context.lock_mut(|mut lock| {
141 lock.values.push_back((Instant::now() + self.delay, value));
142 if lock.timer.is_some() {
143 return;
144 }
145 let context = self.context.clone();
146 let observer = self.observer.clone();
147 lock.timer = Some(BoxedDisposal::new(self.scheduler.schedule_recursively(
148 move |_| {
149 let (values, completed) = context.lock_mut(|mut lock| {
151 let mut values = Vec::new();
152 let mut completed = false;
153 let now = Instant::now();
154 while let Some((instant, _)) = lock.values.front() {
155 if now < *instant {
156 break;
157 }
158 let value = lock.values.pop_front().unwrap().1;
159 if let Some(value) = value {
160 values.push(value);
161 } else {
162 completed = true;
163 break;
164 }
165 }
166 (values, completed)
167 });
168
169 if completed {
170 safe_lock_option_observer!(on_next_and_termination: observer, values: values, Termination::Completed);
171 RecursionAction::Stop
172 } else {
173 safe_lock_option_observer!(on_next: observer, values: values);
174 context.lock_mut(|mut lock| {
175 if let Some((next_instant, _)) = lock.values.front() {
176 RecursionAction::ContinueAt(*next_instant)
178 } else {
179 if let Some(timer) = lock.timer.take() {
181 drop(lock);
182 timer.dispose();
183 }
184 RecursionAction::Stop
185 }
186 })
187 }
188 },
189 Some(self.delay),
190 )));
191 });
192 }
193}
194
195impl<T, E, OR, S> Observer<T, E> for DelayObserver<T, OR, S>
196where
197 T: NecessarySendSync + 'static,
198 OR: Observer<T, E> + NecessarySendSync + 'static,
199 S: Scheduler,
200{
201 fn on_next(&mut self, value: T) {
202 self.emit_value_and_setup_timer_if_needed(Some(value));
203 }
204
205 fn on_termination(self, termination: Termination<E>) {
206 match termination {
207 Termination::Completed => {
208 self.emit_value_and_setup_timer_if_needed(None);
209 }
210 Termination::Error(_) => {
211 self.context.dispose();
212 safe_lock_option_observer!(on_termination: self.observer, termination);
213 }
214 }
215 }
216}