handle_error_observable/
handle_error_observable.rs

1//! This `Observable` waits for user input and emits both a value and a completion
2//! signal upon success. In case of any errors, it signals them to the attached `Observer`.
3//!
4//! Ensure errors are wrapped in an `Arc` before passing them to the Observer's
5//! `error` function.
6//!
7//! To run this example, execute `cargo run --example handle_error_observable`.
8
9use std::{error::Error, fmt::Display, io, sync::Arc};
10
11use rxr::{subscribe::*, Observable, Observer, Subscribeable};
12
13#[derive(Debug)]
14struct MyErr(i32);
15
16impl Display for MyErr {
17    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18        write!(f, "number should be less than 100, you entered {}", self.0)
19    }
20}
21
22impl Error for MyErr {}
23
24// Creates an `Observable<i32>` that processes user input and emits or signals errors.
25pub fn get_less_than_100() -> Observable<i32> {
26    Observable::new(|mut observer| {
27        let mut input = String::new();
28
29        println!("Please enter an integer (less than 100):");
30
31        if let Err(e) = io::stdin().read_line(&mut input) {
32            // Send input error to the observer.
33            observer.error(Arc::new(e));
34            return Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil);
35        }
36
37        match input.trim().parse::<i32>() {
38            Err(e) => {
39                // Send parsing error to the observer.
40                observer.error(Arc::new(e));
41            }
42            Ok(num) if num > 100 => {
43                // Send custom error to the observer.
44                observer.error(Arc::new(MyErr(num)))
45            }
46            Ok(num) => {
47                // Emit the parsed value to the observer.
48                observer.next(num);
49            }
50        }
51
52        // Signal completion if there are no errors.
53        // Note: `complete` does not affect the outcome if `error` was called before it.
54        observer.complete();
55
56        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
57    })
58}
59
60fn main() {
61    let observer = Subscriber::new(
62        |input| println!("You entered: {}", input),
63        |e| eprintln!("{}", e),
64        || println!("User input handled"),
65    );
66
67    let mut observable = get_less_than_100();
68
69    observable.subscribe(observer);
70}