1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Rx -- Reactive programming for Rust
// Copyright 2016 Ruud van Asseldonk
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// A copy of the License has been included in the root of the repository.

use observer::Observer;
use observer::{NextObserver, CompletedObserver, ErrorObserver, OptionObserver, ResultObserver};
use std::fmt::Debug;
use transform::MapObservable;

/// A stream of values.
///
/// An observable represents a stream of values, much like an iterator,
/// but instead of being “pull-based” like an iterator, it is “push-based”.
/// Multiple observers can subscribe to an observable and when the observable
/// produces a value, all observers get called with this value.
///
/// An observable can be _finite_ or _infinite_. An example of an infinite
/// observable are mouse clicks: you never know if the user is going to click
/// once more. An example of a finite observable are the results of a database
/// query: a database is can hold only finitely many records, so one result is
/// the last one.
///
/// A finite observable can end in two ways:
///
///  * **Completed**: when the observable ends normally.
///    For instance, an observable of database query results
///    will complete after the last result has been produced.
///  * **Failed**: when an error occurred.
///    For instance, an observable of database query results
///    may fail if the connection is lost.
///
/// Failures are fatal: after an observable produces an error, it will not
/// produce any new values. If this is not the desired behavior, you can
/// use an observable of `Result`.
pub trait Observable {
    /// The value produced by the observable.
    type Item: Clone;

    /// The error produced if the observable fails.
    type Error: Clone;

    /// The result of subscribing an observer.
    type Subscription: Drop;

    /// Subscribes an observer and returns the subscription.
    ///
    /// After subscription, `on_next` will be called on the observer for every
    /// value produced. If the observable completes, `on_completed` is called.
    /// If the observable fails with an error, `on_error` is called. It is
    /// guaranteed that no methods will be called on the observer after
    /// `on_completed` or `on_error` have been called.
    ///
    /// _When_ the observer is called is not part of the observable contract,
    /// it depends on the kind of observable. The observer may be called before
    /// `subscribe` returns, or it may be called in the future.
    ///
    /// The returned value represents the subscription. Dropping the subscription
    /// will prevent further calls on the observer.
    fn subscribe<O>(&mut self, observer: O) -> Self::Subscription
        where O: Observer<Self::Item, Self::Error>;

    /// Subscribes a function to handle values produced by the observable.
    ///
    /// For every value produced by the observable, `on_next` is called.
    ///
    /// **This subscription panics if the observable fails with an error.**
    ///
    /// See also [`subscribe()`](#tymethod.subscribe).
    fn subscribe_next<FnNext>(&mut self,
                              on_next: FnNext)
                              -> Self::Subscription
        where Self::Error: Debug, FnNext: FnMut(Self::Item) {
        let observer = NextObserver {
            fn_next: on_next,
        };
        self.subscribe(observer)
    }

    /// Subscribes functions to handle next and completion.
    ///
    /// For every value produced by the observable, `on_next` is called. If the
    /// observable completes, `on_completed` is called. A failure will cause a
    /// panic. After `on_completed` has been called, it is guaranteed that neither
    /// `on_next` nor `on_completed` is called again.
    ///
    /// **This subscription panics if the observable fails with an error.**
    ///
    /// See also [`subscribe()`](#tymethod.subscribe).
    fn subscribe_completed<FnNext, FnCompleted>(&mut self,
                                                on_next: FnNext,
                                                on_completed: FnCompleted)
                                                -> Self::Subscription
        where Self::Error: Debug, FnNext: FnMut(Self::Item), FnCompleted: FnOnce() {
        let observer = CompletedObserver {
            fn_next: on_next,
            fn_completed: on_completed,
        };
        self.subscribe(observer)
    }

    /// Subscribes functions to handle next, completion, and error.
    ///
    /// For every value produced by the observable, `on_next` is called. If the
    /// observable completes, `on_completed` is called. If it fails, `on_error`
    /// is called. After `on_completed` or `on_error` have been called, it is
    /// guaranteed that none of the three functions are called again.
    ///
    /// See also [`subscribe()`](#tymethod.subscribe).
    fn subscribe_error<FnNext, FnCompleted, FnError>(&mut self,
                                                     on_next: FnNext,
                                                     on_completed: FnCompleted,
                                                     on_error: FnError)
                                                     -> Self::Subscription
        where FnNext: FnMut(Self::Item), FnCompleted: FnOnce(), FnError: FnOnce(Self::Error) {
        let observer = ErrorObserver {
            fn_next: on_next,
            fn_completed: on_completed,
            fn_error: on_error,
        };
        self.subscribe(observer)
    }

    /// Subscribes a function that takes an option.
    ///
    /// The function translates into an observer as follows:
    ///
    ///  * `on_next(x)`: calls the functions with `Some(x)`.
    ///  * `on_completed()`: calls the function with `None`.
    ///  * `on_error(error)`: panics.
    ///
    /// After the function has been called with `None`,
    /// it is guaranteed never to be called again.
    ///
    /// **This subscription panics if the observable fails with an error.**
    ///
    /// See also [`subscribe()`](#tymethod.subscribe).
    fn subscribe_option<FnOption>(&mut self,
                                  on_next_or_completed: FnOption)
                                  -> Self::Subscription
        where Self::Error: Debug, FnOption: FnMut(Option<Self::Item>) {
        let observer = OptionObserver {
            fn_option: on_next_or_completed
        };
        self.subscribe(observer)
    }

    /// Subscribes a function that takes a result of an option.
    ///
    /// The function translates into an observer as follows:
    ///
    ///  * `on_next(x)`: calls the function with `Ok(Some(x))`.
    ///  * `on_completed()`: calls the function with `Ok(None)`.
    ///  * `on_error(error)`: calls the function with `Err(error)`.
    ///
    /// After the function has been called with `Ok(None)` or `Err(error)`,
    /// it is guaranteed never to be called again.
    ///
    /// See also [`subscribe()`](#tymethod.subscribe).
    fn subscribe_result<FnResult>(&mut self,
                                  on_next_or_completed_or_error: FnResult)
                                  -> Self::Subscription
        where FnResult: FnMut(Result<Option<Self::Item>, Self::Error>) {
        let observer = ResultObserver {
            fn_result: on_next_or_completed_or_error
        };
        self.subscribe(observer)
    }

    /// Transforms an observable by applying f to every value produced.
    fn map<'s, U, F>(&'s mut self, f: F) -> MapObservable<'s, Self, F>
        where F: Fn(Self::Item) -> U {
        MapObservable::new(self, f)
    }
}