Struct rxr::subscription::subscribe::Subscriber

source ·
pub struct Subscriber<NextFnType> { /* private fields */ }
Expand description

A type that acts as an observer, allowing users to handle emitted values, errors, and completion when subscribing to an Observable or Subject.

Users can create a `Subscriber` instance using the `new` method and provide custom functions to handle the `next`, `error`, and `complete` events.

Implementations§

source§

impl<NextFnType> Subscriber<NextFnType>

source

pub fn new( next_fn: impl FnMut(NextFnType) + 'static + Send, error_fn: impl FnMut(Arc<dyn Error + Send + Sync>) + 'static + Send + Sync, complete_fn: impl FnMut() + 'static + Send + Sync ) -> Self

Creates a new Subscriber instance with custom handling functions for emitted values, errors, and completion.

Examples found in repository?
examples/behavior_subject.rs (lines 17-21)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/// Builds a `Subscriber<i32>` whose handlers report each event on the console.
///
/// * emitted values are printed to stdout as `Subscriber #<id> emitted: <value>`;
/// * errors print the fixed text `Error` to stderr (the error value is ignored);
/// * completion prints `Completed <id>` to stdout.
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // Name each handler first so the constructor call reads as next / error / complete.
    let on_value = move |v: i32| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = |_| eprintln!("Error");
    let on_done = move || println!("Completed {}", subscriber_id);

    Subscriber::new(on_value, on_error, on_done)
}

/// Walks through `BehaviorSubject` usage: four subscribers are registered at different
/// points relative to the `next` and `complete` calls, and the inline comments state
/// what each registered subscriber receives at every step.
pub fn main() {
    // Initialize a `BehaviorSubject` with an initial value and obtain
    // its emitter and receiver.
    let (mut emitter, mut receiver) = BehaviorSubject::emitter_receiver(100);

    // Registers `Subscriber` 1 and emits the default value 100 to it.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Emits 101 to registered `Subscriber` 1.
    emitter.next(102); // Emits 102 to registered `Subscriber` 1.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2 and emits (now the default) value 102 to it.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `BehaviorSubject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Registers `Subscriber` 3 and emits (now the default) value 102 to it.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Emits 103 to registered `Subscriber`s 1, 2 and 3.

    emitter.complete(); // Calls `complete` on registered `Subscriber`s 1, 2 and 3.

    // Subscriber 4: post-completion subscribe, completes immediately.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called post-completion, does not emit.
}
More examples
Hide additional examples
examples/replay_subject.rs (lines 23-27)
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/// Builds a `Subscriber<i32>` whose handlers report each event on the console.
///
/// * emitted values are printed to stdout as `Subscriber #<id> emitted: <value>`;
/// * errors print the fixed text `Error` to stderr (the error value is ignored);
/// * completion prints `Completed <id>` to stdout.
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // Name each handler first so the constructor call reads as next / error / complete.
    let on_value = move |v: i32| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = |_| eprintln!("Error");
    let on_done = move || println!("Completed {}", subscriber_id);

    Subscriber::new(on_value, on_error, on_done)
}

/// Walks through `ReplaySubject` usage with an unbounded buffer: four subscribers are
/// registered at different points relative to the `next` and `complete` calls, and the
/// inline comments state which buffered and live values each one receives.
pub fn main() {
    // Initialize a `ReplaySubject` with an unbounded buffer size and obtain
    // its emitter and receiver.
    let (mut emitter, mut receiver) = ReplaySubject::emitter_receiver(BufSize::Unbounded);

    // Registers `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Stores 101 and emits it to registered `Subscriber` 1.
    emitter.next(102); // Stores 102 and emits it to registered `Subscriber` 1.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2 and emits buffered values (101, 102) to it.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `ReplaySubject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Registers `Subscriber` 3 and emits buffered values (101, 102) to it.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Stores 103 and emits it to registered `Subscriber`s 1, 2 and 3.

    emitter.complete(); // Calls `complete` on registered `Subscriber`s 1, 2 and 3.

    // Subscriber 4: post-completion subscribe, emits buffered values (101, 102, 103)
    // and completes.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called post-completion, does not emit.
}
examples/async_subject.rs (lines 16-20)
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/// Builds a `Subscriber<i32>` whose handlers report each event on the console.
///
/// * emitted values are printed to stdout as `Subscriber #<id> emitted: <value>`;
/// * errors print the fixed text `Error` to stderr (the error value is ignored);
/// * completion prints `Completed <id>` to stdout.
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // Name each handler first so the constructor call reads as next / error / complete.
    let on_value = move |v: i32| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = |_| eprintln!("Error");
    let on_done = move || println!("Completed {}", subscriber_id);

    Subscriber::new(on_value, on_error, on_done)
}

/// Walks through `AsyncSubject` usage: values passed to `next` are only remembered as
/// the latest value, and the inline comments state what each of the four subscribers
/// receives once `complete` is called.
pub fn main() {
    // Initialize an `AsyncSubject` and obtain its emitter and receiver.
    let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();

    // Registers `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Stores 101 as the latest value.
    emitter.next(102); // Latest value is now 102.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `AsyncSubject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Registers `Subscriber` 3.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Latest value is now 103.

    // Emits latest value (103) to registered `Subscriber`s 1, 2 and 3 and calls
    // `complete` on each of them.
    emitter.complete();

    // Subscriber 4: post-completion subscribe, emits latest value (103) and completes.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called post-completion, does not emit.
}
examples/subject.rs (lines 14-18)
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/// Builds a `Subscriber<i32>` whose handlers report each event on the console.
///
/// * emitted values are printed to stdout as `Subscriber #<id> emitted: <value>`;
/// * errors print the fixed text `Error` to stderr (the error value is ignored);
/// * completion prints `Completed <id>` to stdout.
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // Name each handler first so the constructor call reads as next / error / complete.
    let on_value = move |v: i32| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = |_| eprintln!("Error");
    let on_done = move || println!("Completed {}", subscriber_id);

    Subscriber::new(on_value, on_error, on_done)
}

/// Walks through plain `Subject` usage: four subscribers are registered at different
/// points relative to the `next` and `complete` calls, and the inline comments state
/// which emissions reach which subscriber.
pub fn main() {
    // Initialize a `Subject` and obtain its emitter and receiver.
    let (mut emitter, mut receiver) = Subject::emitter_receiver();

    // Registers `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Emits 101 to registered `Subscriber` 1.
    emitter.next(102); // Emits 102 to registered `Subscriber` 1.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `Subject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Registers `Subscriber` 3.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Emits 103 to registered `Subscriber`s 1, 2 and 3.

    emitter.complete(); // Calls `complete` on registered `Subscriber`s 1, 2 and 3.

    // Subscriber 4: post-completion subscribe, completes immediately.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called post-completion, does not emit.
}
examples/async_subject_error.rs (lines 21-25)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/// Builds a `Subscriber<i32>` whose handlers report each event on the console.
///
/// * emitted values are printed to stdout as `Subscriber #<id> emitted: <value>`;
/// * errors are printed to stderr as `Error: <error> <id>`;
/// * completion prints the fixed text `Completed` to stdout.
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // Name each handler first so the constructor call reads as next / error / complete.
    let on_value = move |v: i32| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = move |e| eprintln!("Error: {} {}", e, subscriber_id);
    let on_done = || println!("Completed");

    Subscriber::new(on_value, on_error, on_done)
}

/// Error value used by this example; wraps a human-readable message.
#[derive(Debug)]
struct AsyncSubjectError(String);

// Marker impl: the default `Error` methods are sufficient for this example.
impl Error for AsyncSubjectError {}

impl Display for AsyncSubjectError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        out.write_str(message)
    }
}

/// Walks through `AsyncSubject` error handling: three subscribers are registered
/// before the emitter's `error` call and a fourth one after it; the inline comments
/// state what each of them receives.
pub fn main() {
    // Initialize an `AsyncSubject` and obtain its emitter and receiver.
    let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();

    // Registers `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Stores 101 as the latest value.
    emitter.next(102); // Latest value is now 102.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `AsyncSubject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |e| eprintln!("Error: {} 2", e),
            || println!("Completed"),
        ));

    // Registers `Subscriber` 3.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Latest value is now 103.

    // Calls `error` on registered `Subscriber`s 1, 2 and 3.
    emitter.error(Arc::new(AsyncSubjectError(
        "AsyncSubject error".to_string(),
    )));

    // Subscriber 4: subscribed after subject's error call; emits error and
    // does not emit further.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called after subject's error call, does not emit.
}
examples/behavior_subject_error.rs (lines 22-26)
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/// Builds a `Subscriber<i32>` whose handlers report each event on the console.
///
/// * emitted values are printed to stdout as `Subscriber #<id> emitted: <value>`;
/// * errors are printed to stderr as `Error: <error> <id>`;
/// * completion prints the fixed text `Completed` to stdout.
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // Name each handler first so the constructor call reads as next / error / complete.
    let on_value = move |v: i32| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = move |e| eprintln!("Error: {} {}", e, subscriber_id);
    let on_done = || println!("Completed");

    Subscriber::new(on_value, on_error, on_done)
}

/// Error value used by this example; wraps a human-readable message.
#[derive(Debug)]
struct BehaviorSubjectError(String);

// Marker impl: the default `Error` methods are sufficient for this example.
impl Error for BehaviorSubjectError {}

impl Display for BehaviorSubjectError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        out.write_str(message)
    }
}

/// Walks through `BehaviorSubject` error handling: three subscribers are registered
/// before the emitter's `error` call and a fourth one after it; the inline comments
/// state what each of them receives.
pub fn main() {
    // Initialize a `BehaviorSubject` with an initial value and obtain
    // its emitter and receiver.
    let (mut emitter, mut receiver) = BehaviorSubject::emitter_receiver(100);

    // Registers `Subscriber` 1 and emits the default value 100 to it.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Emits 101 to registered `Subscriber` 1.
    emitter.next(102); // Emits 102 to registered `Subscriber` 1.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2 and emits (now the default) value 102 to it.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `BehaviorSubject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |e| eprintln!("Error: {} 2", e),
            || println!("Completed"),
        ));

    // Registers `Subscriber` 3 and emits (now the default) value 102 to it.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Emits 103 to registered `Subscriber`s 1, 2 and 3.

    // Calls `error` on registered `Subscriber`s 1, 2 and 3.
    emitter.error(Arc::new(BehaviorSubjectError(
        "BehaviorSubject error".to_string(),
    )));

    // Subscriber 4: subscribed after subject's error call; emits error and
    // does not emit further.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called after subject's error call, does not emit.
}
source

pub fn on_next(next_fn: impl FnMut(NextFnType) + 'static + Send) -> Self

Create a new Subscriber with the provided next function.

The `next` closure is called each time the observable emits a new item. It takes a single parameter of type `NextFnType`: the item emitted by the observable.

Examples found in repository?
examples/basic_observable.rs (line 35)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/// Builds a custom `Observable` that emits the integers 1 through 10 and then
/// completes, subscribes an observer that prints each value and the completion, and
/// finally prints a closing message.
fn main() {
    // Create a custom observable that emits values from 1 to 10.
    let mut emit_10_observable = Observable::new(|mut subscriber| {
        let mut i = 1;

        while i <= 10 {
            // Emit the value to the subscriber.
            subscriber.next(i);

            i += 1;
        }

        // Signal completion to the subscriber.
        subscriber.complete();

        // Return the empty subscription.
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Create the `Subscriber` with a mandatory `next` function, and optional
    // `complete` function. No need for `error` function in this simple example.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // This observable does not use async or threads so it will block until it is done.
    // Observables are cold so if you comment out the line below nothing will be emitted.
    emit_10_observable.subscribe(observer);

    println!("Custom Observable finished emmiting")
}
More examples
Hide additional examples
examples/take_last_operator.rs (line 52)
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/// Builds a custom `Observable` whose producer runs on a spawned task, keeps only the
/// last 8 emitted values via `take_last(8)`, and waits for the subscription to finish
/// before returning.
///
/// NOTE(review): `channel`, `task`, `time` and `UNSUBSCRIBE_SIGNAL` are defined outside
/// this snippet — presumably async-runtime items and a `bool` constant; confirm against
/// the example's imports.
async fn main() {
    let observable = Observable::new(|mut o| {
        // Shared stop flag, initially `false`; `done_c` is a second handle to the same flag.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        // Listener task: waits for one message on `rx` and, if it equals
        // `UNSUBSCRIBE_SIGNAL`, stores it in the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Producer task: emits 0..100, pausing 1 ms after each value, and stops early
        // once the shared flag equals `UNSUBSCRIBE_SIGNAL`. `complete` is called in
        // both cases.
        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        // The unsubscribe future sends `UNSUBSCRIBE_SIGNAL` to the listener task; the
        // handle carries the producer task's join handle.
        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    // Observer with a `next` handler and a completion handler; no error handler is set.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Capture and process only the last 8 values.
    let subscription = observable.take_last(8).subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/take_while_operator.rs (line 55)
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/// Builds a custom `Observable` whose producer runs on a spawned task, forwards values
/// through `take_while(|v| v <= &40)`, and waits for the subscription to finish before
/// returning.
///
/// NOTE(review): `channel`, `task`, `time` and `UNSUBSCRIBE_SIGNAL` are defined outside
/// this snippet — presumably async-runtime items and a `bool` constant; confirm against
/// the example's imports.
async fn main() {
    let observable = Observable::new(|mut o| {
        // Shared stop flag, initially `false`; `done_c` is a second handle to the same flag.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        // Listener task: waits for one message on `rx` and, if it equals
        // `UNSUBSCRIBE_SIGNAL`, stores it in the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Producer task: emits 0..100, pausing 1 ms after each value, and stops early
        // once the shared flag equals `UNSUBSCRIBE_SIGNAL`. `complete` is called in
        // both cases.
        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        // The unsubscribe future sends `UNSUBSCRIBE_SIGNAL` to the listener task; the
        // handle carries the producer task's join handle.
        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    // Observer with a `next` handler and a completion handler; no error handler is set.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Emit values only when they are less than or equal to 40.
    let subscription = observable.take_while(|v| v <= &40).subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/scan_operator.rs (line 69)
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/// Builds a custom `Observable` whose producer runs on a spawned task, pipes it through
/// `take(6)`, `delay(100)`, `merge_map` and `scan` to accumulate strings, and waits for
/// the subscription to finish before returning.
///
/// NOTE(review): `channel`, `task`, `time`, `UNSUBSCRIBE_SIGNAL` and
/// `get_response_observable` are defined outside this snippet — presumably async-runtime
/// items, a `bool` constant and a helper returning an observable; confirm against the
/// rest of the example.
async fn main() {
    let observable = Observable::new(|mut o| {
        // Shared stop flag, initially `false`; `done_c` is a second handle to the same flag.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        // Listener task: waits for one message on `rx` and, if it equals
        // `UNSUBSCRIBE_SIGNAL`, stores it in the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Producer task: emits 0..100, pausing 1 ms after each value, and stops early
        // once the shared flag equals `UNSUBSCRIBE_SIGNAL`. `complete` is called in
        // both cases.
        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        // The unsubscribe future sends `UNSUBSCRIBE_SIGNAL` to the listener task; the
        // handle carries the producer task's join handle.
        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    // Observer with a `next` handler and a completion handler; no error handler is set.
    let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
    observer.on_complete(|| println!("Completed"));

    // Accumulate response strings into a single string.
    // The types of `total` and `n` may differ as long as `total` implements the `From<n>` trait.
    // In this example, `total` is of type `String`, and `n` is of type `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|total, n| format!("{} {}", total, n), None)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/take_until_operator.rs (line 52)
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/// Builds a custom `Observable` whose producer runs on a spawned task, limits it with
/// `take_until` using a `Subject<()>` as the notifier, sends one notification, and
/// waits for the subscription to finish before returning.
///
/// NOTE(review): `channel`, `task`, `time` and `UNSUBSCRIBE_SIGNAL` are defined outside
/// this snippet — presumably async-runtime items and a `bool` constant; confirm against
/// the example's imports.
async fn main() {
    let observable = Observable::new(|mut o| {
        // Shared stop flag, initially `false`; `done_c` is a second handle to the same flag.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        // Listener task: waits for one message on `rx` and, if it equals
        // `UNSUBSCRIBE_SIGNAL`, stores it in the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Producer task: emits 0..100, pausing 1 ms after each value, and stops early
        // once the shared flag equals `UNSUBSCRIBE_SIGNAL`. `complete` is called in
        // both cases.
        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        // The unsubscribe future sends `UNSUBSCRIBE_SIGNAL` to the listener task; the
        // handle carries the producer task's join handle.
        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    // Observer with a `next` handler and a completion handler; no error handler is set.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Create notifier, it can be observable or one of the Subject variants.
    let (mut emitter, receiver) = Subject::<()>::emitter_receiver();

    // Turning Subject into an observable. To continue using the receiver later,
    // utilize `.clone()`, e.g. `receiver.clone().into()`.
    let subscription = observable
        .take_until(receiver.into(), false)
        .subscribe(observer);

    // Allowing some time for the `take_until` function to register the notifier
    // before emitting a signal. This step is unnecessary if you're not immediately
    // sending a signal.
    time::sleep(time::Duration::from_millis(1)).await;

    // Send signal to stop source observable emitting values.
    emitter.next(());

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/take_until_operator_os.rs (line 53)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/// Builds a custom `Observable` whose producer runs on an OS thread, limits it with
/// `take_until` using a delayed `Subject` as the notifier, sends one notification, and
/// joins the subscription before returning.
///
/// NOTE(review): `UNSUBSCRIBE_SIGNAL` is defined outside this snippet — presumably a
/// `bool` constant; confirm against the rest of the example.
fn main() {
    let observable = Observable::new(|mut o| {
        // Shared stop flag, initially `false`; `done_c` is a second handle to the same flag.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, rx) = std::sync::mpsc::channel();

        // Listener thread: blocks on one message from `rx` and, if it equals
        // `UNSUBSCRIBE_SIGNAL`, stores it in the shared flag.
        std::thread::spawn(move || {
            if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Producer thread: emits 0..100, sleeping 1 ms after each value, and stops
        // early once the shared flag equals `UNSUBSCRIBE_SIGNAL`. `complete` is called
        // in both cases.
        let join_handle = std::thread::spawn(move || {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                std::thread::sleep(Duration::from_millis(1));
            }
            o.complete();
        });

        // The unsubscribe closure sends `UNSUBSCRIBE_SIGNAL` to the listener thread; the
        // handle carries the producer thread's join handle.
        Subscription::new(
            UnsubscribeLogic::Logic(Box::new(move || {
                if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinThread(join_handle),
        )
    });

    // Observer with a `next` handler and a completion handler; no error handler is set.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Create notifier, it can be observable or one of the Subject variants.
    let (mut emitter, receiver) = Subject::emitter_receiver();

    // We can chain the Subject, here we use `delay()` to slow down the notifier so
    // source observable has time to emit some values. Note when we chain the
    // notifier with operators we don't have to use `into()`. To continue using the
    // receiver later, utilize `.clone()`, e.g. `receiver.clone().delay(20)`.
    let subscription = observable
        .take_until(receiver.delay(20), false)
        .subscribe(observer);

    // Allowing some time for the `take_until` function to register the notifier
    // before emitting a signal. This step is unnecessary if you're not immediately
    // sending a signal.
    std::thread::sleep(Duration::from_millis(1));

    // Send signal to stop source observable emitting values.
    emitter.next(());

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join().is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

pub fn on_complete(&mut self, complete_fn: impl FnMut() + 'static + Send + Sync)

Set the completion function for the Subscriber.

The provided closure will be called when the observable completes its emission sequence.

Examples found in repository?
examples/basic_observable.rs (line 36)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/// Builds a custom `Observable` that emits the integers 1 through 10 and then
/// completes, subscribes an observer that prints each value and the completion, and
/// finally prints a closing message.
fn main() {
    // Create a custom observable that emits values from 1 to 10.
    let mut emit_10_observable = Observable::new(|mut subscriber| {
        let mut i = 1;

        while i <= 10 {
            // Emit the value to the subscriber.
            subscriber.next(i);

            i += 1;
        }

        // Signal completion to the subscriber.
        subscriber.complete();

        // Return the empty subscription.
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Create the `Subscriber` with a mandatory `next` function, and optional
    // `complete` function. No need for `error` function in this simple example.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // This observable does not use async or threads so it will block until it is done.
    // Observables are cold so if you comment out the line below nothing will be emitted.
    emit_10_observable.subscribe(observer);

    println!("Custom Observable finished emmiting")
}
More examples
Hide additional examples
examples/take_last_operator.rs (line 53)
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/// Builds a custom `Observable` whose producer runs on a spawned task, keeps only the
/// last 8 emitted values via `take_last(8)`, and waits for the subscription to finish
/// before returning.
///
/// NOTE(review): `channel`, `task`, `time` and `UNSUBSCRIBE_SIGNAL` are defined outside
/// this snippet — presumably async-runtime items and a `bool` constant; confirm against
/// the example's imports.
async fn main() {
    let observable = Observable::new(|mut o| {
        // Shared stop flag, initially `false`; `done_c` is a second handle to the same flag.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        // Listener task: waits for one message on `rx` and, if it equals
        // `UNSUBSCRIBE_SIGNAL`, stores it in the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Producer task: emits 0..100, pausing 1 ms after each value, and stops early
        // once the shared flag equals `UNSUBSCRIBE_SIGNAL`. `complete` is called in
        // both cases.
        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        // The unsubscribe future sends `UNSUBSCRIBE_SIGNAL` to the listener task; the
        // handle carries the producer task's join handle.
        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    // Observer with a `next` handler and a completion handler; no error handler is set.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Capture and process only the last 8 values.
    let subscription = observable.take_last(8).subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/take_while_operator.rs (line 56)
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/// Builds a custom `Observable` whose producer runs on a spawned task, forwards values
/// through `take_while(|v| v <= &40)`, and waits for the subscription to finish before
/// returning.
///
/// NOTE(review): `channel`, `task`, `time` and `UNSUBSCRIBE_SIGNAL` are defined outside
/// this snippet — presumably async-runtime items and a `bool` constant; confirm against
/// the example's imports.
async fn main() {
    let observable = Observable::new(|mut o| {
        // Shared stop flag, initially `false`; `done_c` is a second handle to the same flag.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        // Listener task: waits for one message on `rx` and, if it equals
        // `UNSUBSCRIBE_SIGNAL`, stores it in the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Producer task: emits 0..100, pausing 1 ms after each value, and stops early
        // once the shared flag equals `UNSUBSCRIBE_SIGNAL`. `complete` is called in
        // both cases.
        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        // The unsubscribe future sends `UNSUBSCRIBE_SIGNAL` to the listener task; the
        // handle carries the producer task's join handle.
        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    // Observer with a `next` handler and a completion handler; no error handler is set.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Emit values only when they are less than or equal to 40.
    let subscription = observable.take_while(|v| v <= &40).subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/scan_operator.rs (line 70)
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/// Builds a custom `Observable` whose producer runs on a spawned task, pipes it through
/// `take(6)`, `delay(100)`, `merge_map` and `scan` to accumulate strings, and waits for
/// the subscription to finish before returning.
///
/// NOTE(review): `channel`, `task`, `time`, `UNSUBSCRIBE_SIGNAL` and
/// `get_response_observable` are defined outside this snippet — presumably async-runtime
/// items, a `bool` constant and a helper returning an observable; confirm against the
/// rest of the example.
async fn main() {
    let observable = Observable::new(|mut o| {
        // Shared stop flag, initially `false`; `done_c` is a second handle to the same flag.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        // Listener task: waits for one message on `rx` and, if it equals
        // `UNSUBSCRIBE_SIGNAL`, stores it in the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Producer task: emits 0..100, pausing 1 ms after each value, and stops early
        // once the shared flag equals `UNSUBSCRIBE_SIGNAL`. `complete` is called in
        // both cases.
        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        // The unsubscribe future sends `UNSUBSCRIBE_SIGNAL` to the listener task; the
        // handle carries the producer task's join handle.
        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    // Observer with a `next` handler and a completion handler; no error handler is set.
    let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
    observer.on_complete(|| println!("Completed"));

    // Accumulate response strings into a single string.
    // The types of `total` and `n` may differ as long as `total` implements the `From<n>` trait.
    // In this example, `total` is of type `String`, and `n` is of type `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|total, n| format!("{} {}", total, n), None)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/take_until_operator.rs (line 53)
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// Example entry point: builds a custom asynchronous observable that emits the
// integers 0..100, then stops it early with the `take_until` operator driven
// by a `Subject` notifier.
//
// NOTE(review): `UNSUBSCRIBE_SIGNAL` is defined outside this function; from its
// use with the `Mutex<bool>` flag below it is presumably a `bool` constant.
async fn main() {
    let observable = Observable::new(|mut o| {
        // Shared flag telling the emitting task to stop; starts out `false`.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        // Channel used to deliver the unsubscribe signal to the watcher task.
        let (tx, mut rx) = channel(10);

        // Watcher task: waits for one message on the channel and, if it
        // matches `UNSUBSCRIBE_SIGNAL`, stores it in the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitting task: sends `i` to the subscriber roughly once per
        // millisecond, checking the shared flag before every emission.
        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            // Runs whether the loop finished all 100 iterations or was
            // interrupted by the flag.
            o.complete();
        });

        // The returned subscription bundles the unsubscribe future (which
        // sends the signal through `tx`) with the emitting task's join handle.
        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    // Observer that prints every emitted value and a message on completion.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Create notifier, it can be observable or one of the Subject variants.
    let (mut emitter, receiver) = Subject::<()>::emitter_receiver();

    // Turning Subject into an observable. To continue using the receiver later,
    // utilize `.clone()`, e.g. `receiver.clone().into()`.
    // NOTE(review): the meaning of the second `take_until` argument (`false`)
    // is not visible here — check the operator's documentation.
    let subscription = observable
        .take_until(receiver.into(), false)
        .subscribe(observer);

    // Allowing some time for the `take_until` function to register the notifier
    // before emitting a signal. This step is unnecessary if you're not immediately
    // sending a signal.
    time::sleep(time::Duration::from_millis(1)).await;

    // Send signal to stop source observable emitting values.
    emitter.next(());

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/take_until_operator_os.rs (line 54)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// Example entry point: OS-thread variant of the `take_until` example. Builds a
// custom observable that emits the integers 0..100 from a dedicated thread and
// stops it early through a delayed `Subject` notifier.
//
// NOTE(review): `UNSUBSCRIBE_SIGNAL` is defined outside this function; from its
// use with the `Mutex<bool>` flag below it is presumably a `bool` constant.
fn main() {
    let observable = Observable::new(|mut o| {
        // Shared flag telling the emitting thread to stop; starts out `false`.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        // Standard-library channel used to deliver the unsubscribe signal.
        let (tx, rx) = std::sync::mpsc::channel();

        // Watcher thread: blocks on the channel for one message and, if it
        // matches `UNSUBSCRIBE_SIGNAL`, stores it in the shared flag.
        std::thread::spawn(move || {
            if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitting thread: sends `i` to the subscriber roughly once per
        // millisecond, checking the shared flag before every emission.
        let join_handle = std::thread::spawn(move || {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                std::thread::sleep(Duration::from_millis(1));
            }
            // Runs whether the loop finished all 100 iterations or was
            // interrupted by the flag.
            o.complete();
        });

        // The returned subscription bundles the unsubscribe closure (which
        // sends the signal through `tx`) with the emitting thread's join handle.
        Subscription::new(
            UnsubscribeLogic::Logic(Box::new(move || {
                if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinThread(join_handle),
        )
    });

    // Observer that prints every emitted value and a message on completion.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Create notifier, it can be observable or one of the Subject variants.
    let (mut emitter, receiver) = Subject::emitter_receiver();

    // We can chain the Subject, here we use `delay()` to slow down the notifier so
    // source observable has time to emit some values. Note when we chain the
    // notifier with operators we don't have to use `into()`. To continue using the
    // receiver later, utilize `.clone()`, e.g. `receiver.clone().delay(20)`.
    // NOTE(review): the meaning of the second `take_until` argument (`false`)
    // is not visible here — check the operator's documentation.
    let subscription = observable
        .take_until(receiver.delay(20), false)
        .subscribe(observer);

    // Allowing some time for the `take_until` function to register the notifier
    // before emitting a signal. This step is unnecessary if you're not immediately
    // sending a signal.
    std::thread::sleep(Duration::from_millis(1));

    // Send signal to stop source observable emitting values.
    emitter.next(());

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join().is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

pub fn on_error( &mut self, error_fn: impl FnMut(Arc<dyn Error + Send + Sync>) + 'static + Send + Sync )

Set the error-handling function for the Subscriber.

The provided closure will be called when the observable encounters an error during its emission sequence. It takes an Arc wrapping a trait object that implements the Error, Send, and Sync traits as its parameter.

source

pub fn is_fused(&self) -> bool

Returns true if the observable has been fused. When creating your own observable, you can call this method to check whether the observable is fused.

Observable::new(|subscriber| {
    // ...
    if subscriber.is_fused() { ... };
    // ...
});

Trait Implementations§

source§

impl<T: Clone + Send + 'static> From<AsyncSubjectEmitter<T>> for Subscriber<T>

source§

fn from(value: AsyncSubjectEmitter<T>) -> Self

Converts to this type from the input type.
source§

impl<T: Clone + Send + 'static> From<BehaviorSubjectEmitter<T>> for Subscriber<T>

source§

fn from(value: BehaviorSubjectEmitter<T>) -> Self

Converts to this type from the input type.
source§

impl<T: Clone + Send + 'static> From<ReplaySubjectEmitter<T>> for Subscriber<T>

source§

fn from(value: ReplaySubjectEmitter<T>) -> Self

Converts to this type from the input type.
source§

impl<T: Clone + 'static> From<SubjectEmitter<T>> for Subscriber<T>

source§

fn from(value: SubjectEmitter<T>) -> Self

Converts to this type from the input type.
source§

impl<T> Observer for Subscriber<T>

§

type NextFnType = T

The type of values emitted by the observable stream.
source§

fn next(&mut self, v: Self::NextFnType)

Handles an emitted value from the observable stream. Read more
source§

fn complete(&mut self)

Handles the successful completion of the observable stream. Read more
source§

fn error(&mut self, observable_error: Arc<dyn Error + Send + Sync>)

Handles an error occurrence during emission from the observable stream. Read more

Auto Trait Implementations§

§

impl<NextFnType> Freeze for Subscriber<NextFnType>

§

impl<NextFnType> !RefUnwindSafe for Subscriber<NextFnType>

§

impl<NextFnType> Send for Subscriber<NextFnType>

§

impl<NextFnType> !Sync for Subscriber<NextFnType>

§

impl<NextFnType> Unpin for Subscriber<NextFnType>

§

impl<NextFnType> !UnwindSafe for Subscriber<NextFnType>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.