rx_rust/operators/backpressure/on_backpressure_buffer.rs
1use crate::{
2 disposable::subscription::Subscription,
3 observable::Observable,
4 observer::Observer,
5 operators::backpressure::on_backpressure::{OnBackpressure, RequestCallbackType},
6 utils::types::NecessarySendSync,
7};
8use educe::Educe;
9
10/// Buffers every upstream item into a `Vec<T>` and only delivers the collected batch once
11/// the downstream observer calls the accompanying [`RequestCallbackType`]. This mirrors the
12/// behavior of `onBackpressureBuffer` from other ReactiveX implementations.
13/// See <https://reactivex.io/documentation/operators/backpressure.html>.
14///
15/// # Examples
16/// ```rust
17/// use rx_rust::{
18/// disposable::Disposable,
19/// observable::observable_ext::ObservableExt,
20/// observer::Observer,
21/// operators::backpressure::on_backpressure_buffer::OnBackpressureBuffer,
22/// subject::publish_subject::PublishSubject,
23/// };
24/// use std::convert::Infallible;
25///
26/// let mut received = Vec::new();
27/// let mut subject = PublishSubject::<_, Infallible>::new();
28/// let observable = OnBackpressureBuffer::new(subject.clone());
29///
30/// let subscription = observable.subscribe_with_callback(
31/// |(values, request_callback)| {
32/// received.push(values);
33/// request_callback(); // immediately allow the next batch
34/// },
35/// |_| {},
36/// );
37///
38/// subject.on_next(1);
39/// subject.on_next(2);
40/// subject.on_next(3);
41///
42/// subscription.dispose();
43/// drop(subject);
44///
45/// assert_eq!(received, vec![vec![1], vec![2], vec![3]]);
46/// ```
47#[derive(Educe)]
48#[educe(Debug, Clone)]
49pub struct OnBackpressureBuffer<OE> {
50 source: OE,
51}
52
53impl<OE> OnBackpressureBuffer<OE> {
54 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
55 where
56 OE: Observable<'or, 'sub, T, E>,
57 {
58 Self { source }
59 }
60}
61
62impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, (Vec<T>, RequestCallbackType<'or>), E>
63 for OnBackpressureBuffer<OE>
64where
65 T: NecessarySendSync + 'or + 'sub,
66 E: NecessarySendSync + 'or + 'sub,
67 OE: Observable<'or, 'sub, T, E>,
68{
69 fn subscribe(
70 self,
71 observer: impl Observer<(Vec<T>, RequestCallbackType<'or>), E> + NecessarySendSync + 'or,
72 ) -> Subscription<'sub> {
73 OnBackpressure::new(self.source, |buffer, value| {
74 buffer.push(value);
75 })
76 .subscribe(observer)
77 }
78}