// rx_rust/operators/transforming/buffer_with_time.rs

1use crate::disposable::Disposable;
2use crate::disposable::boxed_disposal::BoxedDisposal;
3use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5    disposable::subscription::Subscription,
6    observable::Observable,
7    observer::{Observer, Termination},
8    scheduler::Scheduler,
9};
10use crate::{
11    safe_lock, safe_lock_option, safe_lock_option_disposable, safe_lock_option_observer,
12    safe_lock_vec,
13};
14use educe::Educe;
15use std::time::Duration;
16
17/// Periodically gathers items from an Observable into bundles and emits these bundles as `Vec<T>`, after a specified time interval.
18/// See <https://reactivex.io/documentation/operators/buffer.html>
19///
20/// # Examples
21/// ```rust
22/// # #[cfg(not(feature = "tokio-scheduler"))]
23/// # fn main() {
24/// #     panic!("Use tokio-scheduler feature to run tests.");
25/// # }
26/// # #[cfg(feature = "tokio-scheduler")]
27/// #[tokio::main]
28/// async fn main() {
29///     use rx_rust::{
30///         observable::observable_ext::ObservableExt,
31///         observer::Termination,
32///         operators::{
33///             creating::from_iter::FromIter,
34///             transforming::buffer_with_time::BufferWithTime,
35///         },
36///     };
37///     use std::sync::{Arc, Mutex};
38///     use std::time::Duration;
39///     use tokio::time::sleep;
40///
41///     let handle = tokio::runtime::Handle::current();
42///     let values = Arc::new(Mutex::new(Vec::new()));
43///     let terminations = Arc::new(Mutex::new(Vec::new()));
44///     let values_observer = Arc::clone(&values);
45///     let terminations_observer = Arc::clone(&terminations);
46///
47///     let subscription = BufferWithTime::new(
48///         FromIter::new(vec![1, 2, 3]),
49///         Duration::from_millis(5),
50///         handle.clone(),
51///         None,
52///     )
53///     .subscribe_with_callback(
54///         move |value| values_observer.lock().unwrap().push(value),
55///         move |termination| terminations_observer
56///             .lock()
57///             .unwrap()
58///             .push(termination),
59///     );
60///
61///     sleep(Duration::from_millis(10)).await;
62///     drop(subscription);
63///
64///     assert_eq!(&*values.lock().unwrap(), &[vec![1, 2, 3]]);
65///     assert_eq!(
66///         &*terminations.lock().unwrap(),
67///         &[Termination::Completed]
68///     );
69/// }
70/// ```
71#[derive(Educe)]
72#[educe(Debug, Clone)]
73pub struct BufferWithTime<OE, S> {
74    source: OE,
75    time_span: Duration,
76    scheduler: S,
77    delay: Option<Duration>,
78}
79
80impl<OE, S> BufferWithTime<OE, S> {
81    pub fn new(source: OE, time_span: Duration, scheduler: S, delay: Option<Duration>) -> Self {
82        Self {
83            source,
84            time_span,
85            scheduler,
86            delay,
87        }
88    }
89}
90
91impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, Vec<T>, E> for BufferWithTime<OE, S>
92where
93    T: NecessarySendSync + 'static,
94    OE: Observable<'or, 'sub, T, E>,
95    S: Scheduler,
96{
97    fn subscribe(
98        self,
99        observer: impl Observer<Vec<T>, E> + NecessarySendSync + 'static,
100    ) -> Subscription<'sub> {
101        let observer = Shared::new(Mutable::new(Some(observer)));
102        let context = Shared::new(Mutable::new(BufferWithTimeContext {
103            values: Vec::new(),
104            timer: None,
105        }));
106        let sub = self.source.subscribe(BufferWithTimeObserver {
107            observer: observer.clone(),
108            context: context.clone(),
109        });
110        let context_cloned = context.clone();
111        let disposal = self.scheduler.schedule_periodically(
112            move |_| {
113                let values = safe_lock!(mem_take: context_cloned, values);
114                !safe_lock_option_observer!(on_next: observer, values)
115            },
116            self.time_span,
117            self.delay,
118        );
119        safe_lock_option!(replace: context, timer, BoxedDisposal::new(disposal));
120        sub + context
121    }
122}
123
124struct BufferWithTimeContext<T> {
125    values: Vec<T>,
126    timer: Option<BoxedDisposal<'static>>,
127}
128
129impl<T> Disposable for Shared<Mutable<BufferWithTimeContext<T>>> {
130    fn dispose(self) {
131        safe_lock_option_disposable!(dispose: self, timer);
132    }
133}
134
135struct BufferWithTimeObserver<T, OR> {
136    observer: Shared<Mutable<Option<OR>>>,
137    context: Shared<Mutable<BufferWithTimeContext<T>>>,
138}
139
140impl<T, E, OR> Observer<T, E> for BufferWithTimeObserver<T, OR>
141where
142    OR: Observer<Vec<T>, E>,
143{
144    fn on_next(&mut self, value: T) {
145        safe_lock_vec!(push: self.context, values, value);
146    }
147
148    fn on_termination(self, termination: Termination<E>) {
149        let values = self.context.lock_mut(|mut lock| {
150            if let Some(timer) = lock.timer.take() {
151                timer.dispose();
152            }
153            std::mem::take(&mut lock.values)
154        });
155        match termination {
156            Termination::Completed => {
157                if !values.is_empty() {
158                    safe_lock_option_observer!(on_next_and_termination: self.observer, values, termination);
159                } else {
160                    safe_lock_option_observer!(on_termination: self.observer, termination);
161                }
162            }
163            Termination::Error(_) => {
164                safe_lock_option_observer!(on_termination: self.observer, termination);
165            }
166        }
167    }
168}