rust_observable/
lib.rs

1/*!
2Work with observables.
3
4The [`Observable`] type can be used to model push-based
5data sources. In addition, observables are:
6
7- _Compositional:_ Observables can be composed with higher-order
8combinators.
9- _Lazy:_ Observables do not start emitting data until an **observer**
10has subscribed.
11
12This module follows the [TC39 `Observable`](https://github.com/tc39/proposal-observable) proposal.
13User observers other than `Observer` can be defined by implementing
14the `AbstractObserver` trait.
15
16# Example
17
18```
19use rust_observable::*;
20
21fn my_observable() -> Observable<String> {
22    Observable::new(|observer| {
23        // send initial data
24        observer.next("initial value".into());
25
26        // return a cleanup function that runs on
27        // unsubscribe.
28        || {
29            println!("cleanup on unsubscribe");
30        }
31    })
32}
33
34let _ = my_observable()
35    .subscribe(observer! {
36        next: |value| {},
37        error: |error| {},
38        complete: || {},
39        start: |subscription| {},
40    })
41    .unsubscribe();
42
43// you can also use functional methods such as `filter` and `map`.
44let _ = my_observable()
45    .filter(|value| true)
46    .map(|value| value);
47```
48
49You can directly construct an `Observable` from a list of values:
50
51```
52# use rust_observable::*;
53Observable::from(["red", "green", "blue"])
54    .subscribe(observer! {
55        next: |color| {
56            println!("{}", color);
57        },
58    });
59```
60*/
61
62#![feature(trait_alias)]
63
64use std::sync::{RwLock, Arc};
65
66/// An `Observable` represents a sequence of values which
67/// may be observed.
68pub struct Observable<T, Error = ()>
69    where
70        T: Send + Sync + 'static,
71        Error: Send + Sync + 'static
72{
73    subscriber: BoxedSubscriberFunction<T, Error>,
74}
75
76impl<T, Error> Observable<T, Error>
77    where
78        T: Send + Sync + 'static,
79        Error: Send + Sync + 'static
80{
81    /// Constructs an observable given a callback.
82    pub fn new<F, G>(subscriber: F) -> Self
83        where
84            F: Fn(SubscriptionObserver<T, Error>) -> G + Send + Sync + 'static,
85            G: Fn() + Send + Sync + 'static,
86    {
87        Self {
88            subscriber: Arc::new(move |subobserver| { Arc::new(subscriber(subobserver)) })
89        }
90    }
91
92    /// Subscribes to the sequence with an observer.
93    pub fn subscribe(&self, observer: impl Into<BoxedObserver<T, Error>>) -> Arc<Subscription<T, Error>> {
94        Subscription::new(observer.into(), Arc::clone(&self.subscriber))
95    }
96
97    /// Returns a new `Observable` that performs a map on data from the original.
98    pub fn map<U, F>(&self, map_fn: impl Fn(T) -> U + Send + Sync + 'static) -> Observable<U, Error>
99        where
100            U: Send + Sync + 'static,
101            F: SubscriberFunction<U, Error>,
102    {
103        let orig = self.clone();
104        let map_fn = Arc::new(map_fn);
105        Observable::<U, Error>::new(move |observer| {
106            let map_fn = map_fn.clone();
107            let observer = Arc::new(observer);
108            let subscription = orig.subscribe(observer! {
109                next: {
110                    let observer = Arc::clone(&observer);
111                    move |value: T| {
112                        observer.next(map_fn(value));
113                    }
114                },
115                error: {
116                    let observer = Arc::clone(&observer);
117                    move |error| {
118                        observer.error(error);
119                    }
120                },
121                complete: {
122                    let observer = Arc::clone(&observer);
123                    move || {
124                        observer.complete();
125                    }
126                },
127            });
128            move || {
129                subscription.unsubscribe();
130            }
131        })
132    }
133
134    /// Returns a new `Observable` that filters data specified by the predicate.
135    pub fn filter<F>(&self, filter_fn: impl Fn(T) -> bool + 'static + Send + Sync) -> Observable<T, Error>
136        where
137            T: Clone,
138            F: SubscriberFunction<T, Error>,
139    {
140        let orig = self.clone();
141        let filter_fn = Arc::new(filter_fn);
142        Self::new(move |observer| {
143            let filter_fn = filter_fn.clone();
144            let observer = Arc::new(observer);
145            let subscription = orig.subscribe(observer! {
146                next: {
147                    let observer = Arc::clone(&observer);
148                    move |value: T| {
149                        if filter_fn(value.clone()) {
150                            observer.next(value);
151                        }
152                    }
153                },
154                error: {
155                    let observer = Arc::clone(&observer);
156                    move |error| {
157                        observer.error(error);
158                    }
159                },
160                complete: {
161                    let observer = Arc::clone(&observer);
162                    move || {
163                        observer.complete();
164                    }
165                },
166            });
167            move || {
168                subscription.unsubscribe();
169            }
170        })
171    }
172}
173
174impl<T, Iterable> From<Iterable> for Observable<T, ()>
175    where
176        Iterable: IntoIterator<Item = T> + Send + Sync,
177        T: Clone + Send + Sync + 'static
178{
179    /// Constructs an `Observable` from a list of values.
180    fn from(value: Iterable) -> Self {
181        let value = value.into_iter().collect::<Vec<T>>();
182        Self::new(move |observer| {
183            let cleanup = || {};
184            for item in &value {
185                observer.next(item.clone());
186                if observer.closed() {
187                    return cleanup;
188                }
189            }
190            observer.complete();
191            cleanup
192        })
193    }
194}
195
196impl<T, Error> Clone for Observable<T, Error>
197where
198    T: Send + Sync + 'static,
199    Error: Send + Sync + 'static,
200{
201    fn clone(&self) -> Self {
202        Self {
203            subscriber: Arc::clone(&self.subscriber)
204        }
205    }
206}
207
208pub trait SubscriberFunction<T, Error = ()> = Fn(SubscriptionObserver<T, Error>) -> Arc<dyn SubscriptionCleanupFunction> + Sync + Send + 'static
209    where
210        T: Send + Sync + 'static,
211        Error: Send + Sync + 'static;
212type BoxedSubscriberFunction<T, Error = ()> = Arc<(dyn SubscriberFunction<T, Error>)>;
213
214pub trait SubscriptionCleanupFunction = Fn() + Sync + Send + 'static;
215
216/// A `Subscription` is returned by `subscribe`.
217pub struct Subscription<T, Error = ()>
218    where
219        T: Send + Sync + 'static,
220        Error: Send + Sync + 'static
221{
222    cleanup: RwLock<Option<Arc<dyn Fn() + Sync + Send>>>,
223    observer: SubscriptionObserverLock<T, Error>,
224}
225
226type SubscriptionObserverLock<T, Error> = RwLock<Option<Arc<RwLock<BoxedObserver<T, Error>>>>>;
227
228impl<T, Error> Subscription<T, Error>
229    where
230        T: Send + Sync + 'static,
231        Error: Send + Sync + 'static
232{
233    fn new(observer: BoxedObserver<T, Error>, subscriber: BoxedSubscriberFunction<T, Error>) -> Arc<Self> {
234        let this = Arc::new(Self {
235            cleanup: RwLock::new(None),
236            observer: RwLock::new(Some(Arc::new(RwLock::new(observer)))),
237        });
238        this.observer.read().unwrap().as_ref().unwrap().read().unwrap().start(Arc::clone(&this));
239
240        // if the observer has unsubscribed from the start method, exit
241        if subscription_closed(&this) {
242            return this;
243        }
244
245        let observer = SubscriptionObserver { subscription: Arc::clone(&this) };
246
247        // call the subscriber function.
248        let cleanup = subscriber(observer);
249
250        // the return value of the cleanup is always a function.
251        *this.cleanup.write().unwrap() = Some(Arc::clone(&cleanup));
252
253        if subscription_closed(&this) {
254            cleanup_subscription(&this);
255        }
256
257        this
258    }
259
260    /// Indicates whether the subscription is closed.
261    pub fn closed(&self) -> bool {
262        subscription_closed(self)
263    }
264
265    /// Cancels the subscription.
266    pub fn unsubscribe(&self) {
267        close_subscription(self);
268    }
269}
270
271/// A `SubscriptionObserver` wraps the observer object supplied to `subscribe`.
272pub struct SubscriptionObserver<T, Error = ()>
273    where
274        T: Send + Sync + 'static,
275        Error: Send + Sync + 'static
276{
277    subscription: Arc<Subscription<T, Error>>,
278}
279
280impl<T, Error> SubscriptionObserver<T, Error>
281    where
282        T: Send + Sync + 'static,
283        Error: Send + Sync + 'static
284{
285    /// Indicates whether the subscription is closed.
286    pub fn closed(&self) -> bool {
287        subscription_closed(&self.subscription)
288    }
289
290    /// Sends the next value in the sequence.
291    pub fn next(&self, value: T) {
292        let subscription = Arc::clone(&self.subscription);
293
294        // if the stream if closed, then exit.
295        if subscription_closed(&subscription) {
296            return;
297        }
298
299        let observer = subscription.observer.read().unwrap().clone();
300        if observer.is_none() {
301            return;
302        }
303
304        // send the next value to the sink.
305        observer.unwrap().read().unwrap().next(value);
306    }
307
308    /// Sends the sequence error.
309    pub fn error(&self, error: Error) {
310        let subscription = Arc::clone(&self.subscription);
311
312        // if the stream if closed, throw the error to the caller.
313        if subscription_closed(&subscription) {
314            return;
315        }
316
317        let observer = subscription.observer.read().unwrap();
318        if let Some(o) = observer.as_ref().map(Arc::clone) {
319            drop(observer);
320            *subscription.observer.write().unwrap() = None;
321            o.read().unwrap().error(error);
322        } else {
323            // host_report_errors(e)
324        }
325
326        cleanup_subscription(&subscription);
327    }
328
329    /// Sends the completion notification.
330    pub fn complete(&self) {
331        let subscription = Arc::clone(&self.subscription);
332
333        // if the stream if closed, throw the error to the caller.
334        if subscription_closed(&subscription) {
335            return;
336        }
337
338        let observer = subscription.observer.read().unwrap();
339        if let Some(o) = observer.as_ref().map(Arc::clone) {
340            drop(observer);
341            *subscription.observer.write().unwrap() = None;
342            o.read().unwrap().complete();
343        }
344
345        cleanup_subscription(&subscription);
346    }
347}
348
349/// The `BoxedObserver` type represents an abstract observer into a box.
350pub type BoxedObserver<T, Error = ()> = Box<dyn AbstractObserver<T, Error>>;
351
352pub use rust_observable_literal::observer;
353
354/// An `Observer` is used to receive data from an `Observable`, and
355/// is supplied as an argument to `subscribe`.
356pub struct Observer<T, Error = ()>
357    where
358        T: Send + Sync + 'static,
359        Error: Send + Sync + 'static
360{
361    /// Receives the next value in the sequence.
362    pub next: Box<dyn Fn(T) + Sync + Send>,
363    /// Receives the sequence error.
364    pub error: Box<dyn Fn(Error) + Sync + Send>,
365    /// Receives a completion notification.
366    pub complete: Box<dyn Fn() + Sync + Send>,
367    /// Receives the subscription object when `subscribe` is called.
368    pub start: Box<dyn ObserverStartFunction<T, Error>>,
369}
370
371pub trait ObserverStartFunction<T, Error> = Fn(Arc<Subscription<T, Error>>) + Sync + Send
372    where
373        T: Send + Sync + 'static,
374        Error: Send + Sync + 'static;
375
376impl<T, Error> AbstractObserver<T, Error> for Observer<T, Error>
377    where
378        T: Send + Sync + 'static,
379        Error: Send + Sync + 'static
380{
381    fn next(&self, value: T) {
382        (self.next)(value);
383    }
384    fn error(&self, error: Error) {
385        (self.error)(error);
386    }
387    fn complete(&self) {
388        (self.complete)();
389    }
390    fn start(&self, subscription: Arc<Subscription<T, Error>>) {
391        (self.start)(subscription);
392    }
393}
394
395impl<T, Error> Default for Observer<T, Error>
396    where
397        T: Send + Sync + 'static,
398        Error: Send + Sync + 'static
399{
400    fn default() -> Self {
401        Self {
402            next: Box::new(|_| {}),
403            error: Box::new(|_| {}),
404            complete: Box::new(|| {}),
405            start: Box::new(|_| {}),
406        }
407    }
408}
409
410impl<T, Error> From<Observer<T, Error>> for BoxedObserver<T, Error>
411    where
412        T: Send + Sync + 'static,
413        Error: Send + Sync + 'static
414{
415    fn from(value: Observer<T, Error>) -> Self {
416        Box::new(value)
417    }
418}
419
420/// An `AbstractObserver` is used to receive data from an `Observable`, and
421/// is supplied as an argument to `subscribe` in boxed form.
422pub trait AbstractObserver<T, Error = ()>: Send + Sync
423    where
424        T: Send + Sync + 'static,
425        Error: Send + Sync + 'static
426{
427    /// Receives the next value in the sequence.
428    fn next(&self, value: T) {
429        let _ = value;
430    }
431    /// Receives the sequence error.
432    fn error(&self, error: Error) {
433        let _ = error;
434    }
435    /// Receives a completion notification.
436    fn complete(&self) {}
437    /// Receives the subscription object when `subscribe` is called.
438    fn start(&self, subscription: Arc<Subscription<T, Error>>) {
439        let _ = subscription;
440    }
441}
442
443fn cleanup_subscription<T, Error>(subscription: &Subscription<T, Error>)
444    where
445        T: Send + Sync + 'static,
446        Error: Send + Sync + 'static
447{
448    assert!(subscription.observer.read().unwrap().is_none());
449    let cleanup = subscription.cleanup.read().unwrap().clone();
450    if cleanup.is_none() {
451        return;
452    }
453    let cleanup = Arc::clone(&cleanup.unwrap());
454
455    // drop the reference to the cleanup function so that we won't call it
456    // more than once.
457    *subscription.cleanup.write().unwrap() = None;
458
459    // call the cleanup function.
460    cleanup();
461}
462
463fn subscription_closed<T, Error>(subscription: &Subscription<T, Error>) -> bool
464    where
465        T: Send + Sync + 'static,
466        Error: Send + Sync + 'static
467{
468    let observer = subscription.observer.read().unwrap().clone();
469    observer.is_none()
470}
471
472fn close_subscription<T, Error>(subscription: &Subscription<T, Error>)
473    where
474        T: Send + Sync + 'static,
475        Error: Send + Sync + 'static
476{
477    if subscription_closed(subscription) {
478        return;
479    }
480    *subscription.observer.write().unwrap() = None;
481    cleanup_subscription(subscription);
482}
483
484#[cfg(test)]
485mod test {
486    use super::*;
487
488    #[test]
489    fn subscription() {
490        let list = Arc::new(RwLock::new(vec![]));
491        Observable::<_, ()>::new(|observer| {
492            for color in ["red", "green", "blue"] {
493                observer.next(color.to_owned());
494            }
495            || {
496                // cleanup
497            }
498        })
499            .subscribe(observer! {
500                next: {
501                    let list = Arc::clone(&list);
502                    move |color| {
503                        list.write().unwrap().push(color);
504                    }
505                },
506            });
507        assert_eq!(
508            *list.read().unwrap(),
509            Vec::from_iter(["red", "green", "blue"])
510        );
511
512        // from a collection
513        let list = Arc::new(RwLock::new(vec![]));
514        Observable::from(Vec::from_iter(["red", "green", "blue"]))
515            .subscribe(observer! {
516                next: {
517                    let list = Arc::clone(&list);
518                    move |color| {
519                        list.write().unwrap().push(color);
520                    }
521                },
522            });
523        assert_eq!(
524            *list.read().unwrap(),
525            Vec::from_iter(["red", "green", "blue"])
526        );
527    }
528}