1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//! `BehaviorSubject` example with error handling
//!
//! This example showcases how the `BehaviorSubject` from the `rxr` library
//! behaves when it emits an error.
//!
//! The `BehaviorSubject` is a type of subject in reactive programming that emits the
//! most recently emitted item and all subsequent items of the source observable to
//! its subscribers. It holds a current value, and new subscribers receive the last
//! emitted value immediately upon subscription. If no items have been emitted yet,
//! subscribers will receive a default or specified initial value.
//!
//! To run this example, execute `cargo run --example behavior_subject_error`.

use std::error::Error;
use std::fmt::Display;
use std::sync::Arc;

use rxr::{subjects::BehaviorSubject, subscribe::Subscriber};
use rxr::{ObservableExt, Observer, Subscribeable};

/// Builds a `Subscriber<i32>` that reports every notification on the console.
///
/// * `subscriber_id` - number included in the "emitted" and "Error" messages
///   so the output can be attributed to a specific subscriber.
///
/// Values go to stdout, errors go to stderr, and completion prints a plain
/// `Completed` line (which does not include the id).
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // `subscriber_id` is `Copy`, so each `move` closure captures its own copy.
    let on_next = move |v| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = move |e| eprintln!("Error: {} {}", e, subscriber_id);
    let on_complete = || println!("Completed");

    Subscriber::new(on_next, on_error, on_complete)
}

/// Minimal error type used by this example to drive the subject's error path.
///
/// It wraps a human-readable message, which `Display` prints verbatim.
#[derive(Debug)]
struct BehaviorSubjectError(String);

// No underlying cause to expose, so the default `Error` methods are sufficient.
impl Error for BehaviorSubjectError {}

impl Display for BehaviorSubjectError {
    /// Writes the wrapped message to the formatter exactly as stored.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        f.write_str(message)
    }
}

pub fn main() {
    // Initialize a `BehaviorSubject` with an initial value and obtain
    // its emitter and receiver.
    let (mut emitter, mut receiver) = BehaviorSubject::emitter_receiver(100);

    // Registers `Subscriber` 1 and emits the default value 100 to it.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Emits 101 to registered `Subscriber` 1.
    emitter.next(102); // Emits 102 to registered `Subscriber` 1.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2 and emits (now the default) value 102 to it.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `BehaviorSubject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |e| eprintln!("Error: {} 2", e),
            || println!("Completed"),
        ));

    // Registers `Subscriber` 3 and emits (now the default) value 102 to it.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Emits 103 to registered `Subscriber`'s 1, 2 and 3.

    // Calls `error` on registered `Subscriber`'s 1, 2 and 3.
    emitter.error(Arc::new(BehaviorSubjectError(
        "BehaviorSubject error".to_string(),
    )));

    // Subscriber 4: subscribed after subject's error call; emits error and
    // does not emit further.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called after subject's error call, does not emit.
}