1#![feature(trait_alias)]
63
64use std::sync::{RwLock, Arc};
65
66pub struct Observable<T, Error = ()>
69 where
70 T: Send + Sync + 'static,
71 Error: Send + Sync + 'static
72{
73 subscriber: BoxedSubscriberFunction<T, Error>,
74}
75
76impl<T, Error> Observable<T, Error>
77 where
78 T: Send + Sync + 'static,
79 Error: Send + Sync + 'static
80{
81 pub fn new<F, G>(subscriber: F) -> Self
83 where
84 F: Fn(SubscriptionObserver<T, Error>) -> G + Send + Sync + 'static,
85 G: Fn() + Send + Sync + 'static,
86 {
87 Self {
88 subscriber: Arc::new(move |subobserver| { Arc::new(subscriber(subobserver)) })
89 }
90 }
91
92 pub fn subscribe(&self, observer: impl Into<BoxedObserver<T, Error>>) -> Arc<Subscription<T, Error>> {
94 Subscription::new(observer.into(), Arc::clone(&self.subscriber))
95 }
96
97 pub fn map<U, F>(&self, map_fn: impl Fn(T) -> U + Send + Sync + 'static) -> Observable<U, Error>
99 where
100 U: Send + Sync + 'static,
101 F: SubscriberFunction<U, Error>,
102 {
103 let orig = self.clone();
104 let map_fn = Arc::new(map_fn);
105 Observable::<U, Error>::new(move |observer| {
106 let map_fn = map_fn.clone();
107 let observer = Arc::new(observer);
108 let subscription = orig.subscribe(observer! {
109 next: {
110 let observer = Arc::clone(&observer);
111 move |value: T| {
112 observer.next(map_fn(value));
113 }
114 },
115 error: {
116 let observer = Arc::clone(&observer);
117 move |error| {
118 observer.error(error);
119 }
120 },
121 complete: {
122 let observer = Arc::clone(&observer);
123 move || {
124 observer.complete();
125 }
126 },
127 });
128 move || {
129 subscription.unsubscribe();
130 }
131 })
132 }
133
134 pub fn filter<F>(&self, filter_fn: impl Fn(T) -> bool + 'static + Send + Sync) -> Observable<T, Error>
136 where
137 T: Clone,
138 F: SubscriberFunction<T, Error>,
139 {
140 let orig = self.clone();
141 let filter_fn = Arc::new(filter_fn);
142 Self::new(move |observer| {
143 let filter_fn = filter_fn.clone();
144 let observer = Arc::new(observer);
145 let subscription = orig.subscribe(observer! {
146 next: {
147 let observer = Arc::clone(&observer);
148 move |value: T| {
149 if filter_fn(value.clone()) {
150 observer.next(value);
151 }
152 }
153 },
154 error: {
155 let observer = Arc::clone(&observer);
156 move |error| {
157 observer.error(error);
158 }
159 },
160 complete: {
161 let observer = Arc::clone(&observer);
162 move || {
163 observer.complete();
164 }
165 },
166 });
167 move || {
168 subscription.unsubscribe();
169 }
170 })
171 }
172}
173
174impl<T, Iterable> From<Iterable> for Observable<T, ()>
175 where
176 Iterable: IntoIterator<Item = T> + Send + Sync,
177 T: Clone + Send + Sync + 'static
178{
179 fn from(value: Iterable) -> Self {
181 let value = value.into_iter().collect::<Vec<T>>();
182 Self::new(move |observer| {
183 let cleanup = || {};
184 for item in &value {
185 observer.next(item.clone());
186 if observer.closed() {
187 return cleanup;
188 }
189 }
190 observer.complete();
191 cleanup
192 })
193 }
194}
195
196impl<T, Error> Clone for Observable<T, Error>
197where
198 T: Send + Sync + 'static,
199 Error: Send + Sync + 'static,
200{
201 fn clone(&self) -> Self {
202 Self {
203 subscriber: Arc::clone(&self.subscriber)
204 }
205 }
206}
207
208pub trait SubscriberFunction<T, Error = ()> = Fn(SubscriptionObserver<T, Error>) -> Arc<dyn SubscriptionCleanupFunction> + Sync + Send + 'static
209 where
210 T: Send + Sync + 'static,
211 Error: Send + Sync + 'static;
212type BoxedSubscriberFunction<T, Error = ()> = Arc<(dyn SubscriberFunction<T, Error>)>;
213
214pub trait SubscriptionCleanupFunction = Fn() + Sync + Send + 'static;
215
216pub struct Subscription<T, Error = ()>
218 where
219 T: Send + Sync + 'static,
220 Error: Send + Sync + 'static
221{
222 cleanup: RwLock<Option<Arc<dyn Fn() + Sync + Send>>>,
223 observer: SubscriptionObserverLock<T, Error>,
224}
225
226type SubscriptionObserverLock<T, Error> = RwLock<Option<Arc<RwLock<BoxedObserver<T, Error>>>>>;
227
228impl<T, Error> Subscription<T, Error>
229 where
230 T: Send + Sync + 'static,
231 Error: Send + Sync + 'static
232{
233 fn new(observer: BoxedObserver<T, Error>, subscriber: BoxedSubscriberFunction<T, Error>) -> Arc<Self> {
234 let this = Arc::new(Self {
235 cleanup: RwLock::new(None),
236 observer: RwLock::new(Some(Arc::new(RwLock::new(observer)))),
237 });
238 this.observer.read().unwrap().as_ref().unwrap().read().unwrap().start(Arc::clone(&this));
239
240 if subscription_closed(&this) {
242 return this;
243 }
244
245 let observer = SubscriptionObserver { subscription: Arc::clone(&this) };
246
247 let cleanup = subscriber(observer);
249
250 *this.cleanup.write().unwrap() = Some(Arc::clone(&cleanup));
252
253 if subscription_closed(&this) {
254 cleanup_subscription(&this);
255 }
256
257 this
258 }
259
260 pub fn closed(&self) -> bool {
262 subscription_closed(self)
263 }
264
265 pub fn unsubscribe(&self) {
267 close_subscription(self);
268 }
269}
270
271pub struct SubscriptionObserver<T, Error = ()>
273 where
274 T: Send + Sync + 'static,
275 Error: Send + Sync + 'static
276{
277 subscription: Arc<Subscription<T, Error>>,
278}
279
280impl<T, Error> SubscriptionObserver<T, Error>
281 where
282 T: Send + Sync + 'static,
283 Error: Send + Sync + 'static
284{
285 pub fn closed(&self) -> bool {
287 subscription_closed(&self.subscription)
288 }
289
290 pub fn next(&self, value: T) {
292 let subscription = Arc::clone(&self.subscription);
293
294 if subscription_closed(&subscription) {
296 return;
297 }
298
299 let observer = subscription.observer.read().unwrap().clone();
300 if observer.is_none() {
301 return;
302 }
303
304 observer.unwrap().read().unwrap().next(value);
306 }
307
308 pub fn error(&self, error: Error) {
310 let subscription = Arc::clone(&self.subscription);
311
312 if subscription_closed(&subscription) {
314 return;
315 }
316
317 let observer = subscription.observer.read().unwrap();
318 if let Some(o) = observer.as_ref().map(Arc::clone) {
319 drop(observer);
320 *subscription.observer.write().unwrap() = None;
321 o.read().unwrap().error(error);
322 } else {
323 }
325
326 cleanup_subscription(&subscription);
327 }
328
329 pub fn complete(&self) {
331 let subscription = Arc::clone(&self.subscription);
332
333 if subscription_closed(&subscription) {
335 return;
336 }
337
338 let observer = subscription.observer.read().unwrap();
339 if let Some(o) = observer.as_ref().map(Arc::clone) {
340 drop(observer);
341 *subscription.observer.write().unwrap() = None;
342 o.read().unwrap().complete();
343 }
344
345 cleanup_subscription(&subscription);
346 }
347}
348
349pub type BoxedObserver<T, Error = ()> = Box<dyn AbstractObserver<T, Error>>;
351
352pub use rust_observable_literal::observer;
353
354pub struct Observer<T, Error = ()>
357 where
358 T: Send + Sync + 'static,
359 Error: Send + Sync + 'static
360{
361 pub next: Box<dyn Fn(T) + Sync + Send>,
363 pub error: Box<dyn Fn(Error) + Sync + Send>,
365 pub complete: Box<dyn Fn() + Sync + Send>,
367 pub start: Box<dyn ObserverStartFunction<T, Error>>,
369}
370
371pub trait ObserverStartFunction<T, Error> = Fn(Arc<Subscription<T, Error>>) + Sync + Send
372 where
373 T: Send + Sync + 'static,
374 Error: Send + Sync + 'static;
375
376impl<T, Error> AbstractObserver<T, Error> for Observer<T, Error>
377 where
378 T: Send + Sync + 'static,
379 Error: Send + Sync + 'static
380{
381 fn next(&self, value: T) {
382 (self.next)(value);
383 }
384 fn error(&self, error: Error) {
385 (self.error)(error);
386 }
387 fn complete(&self) {
388 (self.complete)();
389 }
390 fn start(&self, subscription: Arc<Subscription<T, Error>>) {
391 (self.start)(subscription);
392 }
393}
394
395impl<T, Error> Default for Observer<T, Error>
396 where
397 T: Send + Sync + 'static,
398 Error: Send + Sync + 'static
399{
400 fn default() -> Self {
401 Self {
402 next: Box::new(|_| {}),
403 error: Box::new(|_| {}),
404 complete: Box::new(|| {}),
405 start: Box::new(|_| {}),
406 }
407 }
408}
409
410impl<T, Error> From<Observer<T, Error>> for BoxedObserver<T, Error>
411 where
412 T: Send + Sync + 'static,
413 Error: Send + Sync + 'static
414{
415 fn from(value: Observer<T, Error>) -> Self {
416 Box::new(value)
417 }
418}
419
420pub trait AbstractObserver<T, Error = ()>: Send + Sync
423 where
424 T: Send + Sync + 'static,
425 Error: Send + Sync + 'static
426{
427 fn next(&self, value: T) {
429 let _ = value;
430 }
431 fn error(&self, error: Error) {
433 let _ = error;
434 }
435 fn complete(&self) {}
437 fn start(&self, subscription: Arc<Subscription<T, Error>>) {
439 let _ = subscription;
440 }
441}
442
443fn cleanup_subscription<T, Error>(subscription: &Subscription<T, Error>)
444 where
445 T: Send + Sync + 'static,
446 Error: Send + Sync + 'static
447{
448 assert!(subscription.observer.read().unwrap().is_none());
449 let cleanup = subscription.cleanup.read().unwrap().clone();
450 if cleanup.is_none() {
451 return;
452 }
453 let cleanup = Arc::clone(&cleanup.unwrap());
454
455 *subscription.cleanup.write().unwrap() = None;
458
459 cleanup();
461}
462
463fn subscription_closed<T, Error>(subscription: &Subscription<T, Error>) -> bool
464 where
465 T: Send + Sync + 'static,
466 Error: Send + Sync + 'static
467{
468 let observer = subscription.observer.read().unwrap().clone();
469 observer.is_none()
470}
471
472fn close_subscription<T, Error>(subscription: &Subscription<T, Error>)
473 where
474 T: Send + Sync + 'static,
475 Error: Send + Sync + 'static
476{
477 if subscription_closed(subscription) {
478 return;
479 }
480 *subscription.observer.write().unwrap() = None;
481 cleanup_subscription(subscription);
482}
483
#[cfg(test)]
mod test {
    use super::*;

    /// Values pushed by a hand-written subscriber and values replayed from a
    /// collection must both reach the `next` callback in order.
    #[test]
    fn subscription() {
        const COLORS: [&str; 3] = ["red", "green", "blue"];

        // Observable backed by an explicit subscriber function.
        let received = Arc::new(RwLock::new(vec![]));
        let source = Observable::<_, ()>::new(|observer| {
            for color in COLORS {
                observer.next(color.to_owned());
            }
            || {}
        });
        source.subscribe(observer! {
            next: {
                let received = Arc::clone(&received);
                move |color| {
                    received.write().unwrap().push(color);
                }
            },
        });
        assert_eq!(*received.read().unwrap(), Vec::from_iter(COLORS));

        // Observable built from a collection.
        let received = Arc::new(RwLock::new(vec![]));
        let source = Observable::from(Vec::from_iter(COLORS));
        source.subscribe(observer! {
            next: {
                let received = Arc::clone(&received);
                move |color| {
                    received.write().unwrap().push(color);
                }
            },
        });
        assert_eq!(*received.read().unwrap(), Vec::from_iter(COLORS));
    }
}