basic_observable/basic_observable.rs
1//! This simple `Observable` emits values and completes. It returns an empty
2//! `Subscription`, making it unable to be unsubscribed from. Some operators like
3//! `take`, `switch_map`, `merge_map`, `concat_map`, and `exhaust_map` require
4//! unsubscribe functionality to work correctly.
5//!
6//! Additionally, this is a synchronous Observable, so it blocks the current thread
7//! until it completes emissions.
8//!
9//! To run this example, execute `cargo run --example basic_observable`.
10
11use rxr::subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic};
12use rxr::{Observable, Observer, Subscribeable};
13
14fn main() {
15 // Create a custom observable that emits values from 1 to 10.
16 let mut emit_10_observable = Observable::new(|mut subscriber| {
17 let mut i = 1;
18
19 while i <= 10 {
20 // Emit the value to the subscriber.
21 subscriber.next(i);
22
23 i += 1;
24 }
25
26 // Signal completion to the subscriber.
27 subscriber.complete();
28
29 // Return the empty subscription.
30 Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
31 });
32
33 // Create the `Subscriber` with a mandatory `next` function, and optional
34 // `complete` function. No need for `error` function in this simple example.
35 let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
36 observer.on_complete(|| println!("Completed"));
37
38 // This observable does not use async or threads so it will block until it is done.
39 // Observables are cold so if you comment out the line bellow nothing will be emitted.
40 emit_10_observable.subscribe(observer);
41
42 println!("Custom Observable finished emmiting")
43}