rx_rust/operators/backpressure/on_backpressure_latest.rs
1use crate::{
2 disposable::subscription::Subscription,
3 observable::{Observable, observable_ext::ObservableExt},
4 observer::Observer,
5 operators::backpressure::on_backpressure::{OnBackpressure, RequestCallbackType},
6 utils::types::NecessarySendSync,
7};
8use educe::Educe;
9
10/// Keeps only the most recent upstream item while the downstream observer is still processing
11/// the previous one. Once the [`RequestCallbackType`] is invoked, the latest buffered value is
12/// emitted and the cycle repeats. This mirrors `onBackpressureLatest` in other ReactiveX stacks.
13/// See <https://reactivex.io/documentation/operators/backpressure.html>.
14///
15/// # Examples
16/// ```rust
17/// use rx_rust::{
18/// disposable::Disposable,
19/// observable::observable_ext::ObservableExt,
20/// observer::Observer,
21/// operators::backpressure::on_backpressure_latest::OnBackpressureLatest,
22/// subject::publish_subject::PublishSubject,
23/// };
24/// use std::convert::Infallible;
25///
26/// let mut received = Vec::new();
27/// let mut subject = PublishSubject::<_, Infallible>::new();
28/// let observable = OnBackpressureLatest::new(subject.clone());
29///
30/// let subscription = observable.subscribe_with_callback(
31/// |(value, request_callback)| {
32/// received.push(value);
33/// request_callback(); // immediately accept the next value
34/// },
35/// |_| {},
36/// );
37///
38/// subject.on_next(1);
39/// subject.on_next(2);
40/// subject.on_next(3);
41///
42/// subscription.dispose();
43/// drop(subject);
44///
45/// assert_eq!(received, vec![1, 2, 3]);
46/// ```
47#[derive(Educe)]
48#[educe(Debug, Clone)]
49pub struct OnBackpressureLatest<OE> {
50 source: OE,
51}
52
53impl<OE> OnBackpressureLatest<OE> {
54 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
55 where
56 OE: Observable<'or, 'sub, T, E>,
57 {
58 Self { source }
59 }
60}
61
62impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, (T, RequestCallbackType<'or>), E>
63 for OnBackpressureLatest<OE>
64where
65 T: NecessarySendSync + 'or + 'sub,
66 E: NecessarySendSync + 'or + 'sub,
67 OE: Observable<'or, 'sub, T, E>,
68{
69 fn subscribe(
70 self,
71 observer: impl Observer<(T, RequestCallbackType<'or>), E> + NecessarySendSync + 'or,
72 ) -> Subscription<'sub> {
73 OnBackpressure::new(self.source, |buffer, value| {
74 *buffer = Vec::with_capacity(1);
75 buffer.push(value);
76 })
77 .map(|(values, request_callback)| (values.into_iter().next().unwrap(), request_callback))
78 .subscribe(observer)
79 }
80}