rx_rust/operators/backpressure/
on_backpressure.rs1use crate::disposable::Disposable;
2use crate::utils::types::{MutGuard, Mutable, MutableHelper, NecessarySendSync, Shared};
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use crate::{safe_lock_option, safe_lock_option_observer};
9use educe::Educe;
10
11cfg_if::cfg_if! {
12 if #[cfg(feature = "single-threaded")] {
13 pub type RequestCallbackType<'cb> = Box<dyn FnOnce() + 'cb>;
16 } else {
17 pub type RequestCallbackType<'cb> = Box<dyn FnOnce() + Send + Sync + 'cb>;
20 }
21}
22
23#[derive(Educe)]
61#[educe(Debug, Clone)]
62pub struct OnBackpressure<OE, F> {
63 source: OE,
64 receiving_strategy: F,
65}
66
67impl<OE, F> OnBackpressure<OE, F> {
68 pub fn new<'or, 'sub, T, E>(source: OE, receiving_strategy: F) -> Self
69 where
70 OE: Observable<'or, 'sub, T, E>,
71 F: FnMut(&mut Vec<T>, T),
72 {
73 Self {
74 source,
75 receiving_strategy,
76 }
77 }
78}
79
80impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, (Vec<T>, RequestCallbackType<'or>), E>
81 for OnBackpressure<OE, F>
82where
83 T: NecessarySendSync + 'or + 'sub,
84 E: NecessarySendSync + 'or + 'sub,
85 OE: Observable<'or, 'sub, T, E>,
86 F: FnMut(&mut Vec<T>, T) + NecessarySendSync + 'or,
87{
88 fn subscribe(
89 self,
90 observer: impl Observer<(Vec<T>, RequestCallbackType<'or>), E> + NecessarySendSync + 'or,
91 ) -> Subscription<'sub> {
92 let context = Shared::new(Mutable::new(OnBackpressureContext {
93 buffer: Some(Vec::new()),
94 termination: None,
95 emit_directly: true,
96 }));
97 let observer = OnBackpressureObserver {
98 observer: Shared::new(Mutable::new(Some(observer))),
99 context: context.clone(),
100 receiving_strategy: self.receiving_strategy,
101 };
102 self.source.subscribe(observer) + context
103 }
104}
105
106struct OnBackpressureContext<T, E> {
107 buffer: Option<Vec<T>>, termination: Option<Termination<E>>,
109 emit_directly: bool,
110}
111
112impl<T, E> Disposable for Shared<Mutable<OnBackpressureContext<T, E>>> {
113 fn dispose(self) {
114 safe_lock_option!(take: self, buffer);
115 }
116}
117
118struct OnBackpressureObserver<T, E, OR, F> {
119 observer: Shared<Mutable<Option<OR>>>,
120 context: Shared<Mutable<OnBackpressureContext<T, E>>>,
121 receiving_strategy: F,
122}
123
124impl<'cb, T, E, OR, F> Observer<T, E> for OnBackpressureObserver<T, E, OR, F>
125where
126 T: NecessarySendSync + 'cb,
127 E: NecessarySendSync + 'cb,
128 OR: Observer<(Vec<T>, RequestCallbackType<'cb>), E> + NecessarySendSync + 'cb,
129 F: FnMut(&mut Vec<T>, T),
130{
131 fn on_next(&mut self, value: T) {
132 self.context.lock_mut(|mut lock| {
133 if let Some(buffer) = &mut lock.buffer {
134 (self.receiving_strategy)(buffer, value);
135 if lock.emit_directly {
136 emit(Some(lock), self.observer.clone(), self.context.clone());
137 }
138 }
139 });
140 }
141
142 fn on_termination(self, termination: Termination<E>) {
143 self.context.lock_mut(|mut lock| {
144 if lock.buffer.is_none() {
145 return;
146 }
147 lock.termination = Some(termination);
148 if lock.emit_directly {
149 emit(Some(lock), self.observer.clone(), self.context.clone());
150 }
151 });
152 }
153}
154
155fn emit<'cb, T, E, OR>(
156 lock: Option<MutGuard<'_, OnBackpressureContext<T, E>>>,
157 observer: Shared<Mutable<Option<OR>>>,
158 context: Shared<Mutable<OnBackpressureContext<T, E>>>,
159) where
160 T: NecessarySendSync + 'cb,
161 E: NecessarySendSync + 'cb,
162 OR: Observer<(Vec<T>, RequestCallbackType<'cb>), E> + NecessarySendSync + 'cb,
163{
164 let implementation = |mut lock: MutGuard<'_, OnBackpressureContext<T, E>>| {
165 if let Some(buffer) = &mut lock.buffer {
166 if buffer.is_empty() {
167 if let Some(termination) = lock.termination.take() {
168 drop(lock);
170 safe_lock_option_observer!(on_termination: observer, termination);
171 } else {
172 lock.emit_directly = true;
174 drop(lock);
175 }
176 } else {
177 let values = std::mem::take(buffer);
179 lock.emit_directly = false;
180 drop(lock);
181 let observer_cloned = observer.clone();
182 let context_cloned = context.clone();
183 let callback: RequestCallbackType = Box::new(move || {
184 emit(None, observer_cloned, context_cloned);
185 });
186 safe_lock_option_observer!(on_next: observer, (values, callback));
187 }
188 } else {
189 }
191 };
192 if let Some(lock) = lock {
193 implementation(lock);
194 } else {
195 context.lock_mut(implementation);
196 }
197}