rx_rust/operators/transforming/
buffer.rs1use crate::utils::types::{Mutable, NecessarySendSync, Shared};
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6 utils::unsub_after_termination::subscribe_unsub_after_termination,
7};
8use crate::{safe_lock, safe_lock_option, safe_lock_option_observer, safe_lock_vec};
9use educe::Educe;
10
11#[derive(Educe)]
54#[educe(Debug, Clone)]
55pub struct Buffer<OE, OE1> {
56 source: OE,
57 boundary: OE1,
58}
59
60impl<OE, OE1> Buffer<OE, OE1> {
61 pub fn new<'or, 'sub, T, E>(source: OE, boundary: OE1) -> Self
62 where
63 OE: Observable<'or, 'sub, T, E>,
64 OE1: Observable<'or, 'sub, (), E>,
65 {
66 Self { source, boundary }
67 }
68}
69
70impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, Vec<T>, E> for Buffer<OE, OE1>
71where
72 T: NecessarySendSync + 'or,
73 OE: Observable<'or, 'sub, T, E>,
74 OE1: Observable<'or, 'sub, (), E>,
75 'sub: 'or,
76{
77 fn subscribe(
78 self,
79 observer: impl Observer<Vec<T>, E> + NecessarySendSync + 'or,
80 ) -> Subscription<'sub> {
81 subscribe_unsub_after_termination(observer, |observer| {
82 let observer = Shared::new(Mutable::new(Some(observer)));
83 let values = Shared::new(Mutable::new(Vec::default()));
84 let subscription_1 = self.boundary.subscribe(BoundaryObserver {
85 observer: observer.clone(),
86 values: values.clone(),
87 });
88 let subscription_2 = self.source.subscribe(BufferObserver { observer, values });
89 subscription_1 + subscription_2
90 })
91 }
92}
93
94struct BufferObserver<T, OR> {
95 observer: Shared<Mutable<Option<OR>>>,
96 values: Shared<Mutable<Vec<T>>>,
97}
98
99impl<T, E, OR> Observer<T, E> for BufferObserver<T, OR>
100where
101 OR: Observer<Vec<T>, E>,
102{
103 fn on_next(&mut self, value: T) {
104 safe_lock_vec!(push: self.values, value);
105 }
106
107 fn on_termination(self, termination: Termination<E>) {
108 match termination {
109 Termination::Completed => {
110 let values = safe_lock!(mem_take: self.values);
111 if !values.is_empty() {
112 safe_lock_option_observer!(on_next_and_termination: self.observer, values, termination);
113 } else {
114 safe_lock_option_observer!(on_termination: self.observer, termination);
115 }
116 }
117 Termination::Error(_) => {
118 safe_lock_option_observer!(on_termination: self.observer, termination);
119 }
120 }
121 }
122}
123
124struct BoundaryObserver<T, OR> {
125 observer: Shared<Mutable<Option<OR>>>,
126 values: Shared<Mutable<Vec<T>>>,
127}
128
129impl<T, E, OR> Observer<(), E> for BoundaryObserver<T, OR>
130where
131 OR: Observer<Vec<T>, E>,
132{
133 fn on_next(&mut self, _: ()) {
134 let values = safe_lock!(mem_take: self.values);
135 safe_lock_option_observer!(on_next: self.observer, values);
136 }
137
138 fn on_termination(self, termination: Termination<E>) {
139 match termination {
140 Termination::Completed => {
141 let values = safe_lock!(mem_take: self.values);
142 if !values.is_empty() {
143 safe_lock_option_observer!(on_next_and_termination: self.observer, values, termination);
144 } else {
145 safe_lock_option_observer!(on_termination: self.observer, termination);
146 }
147 }
148 Termination::Error(_) => {
149 safe_lock_option_observer!(on_termination: self.observer, termination);
150 }
151 }
152 }
153}