rxrust/observable/
of.rs

1use crate::prelude::*;
2
3/// Creates an observable producing a multiple values.
4///
5/// Completes immediately after emitting the values given. Never emits an error.
6///
7/// # Arguments
8///
9/// * `v` - A value to emits.
10///
11/// # Examples
12///
13/// ```
14/// use rxrust::prelude::*;
15/// use rxrust::of_sequence;
16///
17/// of_sequence!(1, 2, 3)
18///   .subscribe(|v| {println!("{},", v)});
19///
20/// // print log:
21/// // 1
22/// // 2
23/// // 3
24/// ```
25#[macro_export]
26macro_rules! of_sequence {
27    ( $( $item:expr ),* ) => {
28  {
29    $crate::observable::create(|mut s| {
30      $(
31        s.next($item);
32      )*
33      s.complete();
34    })
35  }
36}
37}
38
39/// Creates an observable producing a single value.
40///
41/// Completes immediately after emitting the value given. Never emits an error.
42///
43/// # Arguments
44///
45/// * `v` - A value to emits.
46///
47/// # Examples
48///
49/// ```
50/// use rxrust::prelude::*;
51///
52/// observable::of(123)
53///   .subscribe(|v| {println!("{},", v)});
54/// ```
55pub fn of<Item>(v: Item) -> ObservableBase<OfEmitter<Item>> {
56  ObservableBase::new(OfEmitter(v))
57}
58
59#[derive(Clone)]
60pub struct OfEmitter<Item>(pub(crate) Item);
61
62#[doc(hidden)]
63macro_rules! of_emitter {
64    ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
65  fn emit<O>(self, mut subscriber: Subscriber<O, $subscription>)
66  where
67    O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf
68  {
69      subscriber.next(self.0);
70      subscriber.complete();
71  }
72}
73}
74
75impl<Item> Emitter for OfEmitter<Item> {
76  type Item = Item;
77  type Err = ();
78}
79
80impl<'a, Item> LocalEmitter<'a> for OfEmitter<Item> {
81  of_emitter!(LocalSubscription, 'a);
82}
83
84impl<Item> SharedEmitter for OfEmitter<Item> {
85  of_emitter!(SharedSubscription, Send + Sync + 'static);
86}
87
88/// Creates an observable that emits value or the error from a [`Result`] given.
89///
90/// Completes immediately after.
91///
92/// # Arguments
93///
94/// * `r` - A [`Result`] argument to take a value, or an error to emits from.
95///
96/// # Examples
97///
98/// ```
99/// use rxrust::prelude::*;
100///
101/// observable::of_result(Ok(1234))
102///   .subscribe(|v| {println!("{},", v)});
103/// ```
104///
105/// ```
106/// use rxrust::prelude::*;
107///
108/// observable::of_result(Err("An error"))
109///   .subscribe_err(|v: &i32| {}, |e| {println!("Error:  {},", e)});
110/// ```
111pub fn of_result<Item, Err>(
112  r: Result<Item, Err>,
113) -> ObservableBase<ResultEmitter<Item, Err>> {
114  ObservableBase::new(ResultEmitter(r))
115}
116
117#[doc(hidden)]
118macro_rules! of_result_emitter {
119    ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
120  fn emit<O>(self, mut subscriber: Subscriber<O, $subscription>)
121  where
122    O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf
123  {
124      match self.0 {
125        Ok(v) => subscriber.next(v),
126        Err(e) => subscriber.error(e),
127      };
128      subscriber.complete();
129  }
130}
131}
132
133#[derive(Clone)]
134pub struct ResultEmitter<Item, Err>(pub(crate) Result<Item, Err>);
135
136impl<Item, Err> Emitter for ResultEmitter<Item, Err> {
137  type Item = Item;
138  type Err = Err;
139}
140
141impl<'a, Item, Err> LocalEmitter<'a> for ResultEmitter<Item, Err> {
142  of_result_emitter!(LocalSubscription, 'a);
143}
144
145impl<Item, Err> SharedEmitter for ResultEmitter<Item, Err> {
146  of_result_emitter!(SharedSubscription, Send + Sync + 'static);
147}
148
149/// Creates an observable that potentially emits a single value from [`Option`].
150///
151/// Emits the value if is there, and completes immediately after. When the
152/// given option has not value, completes immediately. Never emits an error.
153///
154/// # Arguments
155///
156/// * `o` - An optional used to take a value to emits from.
157///
158/// # Examples
159///
160/// ```
161/// use rxrust::prelude::*;
162///
163/// observable::of_option(Some(1234))
164///   .subscribe(|v| {println!("{},", v)});
165/// ```
166pub fn of_option<Item>(o: Option<Item>) -> ObservableBase<OptionEmitter<Item>> {
167  ObservableBase::new(OptionEmitter(o))
168}
169
170#[derive(Clone)]
171pub struct OptionEmitter<Item>(pub(crate) Option<Item>);
172
173#[doc(hidden)]
174macro_rules! of_option_emitter {
175    ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
176  fn emit<O>(self, mut subscriber: Subscriber<O, $subscription>)
177  where
178    O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf
179  {
180      if let Some(v) = self.0 {
181        subscriber.next(v)
182      }
183      subscriber.complete();
184  }
185}
186}
187
188impl<Item> Emitter for OptionEmitter<Item> {
189  type Item = Item;
190  type Err = ();
191}
192
193impl<'a, Item> LocalEmitter<'a> for OptionEmitter<Item> {
194  of_option_emitter!(LocalSubscription, 'a);
195}
196
197impl<Item> SharedEmitter for OptionEmitter<Item> {
198  of_option_emitter!(SharedSubscription, Send + Sync + 'static);
199}
200
201/// Creates an observable that emits the return value of a callable.
202///
203/// Never emits an error.
204///
205/// # Arguments
206///
207/// * `f` - A function that will be called to obtain its return value to emits.
208///
209/// # Examples
210///
211/// ```
212/// use rxrust::prelude::*;
213///
214/// observable::of_fn(|| {1234})
215///   .subscribe(|v| {println!("{},", v)});
216/// ```
217pub fn of_fn<F, Item>(f: F) -> ObservableBase<CallableEmitter<F>>
218where
219  F: FnOnce() -> Item,
220{
221  ObservableBase::new(CallableEmitter(f))
222}
223
224#[derive(Clone)]
225pub struct CallableEmitter<F>(F);
226
227#[doc(hidden)]
228macro_rules! of_fn_emitter {
229    ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
230  fn emit<O>(self, mut subscriber: Subscriber<O, $subscription>)
231  where
232    O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf
233  {
234      subscriber.next((self.0)());
235      subscriber.complete();
236  }
237}
238}
239
240impl<Item, F> Emitter for CallableEmitter<F>
241where
242  F: FnOnce() -> Item,
243{
244  type Item = Item;
245  type Err = ();
246}
247
248impl<'a, Item, F> LocalEmitter<'a> for CallableEmitter<F>
249where
250  F: FnOnce() -> Item,
251{
252  of_fn_emitter!(LocalSubscription, 'a);
253}
254
255impl<Item, F> SharedEmitter for CallableEmitter<F>
256where
257  F: FnOnce() -> Item,
258{
259  of_fn_emitter!(SharedSubscription, Send + Sync + 'static);
260}
261
262#[cfg(test)]
263mod test {
264  use crate::prelude::*;
265
266  #[test]
267  fn from_fn() {
268    let mut value = 0;
269    let mut completed = false;
270    let callable = || 123;
271    observable::of_fn(callable).subscribe_complete(
272      |v| {
273        value = v;
274      },
275      || completed = true,
276    );
277
278    assert_eq!(value, 123);
279    assert!(completed);
280  }
281
282  #[test]
283  fn of_option() {
284    let mut value1 = 0;
285    let mut completed1 = false;
286    observable::of_option(Some(123)).subscribe_complete(
287      |v| {
288        value1 = v;
289      },
290      || completed1 = true,
291    );
292
293    assert_eq!(value1, 123);
294    assert!(completed1);
295
296    let mut value2 = 0;
297    let mut completed2 = false;
298    observable::of_option(None).subscribe_complete(
299      |v| {
300        value2 = v;
301      },
302      || completed2 = true,
303    );
304
305    assert_eq!(value2, 0);
306    assert!(completed2);
307  }
308
309  #[test]
310  fn of_result() {
311    let mut value1 = 0;
312    let mut completed1 = false;
313    let r: Result<i32, &str> = Ok(123);
314    observable::of_result(r).subscribe_all(
315      |v| {
316        value1 = v;
317      },
318      |_| {},
319      || completed1 = true,
320    );
321
322    assert_eq!(value1, 123);
323    assert!(completed1);
324
325    let mut value2 = 0;
326    let mut error_reported = false;
327    let r: Result<i32, &str> = Err("error");
328    observable::of_result(r)
329      .subscribe_err(|_| value2 = 123, |_| error_reported = true);
330
331    assert_eq!(value2, 0);
332    assert!(error_reported);
333  }
334
335  #[test]
336  fn of() {
337    let mut value = 0;
338    let mut completed = false;
339    observable::of(100).subscribe_complete(|v| value = v, || completed = true);
340
341    assert_eq!(value, 100);
342    assert!(completed);
343  }
344
345  #[test]
346  fn of_macros() {
347    let mut value = 0;
348    of_sequence!(1, 2, 3).subscribe(|v| value += v);
349
350    assert_eq!(value, 6);
351  }
352
353  #[test]
354  fn bench() { do_bench(); }
355
356  benchmark_group!(do_bench, bench_of);
357
358  fn bench_of(b: &mut bencher::Bencher) { b.iter(of); }
359}