rx_rust/operators/transforming/
buffer_with_count.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8use std::num::NonZeroUsize;
9
10#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct BufferWithCount<OE> {
40 source: OE,
41 count: NonZeroUsize,
42}
43
44impl<OE> BufferWithCount<OE> {
45 pub fn new(source: OE, count: NonZeroUsize) -> Self {
46 Self { source, count }
47 }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, Vec<T>, E> for BufferWithCount<OE>
51where
52 T: NecessarySendSync + 'or,
53 OE: Observable<'or, 'sub, T, E>,
54{
55 fn subscribe(
56 self,
57 observer: impl Observer<Vec<T>, E> + NecessarySendSync + 'or,
58 ) -> Subscription<'sub> {
59 let observer = BufferWithCountObserver {
60 observer,
61 values: Vec::default(),
62 count: self.count,
63 };
64 self.source.subscribe(observer)
65 }
66}
67
68struct BufferWithCountObserver<T, OR> {
69 observer: OR,
70 values: Vec<T>,
71 count: NonZeroUsize,
72}
73
74impl<T, E, OR> Observer<T, E> for BufferWithCountObserver<T, OR>
75where
76 OR: Observer<Vec<T>, E>,
77{
78 fn on_next(&mut self, value: T) {
79 self.values.push(value);
80 if self.values.len() >= self.count.get() {
81 self.observer.on_next(std::mem::take(&mut self.values));
82 }
83 }
84
85 fn on_termination(mut self, termination: Termination<E>) {
86 match termination {
87 Termination::Completed => {
88 if !self.values.is_empty() {
89 self.observer.on_next(std::mem::take(&mut self.values));
90 }
91 }
92 Termination::Error(_) => {}
93 }
94 self.observer.on_termination(termination);
95 }
96}