1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
// Rx -- Reactive programming for Rust // Copyright 2016 Ruud van Asseldonk // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // A copy of the License has been included in the root of the repository. use observer::Observer; use observer::{NextObserver, CompletedObserver, ErrorObserver, OptionObserver, ResultObserver}; use std::fmt::Debug; /// A stream of values. /// /// An observable represents a stream of values, much like an iterator, /// but instead of being “pull-based” like an iterator, it is “push-based”. /// Multiple observers can subscribe to an observable and when the observable /// produces a value, all observers get called with this value. /// /// An observable can be _finite_ or _infinite_. An example of an infinite /// observable are mouse clicks: you never know if the user is going to click /// once more. An example of a finite observable are the results of a database /// query: a database is can hold only finitely many records, so one result is /// the last one. /// /// A finite observable can end in two ways: /// /// * **Completed**: when the observable ends normally. /// For instance, an observable of database query results /// will complete after the last result has been produced. /// * **Failed**: when an error occurred. /// For instance, an observable of database query results /// may fail if the connection is lost. /// /// Failures are fatal: after an observable produces an error, it will not /// produce any new values. If this is not the desired behavior, you can /// use an observable of `Result`. pub trait Observable { /// The value produced by the observable. type Item: Clone; /// The error produced if the observable fails. type Error: Clone; /// The result of subscribing an observer. type Subscription: Drop; /// Subscribes an observer and returns the subscription. /// /// After subscription, `on_next` will be called on the observer for every /// value produced. If the observable completes, `on_completed` is called. /// If the observable fails with an error, `on_error` is called. It is /// guaranteed that no methods will be called on the observer after /// `on_completed` or `on_error` have been called. /// /// _When_ the observer is called is not part of the observable contract, /// it depends on the kind of observable. The observer may be called before /// `subscribe` returns, or it may be called in the future. /// /// The returned value represents the subscription. Dropping the subscription /// will prevent further calls on the observer. fn subscribe<O>(&mut self, observer: O) -> Self::Subscription where O: Observer<Self::Item, Self::Error>; /// Subscribes a function to handle values produced by the observable. /// /// For every value produced by the observable, `on_next` is called. /// /// **This subscription panics if the observable fails with an error.** /// /// See also [`subscribe()`](#tymethod.subscribe). fn subscribe_next<FnNext>(&mut self, on_next: FnNext) -> Self::Subscription where Self::Error: Debug, FnNext: FnMut(Self::Item) { let observer = NextObserver { fn_next: on_next, }; self.subscribe(observer) } /// Subscribes functions to handle next and completion. /// /// For every value produced by the observable, `on_next` is called. If the /// observable completes, `on_completed` is called. A failure will cause a /// panic. After `on_completed` has been called, it is guaranteed that neither /// `on_next` nor `on_completed` is called again. /// /// **This subscription panics if the observable fails with an error.** /// /// See also [`subscribe()`](#tymethod.subscribe). fn subscribe_completed<FnNext, FnCompleted>(&mut self, on_next: FnNext, on_completed: FnCompleted) -> Self::Subscription where Self::Error: Debug, FnNext: FnMut(Self::Item), FnCompleted: FnOnce() { let observer = CompletedObserver { fn_next: on_next, fn_completed: on_completed, }; self.subscribe(observer) } /// Subscribes functions to handle next, completion, and error. /// /// For every value produced by the observable, `on_next` is called. If the /// observable completes, `on_completed` is called. If it fails, `on_error` /// is called. After `on_completed` or `on_error` have been called, it is /// guaranteed that none of the three functions are called again. /// /// See also [`subscribe()`](#tymethod.subscribe). fn subscribe_error<FnNext, FnCompleted, FnError>(&mut self, on_next: FnNext, on_completed: FnCompleted, on_error: FnError) -> Self::Subscription where FnNext: FnMut(Self::Item), FnCompleted: FnOnce(), FnError: FnOnce(Self::Error) { let observer = ErrorObserver { fn_next: on_next, fn_completed: on_completed, fn_error: on_error, }; self.subscribe(observer) } /// Subscribes a function that takes an option. /// /// The function translates into an observer as follows: /// /// * `on_next(x)`: calls the functions with `Some(x)`. /// * `on_completed()`: calls the function with `None`. /// * `on_error(error)`: panics. /// /// After the function has been called with `None`, /// it is guaranteed never to be called again. /// /// **This subscription panics if the observable fails with an error.** /// /// See also [`subscribe()`](#tymethod.subscribe). fn subscribe_option<FnOption>(&mut self, on_next_or_completed: FnOption) -> Self::Subscription where Self::Error: Debug, FnOption: FnMut(Option<Self::Item>) { let observer = OptionObserver { fn_option: on_next_or_completed }; self.subscribe(observer) } /// Subscribes a function that takes a result of an option. /// /// The function translates into an observer as follows: /// /// * `on_next(x)`: calls the function with `Ok(Some(x))`. /// * `on_completed()`: calls the function with `Ok(None)`. /// * `on_error(error)`: calls the function with `Err(error)`. /// /// After the function has been called with `Ok(None)` or `Err(error)`, /// it is guaranteed never to be called again. /// /// See also [`subscribe()`](#tymethod.subscribe). fn subscribe_result<FnResult>(&mut self, on_next_or_completed_or_error: FnResult) -> Self::Subscription where FnResult: FnMut(Result<Option<Self::Item>, Self::Error>) { let observer = ResultObserver { fn_result: on_next_or_completed_or_error }; self.subscribe(observer) } }