rx_rust/operators/transforming/
buffer_with_time.rs1use crate::disposable::Disposable;
2use crate::disposable::boxed_disposal::BoxedDisposal;
3use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5 disposable::subscription::Subscription,
6 observable::Observable,
7 observer::{Observer, Termination},
8 scheduler::Scheduler,
9};
10use crate::{
11 safe_lock, safe_lock_option, safe_lock_option_disposable, safe_lock_option_observer,
12 safe_lock_vec,
13};
14use educe::Educe;
15use std::time::Duration;
16
17#[derive(Educe)]
72#[educe(Debug, Clone)]
73pub struct BufferWithTime<OE, S> {
74 source: OE,
75 time_span: Duration,
76 scheduler: S,
77 delay: Option<Duration>,
78}
79
80impl<OE, S> BufferWithTime<OE, S> {
81 pub fn new(source: OE, time_span: Duration, scheduler: S, delay: Option<Duration>) -> Self {
82 Self {
83 source,
84 time_span,
85 scheduler,
86 delay,
87 }
88 }
89}
90
91impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, Vec<T>, E> for BufferWithTime<OE, S>
92where
93 T: NecessarySendSync + 'static,
94 OE: Observable<'or, 'sub, T, E>,
95 S: Scheduler,
96{
97 fn subscribe(
98 self,
99 observer: impl Observer<Vec<T>, E> + NecessarySendSync + 'static,
100 ) -> Subscription<'sub> {
101 let observer = Shared::new(Mutable::new(Some(observer)));
102 let context = Shared::new(Mutable::new(BufferWithTimeContext {
103 values: Vec::new(),
104 timer: None,
105 }));
106 let sub = self.source.subscribe(BufferWithTimeObserver {
107 observer: observer.clone(),
108 context: context.clone(),
109 });
110 let context_cloned = context.clone();
111 let disposal = self.scheduler.schedule_periodically(
112 move |_| {
113 let values = safe_lock!(mem_take: context_cloned, values);
114 !safe_lock_option_observer!(on_next: observer, values)
115 },
116 self.time_span,
117 self.delay,
118 );
119 safe_lock_option!(replace: context, timer, BoxedDisposal::new(disposal));
120 sub + context
121 }
122}
123
124struct BufferWithTimeContext<T> {
125 values: Vec<T>,
126 timer: Option<BoxedDisposal<'static>>,
127}
128
129impl<T> Disposable for Shared<Mutable<BufferWithTimeContext<T>>> {
130 fn dispose(self) {
131 safe_lock_option_disposable!(dispose: self, timer);
132 }
133}
134
135struct BufferWithTimeObserver<T, OR> {
136 observer: Shared<Mutable<Option<OR>>>,
137 context: Shared<Mutable<BufferWithTimeContext<T>>>,
138}
139
140impl<T, E, OR> Observer<T, E> for BufferWithTimeObserver<T, OR>
141where
142 OR: Observer<Vec<T>, E>,
143{
144 fn on_next(&mut self, value: T) {
145 safe_lock_vec!(push: self.context, values, value);
146 }
147
148 fn on_termination(self, termination: Termination<E>) {
149 let values = self.context.lock_mut(|mut lock| {
150 if let Some(timer) = lock.timer.take() {
151 timer.dispose();
152 }
153 std::mem::take(&mut lock.values)
154 });
155 match termination {
156 Termination::Completed => {
157 if !values.is_empty() {
158 safe_lock_option_observer!(on_next_and_termination: self.observer, values, termination);
159 } else {
160 safe_lock_option_observer!(on_termination: self.observer, termination);
161 }
162 }
163 Termination::Error(_) => {
164 safe_lock_option_observer!(on_termination: self.observer, termination);
165 }
166 }
167 }
168}