rx_rust/operators/backpressure/
on_backpressure_buffer.rs

1use crate::{
2    disposable::subscription::Subscription,
3    observable::Observable,
4    observer::Observer,
5    operators::backpressure::on_backpressure::{OnBackpressure, RequestCallbackType},
6    utils::types::NecessarySendSync,
7};
8use educe::Educe;
9
10/// Buffers every upstream item into a `Vec<T>` and only delivers the collected batch once
11/// the downstream observer calls the accompanying [`RequestCallbackType`]. This mirrors the
12/// behavior of `onBackpressureBuffer` from other ReactiveX implementations.
13/// See <https://reactivex.io/documentation/operators/backpressure.html>.
14///
15/// # Examples
16/// ```rust
17/// use rx_rust::{
18///     disposable::Disposable,
19///     observable::observable_ext::ObservableExt,
20///     observer::Observer,
21///     operators::backpressure::on_backpressure_buffer::OnBackpressureBuffer,
22///     subject::publish_subject::PublishSubject,
23/// };
24/// use std::convert::Infallible;
25///
26/// let mut received = Vec::new();
27/// let mut subject = PublishSubject::<_, Infallible>::new();
28/// let observable = OnBackpressureBuffer::new(subject.clone());
29///
30/// let subscription = observable.subscribe_with_callback(
31///     |(values, request_callback)| {
32///         received.push(values);
33///         request_callback(); // immediately allow the next batch
34///     },
35///     |_| {},
36/// );
37///
38/// subject.on_next(1);
39/// subject.on_next(2);
40/// subject.on_next(3);
41///
42/// subscription.dispose();
43/// drop(subject);
44///
45/// assert_eq!(received, vec![vec![1], vec![2], vec![3]]);
46/// ```
47#[derive(Educe)]
48#[educe(Debug, Clone)]
49pub struct OnBackpressureBuffer<OE> {
50    source: OE,
51}
52
53impl<OE> OnBackpressureBuffer<OE> {
54    pub fn new<'or, 'sub, T, E>(source: OE) -> Self
55    where
56        OE: Observable<'or, 'sub, T, E>,
57    {
58        Self { source }
59    }
60}
61
62impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, (Vec<T>, RequestCallbackType<'or>), E>
63    for OnBackpressureBuffer<OE>
64where
65    T: NecessarySendSync + 'or + 'sub,
66    E: NecessarySendSync + 'or + 'sub,
67    OE: Observable<'or, 'sub, T, E>,
68{
69    fn subscribe(
70        self,
71        observer: impl Observer<(Vec<T>, RequestCallbackType<'or>), E> + NecessarySendSync + 'or,
72    ) -> Subscription<'sub> {
73        OnBackpressure::new(self.source, |buffer, value| {
74            buffer.push(value);
75        })
76        .subscribe(observer)
77    }
78}