rx_rust/operators/backpressure/
on_backpressure_latest.rs

1use crate::{
2    disposable::subscription::Subscription,
3    observable::{Observable, observable_ext::ObservableExt},
4    observer::Observer,
5    operators::backpressure::on_backpressure::{OnBackpressure, RequestCallbackType},
6    utils::types::NecessarySendSync,
7};
8use educe::Educe;
9
10/// Keeps only the most recent upstream item while the downstream observer is still processing
11/// the previous one. Once the [`RequestCallbackType`] is invoked, the latest buffered value is
12/// emitted and the cycle repeats. This mirrors `onBackpressureLatest` in other ReactiveX stacks.
13/// See <https://reactivex.io/documentation/operators/backpressure.html>.
14///
15/// # Examples
16/// ```rust
17/// use rx_rust::{
18///     disposable::Disposable,
19///     observable::observable_ext::ObservableExt,
20///     observer::Observer,
21///     operators::backpressure::on_backpressure_latest::OnBackpressureLatest,
22///     subject::publish_subject::PublishSubject,
23/// };
24/// use std::convert::Infallible;
25///
26/// let mut received = Vec::new();
27/// let mut subject = PublishSubject::<_, Infallible>::new();
28/// let observable = OnBackpressureLatest::new(subject.clone());
29///
30/// let subscription = observable.subscribe_with_callback(
31///     |(value, request_callback)| {
32///         received.push(value);
33///         request_callback(); // immediately accept the next value
34///     },
35///     |_| {},
36/// );
37///
38/// subject.on_next(1);
39/// subject.on_next(2);
40/// subject.on_next(3);
41///
42/// subscription.dispose();
43/// drop(subject);
44///
45/// assert_eq!(received, vec![1, 2, 3]);
46/// ```
47#[derive(Educe)]
48#[educe(Debug, Clone)]
49pub struct OnBackpressureLatest<OE> {
50    source: OE,
51}
52
53impl<OE> OnBackpressureLatest<OE> {
54    pub fn new<'or, 'sub, T, E>(source: OE) -> Self
55    where
56        OE: Observable<'or, 'sub, T, E>,
57    {
58        Self { source }
59    }
60}
61
62impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, (T, RequestCallbackType<'or>), E>
63    for OnBackpressureLatest<OE>
64where
65    T: NecessarySendSync + 'or + 'sub,
66    E: NecessarySendSync + 'or + 'sub,
67    OE: Observable<'or, 'sub, T, E>,
68{
69    fn subscribe(
70        self,
71        observer: impl Observer<(T, RequestCallbackType<'or>), E> + NecessarySendSync + 'or,
72    ) -> Subscription<'sub> {
73        OnBackpressure::new(self.source, |buffer, value| {
74            *buffer = Vec::with_capacity(1);
75            buffer.push(value);
76        })
77        .map(|(values, request_callback)| (values.into_iter().next().unwrap(), request_callback))
78        .subscribe(observer)
79    }
80}