rx_rust/operators/backpressure/
on_backpressure.rs

1use crate::disposable::Disposable;
2use crate::utils::types::{MutGuard, Mutable, MutableHelper, NecessarySendSync, Shared};
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use crate::{safe_lock_option, safe_lock_option_observer};
9use educe::Educe;
10
/// Request handle passed to the downstream observer together with each emitted batch;
/// the observer calls it once it is ready to receive the next chunk.
///
/// With the `single-threaded` feature enabled the boxed closure carries no
/// thread-safety bounds.
#[cfg(feature = "single-threaded")]
pub type RequestCallbackType<'cb> = Box<dyn FnOnce() + 'cb>;

/// Request handle passed to the downstream observer together with each emitted batch;
/// the observer calls it once it is ready to receive the next chunk.
///
/// Without the `single-threaded` feature the boxed closure must be `Send + Sync`.
#[cfg(not(feature = "single-threaded"))]
pub type RequestCallbackType<'cb> = Box<dyn FnOnce() + Send + Sync + 'cb>;
22
/// Low-level backpressure primitive that hands upstream values to the downstream
/// observer in demand-driven batches.
///
/// Every upstream value is passed to the custom `receiving_strategy` together with the
/// pending buffer. Batches are emitted as `(Vec<T>, RequestCallbackType)` pairs: the
/// first non-empty buffer is forwarded immediately, after which values keep accumulating
/// in the buffer until the downstream observer invokes the [`RequestCallbackType`] it
/// received. Invoking the callback emits whatever has accumulated, or, when nothing is
/// pending, switches back to forwarding the next value immediately. A termination from
/// upstream is delivered only once the buffer has been drained.
/// See <https://reactivex.io/documentation/operators/backpressure.html>.
///
/// # Examples
/// ```rust
/// use rx_rust::{
///     disposable::Disposable,
///     observable::observable_ext::ObservableExt,
///     observer::Observer,
///     operators::backpressure::on_backpressure::OnBackpressure,
///     subject::publish_subject::PublishSubject,
/// };
/// use std::convert::Infallible;
///
/// let mut received = Vec::new();
/// let mut subject = PublishSubject::<_, Infallible>::new();
/// let observable = OnBackpressure::new(subject.clone(), |buffer, value| buffer.push(value));
///
/// let subscription = observable.subscribe_with_callback(
///     |(values, request_callback)| {
///         received.push(values);
///         request_callback(); // ready for the next batch
///     },
///     |_| {},
/// );
///
/// subject.on_next(1);
/// subject.on_next(2);
/// subject.on_next(3);
///
/// subscription.dispose();
/// drop(subject);
///
/// assert_eq!(received, vec![vec![1], vec![2], vec![3]]);
/// ```
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct OnBackpressure<OE, F> {
    /// Upstream observable; subscribed to when `subscribe` is called on this operator.
    source: OE,
    /// Invoked as `receiving_strategy(buffer, value)` for every upstream value to decide
    /// how the value is merged into the pending buffer.
    receiving_strategy: F,
}
66
impl<OE, F> OnBackpressure<OE, F> {
    /// Creates the operator from an upstream `source` and a `receiving_strategy`.
    ///
    /// The constructor only stores its arguments; nothing is subscribed here. The
    /// bounds require `source` to be an observable of `T` and `receiving_strategy` to
    /// accept the pending buffer (`&mut Vec<T>`) plus the incoming value.
    pub fn new<'or, 'sub, T, E>(source: OE, receiving_strategy: F) -> Self
    where
        OE: Observable<'or, 'sub, T, E>,
        F: FnMut(&mut Vec<T>, T),
    {
        Self {
            source,
            receiving_strategy,
        }
    }
}
79
80impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, (Vec<T>, RequestCallbackType<'or>), E>
81    for OnBackpressure<OE, F>
82where
83    T: NecessarySendSync + 'or + 'sub,
84    E: NecessarySendSync + 'or + 'sub,
85    OE: Observable<'or, 'sub, T, E>,
86    F: FnMut(&mut Vec<T>, T) + NecessarySendSync + 'or,
87{
88    fn subscribe(
89        self,
90        observer: impl Observer<(Vec<T>, RequestCallbackType<'or>), E> + NecessarySendSync + 'or,
91    ) -> Subscription<'sub> {
92        let context = Shared::new(Mutable::new(OnBackpressureContext {
93            buffer: Some(Vec::new()),
94            termination: None,
95            emit_directly: true,
96        }));
97        let observer = OnBackpressureObserver {
98            observer: Shared::new(Mutable::new(Some(observer))),
99            context: context.clone(),
100            receiving_strategy: self.receiving_strategy,
101        };
102        self.source.subscribe(observer) + context
103    }
104}
105
/// Per-subscription state shared by the upstream observer, the request callbacks and
/// the subscription's disposal hook.
struct OnBackpressureContext<T, E> {
    /// Values accumulated by the receiving strategy and not yet handed downstream.
    buffer: Option<Vec<T>>, // None means the subscription is disposed
    /// Termination received from upstream that has not been forwarded yet; `emit`
    /// takes it once the buffer is empty.
    termination: Option<Termination<E>>,
    /// `true` while the next upstream event may be forwarded immediately. `emit` clears
    /// it when a batch is handed downstream and sets it again when a request callback
    /// finds neither buffered values nor a pending termination.
    emit_directly: bool,
}
111
/// Lets the shared context be attached to a subscription and disposed with it.
impl<T, E> Disposable for Shared<Mutable<OnBackpressureContext<T, E>>> {
    /// Marks the subscription as disposed.
    fn dispose(self) {
        // NOTE(review): `safe_lock_option!` is defined elsewhere; presumably it locks the
        // context and takes `buffer`, leaving `None`, which the rest of this file treats
        // as "disposed" — confirm against the macro definition.
        safe_lock_option!(take: self, buffer);
    }
}
117
/// Observer subscribed to the upstream source on behalf of one downstream observer.
struct OnBackpressureObserver<T, E, OR, F> {
    /// Downstream observer, shared so that `emit` can clone the handle into each
    /// request callback.
    observer: Shared<Mutable<Option<OR>>>,
    /// Buffer/termination state, shared with the subscription returned to the caller.
    context: Shared<Mutable<OnBackpressureContext<T, E>>>,
    /// Called as `receiving_strategy(buffer, value)` for every upstream value.
    receiving_strategy: F,
}
123
124impl<'cb, T, E, OR, F> Observer<T, E> for OnBackpressureObserver<T, E, OR, F>
125where
126    T: NecessarySendSync + 'cb,
127    E: NecessarySendSync + 'cb,
128    OR: Observer<(Vec<T>, RequestCallbackType<'cb>), E> + NecessarySendSync + 'cb,
129    F: FnMut(&mut Vec<T>, T),
130{
131    fn on_next(&mut self, value: T) {
132        self.context.lock_mut(|mut lock| {
133            if let Some(buffer) = &mut lock.buffer {
134                (self.receiving_strategy)(buffer, value);
135                if lock.emit_directly {
136                    emit(Some(lock), self.observer.clone(), self.context.clone());
137                }
138            }
139        });
140    }
141
142    fn on_termination(self, termination: Termination<E>) {
143        self.context.lock_mut(|mut lock| {
144            if lock.buffer.is_none() {
145                return;
146            }
147            lock.termination = Some(termination);
148            if lock.emit_directly {
149                emit(Some(lock), self.observer.clone(), self.context.clone());
150            }
151        });
152    }
153}
154
155fn emit<'cb, T, E, OR>(
156    lock: Option<MutGuard<'_, OnBackpressureContext<T, E>>>,
157    observer: Shared<Mutable<Option<OR>>>,
158    context: Shared<Mutable<OnBackpressureContext<T, E>>>,
159) where
160    T: NecessarySendSync + 'cb,
161    E: NecessarySendSync + 'cb,
162    OR: Observer<(Vec<T>, RequestCallbackType<'cb>), E> + NecessarySendSync + 'cb,
163{
164    let implementation = |mut lock: MutGuard<'_, OnBackpressureContext<T, E>>| {
165        if let Some(buffer) = &mut lock.buffer {
166            if buffer.is_empty() {
167                if let Some(termination) = lock.termination.take() {
168                    // OnTermination
169                    drop(lock);
170                    safe_lock_option_observer!(on_termination: observer, termination);
171                } else {
172                    // This code is calling from request callback. Emit directly for next value
173                    lock.emit_directly = true;
174                    drop(lock);
175                }
176            } else {
177                // OnNext
178                let values = std::mem::take(buffer);
179                lock.emit_directly = false;
180                drop(lock);
181                let observer_cloned = observer.clone();
182                let context_cloned = context.clone();
183                let callback: RequestCallbackType = Box::new(move || {
184                    emit(None, observer_cloned, context_cloned);
185                });
186                safe_lock_option_observer!(on_next: observer, (values, callback));
187            }
188        } else {
189            // The subscription is disposed
190        }
191    };
192    if let Some(lock) = lock {
193        implementation(lock);
194    } else {
195        context.lock_mut(implementation);
196    }
197}