rx_rust/operators/transforming/
buffer_with_count.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8use std::num::NonZeroUsize;
9
10/// Periodically gathers items from an Observable into bundles and emits these bundles as `Vec<T>`, when the bundle reaches a specified size.
11/// See <https://reactivex.io/documentation/operators/buffer.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16///     observable::observable_ext::ObservableExt,
17///     observer::Termination,
18///     operators::{
19///         creating::from_iter::FromIter,
20///         transforming::buffer_with_count::BufferWithCount,
21///     },
22/// };
23/// use std::num::NonZeroUsize;
24///
25/// let mut values = Vec::new();
26/// let mut terminations = Vec::new();
27///
28/// let observable = BufferWithCount::new(FromIter::new(vec![1, 2, 3, 4]), NonZeroUsize::new(3).unwrap());
29/// observable.subscribe_with_callback(
30///     |value| values.push(value),
31///     |termination| terminations.push(termination),
32/// );
33///
34/// assert_eq!(values, vec![vec![1, 2, 3], vec![4]]);
35/// assert_eq!(terminations, vec![Termination::Completed]);
36/// ```
37#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct BufferWithCount<OE> {
40    source: OE,
41    count: NonZeroUsize,
42}
43
44impl<OE> BufferWithCount<OE> {
45    pub fn new(source: OE, count: NonZeroUsize) -> Self {
46        Self { source, count }
47    }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, Vec<T>, E> for BufferWithCount<OE>
51where
52    T: NecessarySendSync + 'or,
53    OE: Observable<'or, 'sub, T, E>,
54{
55    fn subscribe(
56        self,
57        observer: impl Observer<Vec<T>, E> + NecessarySendSync + 'or,
58    ) -> Subscription<'sub> {
59        let observer = BufferWithCountObserver {
60            observer,
61            values: Vec::default(),
62            count: self.count,
63        };
64        self.source.subscribe(observer)
65    }
66}
67
/// Observer adapter that accumulates incoming items and forwards them downstream as `Vec<T>`
/// buffers.
struct BufferWithCountObserver<T, OR> {
    /// Downstream observer that receives the buffers and the final termination.
    observer: OR,
    /// Items received since the last buffer was emitted.
    values: Vec<T>,
    /// Buffer length at which `values` is flushed downstream.
    count: NonZeroUsize,
}
73
74impl<T, E, OR> Observer<T, E> for BufferWithCountObserver<T, OR>
75where
76    OR: Observer<Vec<T>, E>,
77{
78    fn on_next(&mut self, value: T) {
79        self.values.push(value);
80        if self.values.len() >= self.count.get() {
81            self.observer.on_next(std::mem::take(&mut self.values));
82        }
83    }
84
85    fn on_termination(mut self, termination: Termination<E>) {
86        match termination {
87            Termination::Completed => {
88                if !self.values.is_empty() {
89                    self.observer.on_next(std::mem::take(&mut self.values));
90                }
91            }
92            Termination::Error(_) => {}
93        }
94        self.observer.on_termination(termination);
95    }
96}