handle_error_observable/
handle_error_observable.rs1use std::{error::Error, fmt::Display, io, sync::Arc};
10
11use rxr::{subscribe::*, Observable, Observer, Subscribeable};
12
/// Custom error carrying the rejected number that the user entered.
#[derive(Debug)]
struct MyErr(i32);
15
16impl Display for MyErr {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 write!(f, "number should be less than 100, you entered {}", self.0)
19 }
20}
21
22impl Error for MyErr {}
23
24pub fn get_less_than_100() -> Observable<i32> {
26 Observable::new(|mut observer| {
27 let mut input = String::new();
28
29 println!("Please enter an integer (less than 100):");
30
31 if let Err(e) = io::stdin().read_line(&mut input) {
32 observer.error(Arc::new(e));
34 return Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil);
35 }
36
37 match input.trim().parse::<i32>() {
38 Err(e) => {
39 observer.error(Arc::new(e));
41 }
42 Ok(num) if num > 100 => {
43 observer.error(Arc::new(MyErr(num)))
45 }
46 Ok(num) => {
47 observer.next(num);
49 }
50 }
51
52 observer.complete();
55
56 Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
57 })
58}
59
60fn main() {
61 let observer = Subscriber::new(
62 |input| println!("You entered: {}", input),
63 |e| eprintln!("{}", e),
64 || println!("User input handled"),
65 );
66
67 let mut observable = get_less_than_100();
68
69 observable.subscribe(observer);
70}