Trait rxr::observable::ObservableExt

source ·
pub trait ObservableExt<T: 'static>: Subscribeable<ObsType = T> {
Show 19 methods // Provided methods fn map<U, F>(self, f: F) -> Observable<U> where Self: Sized + Send + Sync + 'static, F: FnOnce(T) -> U + Copy + Sync + Send + 'static, U: 'static { ... } fn filter<P>(self, predicate: P) -> Observable<T> where Self: Sized + Send + Sync + 'static, P: FnOnce(&T) -> bool + Copy + Sync + Send + 'static { ... } fn skip(self, n: usize) -> Observable<T> where Self: Sized + Send + Sync + 'static { ... } fn delay(self, num_of_ms: u64) -> Observable<T> where Self: Sized + Send + Sync + 'static { ... } fn scan<U, F>(self, acc: F, seed: Option<U>) -> Observable<U> where Self: Sized + Send + Sync + 'static, F: FnOnce(U, T) -> U + Copy + Sync + Send + 'static, U: From<T> + Clone + Send + Sync + 'static { ... } fn connectable(self) -> Connectable<T> where Self: Send + Sync + Sized + 'static, T: Send + Sync + Clone { ... } fn first<F>(self, predicate: F, default_value: Option<T>) -> Observable<T> where Self: Sized + Send + Sync + 'static, F: FnOnce(T, usize) -> bool + Copy + Send + Sync + 'static, T: Clone + Send + Sync { ... } fn zip(self, observable_inputs: Vec<Observable<T>>) -> Observable<Vec<T>> where Self: Clone + Sized + Send + Sync + 'static, T: Clone + Send { ... } fn take(self, n: usize) -> Observable<T> where Self: Sized + Send + Sync + 'static { ... } fn take_until<U: 'static>( self, notifier: Observable<U>, unsubscribe_notifier: bool ) -> Observable<T> where Self: Sized + Send + Sync + 'static { ... } fn take_while<P>(self, predicate: P) -> Observable<T> where Self: Sized + Send + Sync + 'static, P: FnOnce(&T) -> bool + Copy + Sync + Send + 'static { ... } fn take_last(self, count: usize) -> Observable<T> where Self: Sized + Send + Sync + 'static, T: Send { ... } fn tap(self, observer: Subscriber<T>) -> Observable<T> where Self: Sized + Send + Sync + 'static, T: Clone { ... } fn merge(self, sources: Vec<Observable<T>>) -> Observable<T> where Self: Sized + Send + Sync + 'static { ... } fn merge_one(self, source: Observable<T>) -> Observable<T> where Self: Sized + Send + Sync + 'static { ... } fn switch_map<R: 'static, F>(self, project: F) -> Observable<R> where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static { ... } fn merge_map<R: 'static, F>(self, project: F) -> Observable<R> where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static { ... } fn concat_map<R: 'static, F>(self, project: F) -> Observable<R> where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static { ... } fn exhaust_map<R: 'static, F>(self, project: F) -> Observable<R> where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static { ... }
}
Expand description

The ObservableExt trait provides a set of extension methods that can be applied to observables to transform and manipulate their behavior.

This trait enhances the capabilities of the Observable struct by allowing users to chain operators together, creating powerful reactive pipelines.

Provided Methods§

source

fn map<U, F>(self, f: F) -> Observable<U>
where Self: Sized + Send + Sync + 'static, F: FnOnce(T) -> U + Copy + Sync + Send + 'static, U: 'static,

Transforms the items emitted by the observable using a transformation function.

The transformation function f is applied to each item emitted by the observable, and the resulting value is emitted by the resulting observable.

Examples found in repository?
examples/subject_as_observer.rs (line 44)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
pub fn main() {
    // Make an Observable.
    let mut observable = Observable::new(move |mut o: Subscriber<_>| {
        for i in 0..10 + 1 {
            o.next(i);
            std::thread::sleep(Duration::from_millis(1));
        }
        o.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Initialize a `Subject` and obtain its emitter and receiver.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // Register `Subscriber` 2.
    receiver
        // We're cloning the receiver so we can use it again.
        // Shallow clone: clones only the pointer to the `Subject`.
        .clone()
        .take(7) // For performance, prioritize placing `take()` as the first operator.
        .delay(1000)
        .map(|v| format!("mapped {}", v))
        .subscribe(create_subscriber(2));

    // Register `Subscriber` 3.
    receiver
        .filter(|v| v % 2 == 0)
        .map(|v| format!("filtered {}", v))
        .subscribe(create_subscriber(3));

    // Convert the emitter into an observer and subscribe it to the observable.
    observable.subscribe(emitter.into());
}
More examples
Hide additional examples
examples/subject.rs (line 35)
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
pub fn main() {
    // Initialize a `Subject` and obtain its emitter and receiver.
    let (mut emitter, mut receiver) = Subject::emitter_receiver();

    // Registers `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Emits 101 to registered `Subscriber` 1.
    emitter.next(102); // Emits 102 to registered `Subscriber` 1.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `Subject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Registers `Subscriber` 3.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Emits 103 to registered `Subscriber`'s 1, 2 and 3.

    emitter.complete(); // Calls `complete` on registered `Subscriber`'s 1, 2 and 3.

    // Subscriber 4: post-completion subscribe, completes immediately.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called post-completion, does not emit.
}
examples/async_subject.rs (line 37)
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
pub fn main() {
    // Initialize a `AsyncSubject` and obtain its emitter and receiver.
    let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();

    // Registers `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Stores 101 ast the latest value.
    emitter.next(102); // Latest value is now 102.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `AsyncSubject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Registers `Subscriber` 3.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Latest value is now 103.

    // Emits latest value (103) to registered `Subscriber`'s 1, 2 and 3 and calls
    // `complete` on each of them.
    emitter.complete();

    // Subscriber 4: post-completion subscribe, emits latest value (103) and completes.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called post-completion, does not emit.
}
examples/async_subject_error.rs (line 53)
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
pub fn main() {
    // Initialize a `AsyncSubject` and obtain its emitter and receiver.
    let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();

    // Registers `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Stores 101 ast the latest value.
    emitter.next(102); // Latest value is now 102.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `AsyncSubject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |e| eprintln!("Error: {} 2", e),
            || println!("Completed"),
        ));

    // Registers `Subscriber` 3.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Latest value is now 103.

    // Calls `error` on registered `Subscriber`'s 1, 2 and 3.
    emitter.error(Arc::new(AsyncSubjectError(
        "AsyncSubject error".to_string(),
    )));

    // Subscriber 4: subscribed after subject's error call; emits error and
    // does not emit further.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called post-completion, does not emit.
}
examples/behavior_subject.rs (line 39)
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
pub fn main() {
    // Initialize a `BehaviorSubject` with an initial value and obtain
    // its emitter and receiver.
    let (mut emitter, mut receiver) = BehaviorSubject::emitter_receiver(100);

    // Registers `Subscriber` 1 and emits the default value 100 to it.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Emits 101 to registered `Subscriber` 1.
    emitter.next(102); // Emits 102 to registered `Subscriber` 1.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2 and emits (now the default) value 102 to it.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `BehaviorSubject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Registers `Subscriber` 3 and emits (now the default) value 102 to it.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Emits 103 to registered `Subscriber`'s 1, 2 and 3.

    emitter.complete(); // Calls `complete` on registered `Subscriber`'s 1, 2 and 3.

    // Subscriber 4: post-completion subscribe, completes immediately.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called post-completion, does not emit.
}
examples/replay_subject.rs (line 45)
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
pub fn main() {
    // Initialize a `ReplaySubject` with an unbounded buffer size and obtain
    // its emitter and receiver.
    let (mut emitter, mut receiver) = ReplaySubject::emitter_receiver(BufSize::Unbounded);

    // Registers `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    emitter.next(101); // Stores 101 and emits it to registered `Subscriber` 1.
    emitter.next(102); // Stores 102 and emits it to registered `Subscriber` 1.

    // All Observable operators can be applied to the receiver.
    // Registers mapped `Subscriber` 2 and emits buffered values (101, 102) to it.
    receiver
        .clone() // Shallow clone: clones only the pointer to the `ReplaySubject` object.
        .map(|v| format!("mapped {}", v))
        .subscribe(Subscriber::new(
            move |v| println!("Subscriber #2 emitted: {}", v),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Registers `Subscriber` 3 and emits buffered values (101, 102) to it.
    receiver.subscribe(create_subscriber(3));

    emitter.next(103); // Stores 103 and emits it to registered `Subscriber`'s 1, 2 and 3.

    emitter.complete(); // Calls `complete` on registered `Subscriber`'s 1, 2 and 3.

    // Subscriber 4: post-completion subscribe, emits buffered values (101, 102, 103)
    // and completes.
    receiver.subscribe(create_subscriber(4));

    emitter.next(104); // Called post-completion, does not emit.
}
source

fn filter<P>(self, predicate: P) -> Observable<T>
where Self: Sized + Send + Sync + 'static, P: FnOnce(&T) -> bool + Copy + Sync + Send + 'static,

Filters the items emitted by the observable based on a predicate function.

Only items for which the predicate function returns true will be emitted by the resulting observable.

Examples found in repository?
examples/subject_as_observer.rs (line 49)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
pub fn main() {
    // Make an Observable.
    let mut observable = Observable::new(move |mut o: Subscriber<_>| {
        for i in 0..10 + 1 {
            o.next(i);
            std::thread::sleep(Duration::from_millis(1));
        }
        o.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Initialize a `Subject` and obtain its emitter and receiver.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // Register `Subscriber` 2.
    receiver
        // We're cloning the receiver so we can use it again.
        // Shallow clone: clones only the pointer to the `Subject`.
        .clone()
        .take(7) // For performance, prioritize placing `take()` as the first operator.
        .delay(1000)
        .map(|v| format!("mapped {}", v))
        .subscribe(create_subscriber(2));

    // Register `Subscriber` 3.
    receiver
        .filter(|v| v % 2 == 0)
        .map(|v| format!("filtered {}", v))
        .subscribe(create_subscriber(3));

    // Convert the emitter into an observer and subscribe it to the observable.
    observable.subscribe(emitter.into());
}
More examples
Hide additional examples
examples/async_observable_os_threads.rs (line 57)
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
fn main() {
    // Create a custom observable that emits values in a separate thread.
    let observable = Observable::new(|mut o| {
        // Launch a new thread for the Observable's processing and store its handle.
        let join_handle = std::thread::spawn(move || {
            for i in 0..=15 {
                // Emit the value to the subscriber.
                o.next(i);
                // Important. Put an await point after each emit or after some emits.
                // This allows the `take()` operator to function properly.
                // Not required in this example.
                std::thread::sleep(Duration::from_millis(1));
            }
            // Signal completion to the subscriber.
            o.complete();
        });

        // Return the subscription.
        Subscription::new(
            // In this example, we omit the unsubscribe functionality. Without it, we
            // can't unsubscribe, which prevents the `take()` operator, as well as
            // higher-order operators like `switch_map`, `merge_map`, `concat_map`,
            // and `exhaust_map`, from functioning as expected.
            UnsubscribeLogic::Nil,
            // Store the `JoinHandle` to enable waiting functionality using the
            // `Subscription` for this Observable thread to complete.
            SubscriptionHandle::JoinThread(join_handle),
        )
    });

    // Create the `Subscriber` with a mandatory `next` function, and optional
    // `complete` function. No need for `error` function in this simple example.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // This observable uses OS threads so it will not block the current thread.
    // Observables are cold so if you comment out the statement bellow nothing
    // will be emitted.
    let subscription = observable
        .filter(|&v| v <= 10)
        .map(|v| format!("Mapped {}", v))
        .subscribe(observer);

    // Do something else here.
    println!("Print something while Observable is emitting.");

    // Because the subscription creates a new thread, we can utilize the `Subscription`
    // to wait for its completion. This ensures that the main thread won't terminate
    // prematurely and stop all child threads.
    if subscription.join().is_err() {
        // Handle error
    }

    println!("Custom Observable finished emmiting")
}
source

fn skip(self, n: usize) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Skips the first n items emitted by the observable and then emits the rest.

If n is greater than or equal to the total number of items, it behaves as if the observable is complete and emits no items.

source

fn delay(self, num_of_ms: u64) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Delays the emissions from the observable by the specified number of milliseconds.

The delay operator introduces a time delay for emissions from the observable, determined by the specified duration.

Examples found in repository?
examples/subject_as_observer.rs (line 43)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
pub fn main() {
    // Make an Observable.
    let mut observable = Observable::new(move |mut o: Subscriber<_>| {
        for i in 0..10 + 1 {
            o.next(i);
            std::thread::sleep(Duration::from_millis(1));
        }
        o.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Initialize a `Subject` and obtain its emitter and receiver.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // Register `Subscriber` 2.
    receiver
        // We're cloning the receiver so we can use it again.
        // Shallow clone: clones only the pointer to the `Subject`.
        .clone()
        .take(7) // For performance, prioritize placing `take()` as the first operator.
        .delay(1000)
        .map(|v| format!("mapped {}", v))
        .subscribe(create_subscriber(2));

    // Register `Subscriber` 3.
    receiver
        .filter(|v| v % 2 == 0)
        .map(|v| format!("filtered {}", v))
        .subscribe(create_subscriber(3));

    // Convert the emitter into an observer and subscribe it to the observable.
    observable.subscribe(emitter.into());
}
More examples
Hide additional examples
examples/scan_operator.rs (line 77)
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
    observer.on_complete(|| println!("Completed"));

    // Accumulate response strings into a single string.
    // The types of `total` and `n` may differ as long as `total` implements the `From<n>` trait.
    // In this example, `total` is of type `String`, and `n` is of type `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|total, n| format!("{} {}", total, n), None)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/take_until_operator_os.rs (line 64)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, rx) = std::sync::mpsc::channel();

        std::thread::spawn(move || {
            if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = std::thread::spawn(move || {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                std::thread::sleep(Duration::from_millis(1));
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Logic(Box::new(move || {
                if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinThread(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Create notifier, it can be observable or one of the Subject variants.
    let (mut emitter, receiver) = Subject::emitter_receiver();

    // We can chain the Subject, here we use `delay()` to slow down the notifier so
    // source observable has time to emmit some values. Note when we chain the
    // notifier with operators we don't have to use `into()`. To continue using the
    // receiver later, utilize `.clone()`, e.g. `receiver.clone().delay(20)`.
    let subscription = observable
        .take_until(receiver.delay(20), false)
        .subscribe(observer);

    // Allowing some time for the `take_until` function to register the notifier
    // before emitting a signal. This step is unnecessary if you're not immediately
    // sending a signal.
    std::thread::sleep(Duration::from_millis(1));

    // Send signal to stop source observable emitting values.
    emitter.next(());

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join().is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/connectable_chained_operator.rs (line 70)
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
async fn main() {
    // Make a source observable.
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..10 + 1 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer1 = Subscriber::on_next(|v| println!("Observer 1 emitted {}", v));
    observer1.on_complete(|| println!("Observer 1 completed"));

    let mut observer2 = Subscriber::on_next(|v| println!("Observer 2 emitted {}", v));
    observer2.on_complete(|| println!("Observer 2 completed"));

    // You can use other operators before calling `connectable()` operator.
    let observable = observable.tap(Subscriber::on_next(|v| println!("... emitting {v}")));

    // Make a `Connectable` observable from the source observable.
    let connectable = observable.connectable();

    // If you want to use other operators after calling `connectable()` operator you
    // can do it by cloning first.
    let mut connectable_chained = connectable.clone().map(|v| v + 10).delay(1000);

    // Subscribe observers to chained `Connectable`.
    connectable_chained.subscribe(observer1);
    connectable_chained.subscribe(observer2);

    // Connect `Connectable` to start emitting to all `Subscriber`'s.
    // No emissions happen if `connect()` is not called.
    let connected = connectable.connect();

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for `Connectable` observable to finish before exiting the program.
    // You can also use `connected.unsubscribe();` to stop all emissions.
    if connected.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/async_observable_tokio.rs (line 82)
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async fn main() {
    // Create a custom observable that emits values in a separate task.
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        // Spawn a new Tokio task to await a signal sent from the unsubscribe logic.
        task::spawn(async move {
            // Attempt to receive a signal sent from the unsubscribe logic.
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                // Update the `done_c` mutex with the received signal.
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Launch a new Tokio task for the Observable's processing and store its handle.
        let join_handle = task::spawn(async move {
            for i in 0..=10000 {
                // If an unsubscribe signal is received, exit the loop and stop emissions.
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                // Emit the value to the subscriber.
                o.next(i);
                // Important. Put an await point after each emit or after some emits.
                // This allows the `take()` operator to function properly.
                time::sleep(time::Duration::from_millis(1)).await;
            }
            // Signal completion to the subscriber.
            o.complete();
        });

        // Return a new `Subscription` with custom unsubscribe logic.
        Subscription::new(
            // The provided closure defines the behavior of the subscription when it
            // is unsubscribed. In this case, it sends a signal to an asynchronous
            // observable to stop emitting values. If your closure requires Tokio
            // tasks or channels to send unsubscribe signals, consider using
            // `UnsubscribeLogic::Future`.
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            // Store the `JoinHandle` for awaiting completion using the `Subscription`.
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    // Create the `Subscriber` with a mandatory `next` function, and optional
    // `complete` function. No need for `error` function in this simple example.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // This observable uses OS threads so it will not block the current thread.
    // Observables are cold so if you comment out the statement bellow nothing
    // will be emitted.
    let subscription = observable
        // take utilizes our unsubscribe function to stop background emissions after
        // a specified item count.
        .take(15)
        .map(|v| format!("Mapped {}", v))
        .delay(1000)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for the subscription to either complete as a Tokio task or join an OS thread.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

fn scan<U, F>(self, acc: F, seed: Option<U>) -> Observable<U>
where Self: Sized + Send + Sync + 'static, F: FnOnce(U, T) -> U + Copy + Sync + Send + 'static, U: From<T> + Clone + Send + Sync + 'static,

Accumulates values emitted by an observable over time, producing an accumulated result based on an accumulator function applied to each emitted value.

The scan operator applies an accumulator function over the values emitted by the source observable. It accumulates values into a single accumulated result, and each new value emitted by the source observable contributes to this accumulation. The accumulated result is emitted by the resulting observable. seed is optional. If omitted, the first emitted value is used as the seed.

Examples found in repository?
examples/scan_operator.rs (line 79)
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
    observer.on_complete(|| println!("Completed"));

    // Accumulate response strings into a single string.
    // The types of `total` and `n` may differ as long as `total` implements the `From<n>` trait.
    // In this example, `total` is of type `String`, and `n` is of type `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|total, n| format!("{} {}", total, n), None)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

fn connectable(self) -> Connectable<T>
where Self: Send + Sync + Sized + 'static, T: Send + Sync + Clone,

Creates a connectable observable from the source observable.

This operator converts the source observable into a connectable observable, allowing multiple subscribers to connect to the same source without causing multiple subscriptions to the underlying source.

The actual emission of values from the source observable will occur only after calling the connect() method on the resulting Connectable instance.

Examples found in repository?
examples/connectable_chained_operator.rs (line 66)
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
async fn main() {
    // Make a source observable.
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..10 + 1 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer1 = Subscriber::on_next(|v| println!("Observer 1 emitted {}", v));
    observer1.on_complete(|| println!("Observer 1 completed"));

    let mut observer2 = Subscriber::on_next(|v| println!("Observer 2 emitted {}", v));
    observer2.on_complete(|| println!("Observer 2 completed"));

    // You can use other operators before calling `connectable()` operator.
    let observable = observable.tap(Subscriber::on_next(|v| println!("... emitting {v}")));

    // Make a `Connectable` observable from the source observable.
    let connectable = observable.connectable();

    // If you want to use other operators after calling `connectable()` operator you
    // can do it by cloning first.
    let mut connectable_chained = connectable.clone().map(|v| v + 10).delay(1000);

    // Subscribe observers to chained `Connectable`.
    connectable_chained.subscribe(observer1);
    connectable_chained.subscribe(observer2);

    // Connect `Connectable` to start emitting to all `Subscriber`'s.
    // No emissions happen if `connect()` is not called.
    let connected = connectable.connect();

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for `Connectable` observable to finish before exiting the program.
    // You can also use `connected.unsubscribe();` to stop all emissions.
    if connected.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
More examples
Hide additional examples
examples/connectable_operator.rs (line 65)
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async fn main() {
    // Make a source observable.
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..10 + 1 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer1 = Subscriber::on_next(|v| println!("Observer 1 emitted {}", v));
    observer1.on_complete(|| println!("Observer 1 completed"));

    let mut observer2 = Subscriber::on_next(|v| println!("Observer 2 emitted {}", v));
    observer2.on_complete(|| println!("Observer 2 completed"));

    let mut observer3 = Subscriber::on_next(|v| println!("Observer 3 emitted {}", v));
    observer3.on_complete(|| println!("Observer 3 completed"));

    // Make a `Connectable` observable from the source observable.
    let mut connectable = observable.connectable();

    // Subscribe observers to `Connectable`.
    connectable.subscribe(observer1);
    connectable.subscribe(observer2);

    // You can also obtain a `Subscription` to unsubscribe individual observers
    // before calling `connect()`. In this case, `observer3` will be removed and will
    // not receive multicast emissions.
    let subscription3 = connectable.subscribe(observer3);
    subscription3.unsubscribe();

    // Connect `Connectable` to start emitting to all `Subscriber`'s.
    // No emissions happen if `connect()` is not called.
    let connected = connectable.connect();

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for `Connectable` observable to finish before exiting the program.
    // You can also use `connected.unsubscribe();` to stop all emissions.
    if connected.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

fn first<F>(self, predicate: F, default_value: Option<T>) -> Observable<T>
where Self: Sized + Send + Sync + 'static, F: FnOnce(T, usize) -> bool + Copy + Send + Sync + 'static, T: Clone + Send + Sync,

Emits only the first item emitted by the source observable that satisfies the provided predicate, optionally applying a default value if no items match the predicate.

The predicate function takes two arguments: the emitted item T and the index usize of the emission. It should return true if the item meets the criteria.

If a default_value is provided and no item satisfies the predicate, the observable emits the default_value instead. If no default value is provided and no item satisfies the predicate, the observable emits an EmptyError.

The first operator unsubscribes from the background emissions as soon as it takes the first item that satisfies the predicate.

source

fn zip(self, observable_inputs: Vec<Observable<T>>) -> Observable<Vec<T>>
where Self: Clone + Sized + Send + Sync + 'static, T: Clone + Send,

Zips the values emitted by multiple observables into a single observable.

This method combines the values emitted by multiple observables into a single observable, emitting a vector containing the latest value from each observable in order when all observables have emitted a new value. This method is non-blocking and combines the latest values emitted by observables without waiting for completion. It completes as soon as the first observable in the sequence completes and attempts to unsubscribe all zipped observables. If any observable within the sequence encounters an error, it stops emissions, emits that error, and tries to unsubscribe all observables in the sequence.

source

fn take(self, n: usize) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Emits at most the first n items emitted by the observable, then unsubscribes.

If the observable emits more than n items, this operator will only allow the first n items to be emitted. After emitting n items, it will unsubscribe from the observable.

§Notes

For Subject variants, using take(n) as the initial operator (e.g., subject.take(n).delay(n)) will not call unsubscribe and remove registered subscribers for performance reasons.

However, when used as a non-initial operator (e.g., subject.delay(n).take(n)), it will call unsubscribe and remove registered subscribers.

Examples found in repository?
examples/subject_as_observer.rs (line 42)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
pub fn main() {
    // Make an Observable.
    let mut observable = Observable::new(move |mut o: Subscriber<_>| {
        for i in 0..10 + 1 {
            o.next(i);
            std::thread::sleep(Duration::from_millis(1));
        }
        o.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Initialize a `Subject` and obtain its emitter and receiver.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // Register `Subscriber` 2.
    receiver
        // We're cloning the receiver so we can use it again.
        // Shallow clone: clones only the pointer to the `Subject`.
        .clone()
        .take(7) // For performance, prioritize placing `take()` as the first operator.
        .delay(1000)
        .map(|v| format!("mapped {}", v))
        .subscribe(create_subscriber(2));

    // Register `Subscriber` 3.
    receiver
        .filter(|v| v % 2 == 0)
        .map(|v| format!("filtered {}", v))
        .subscribe(create_subscriber(3));

    // Convert the emitter into an observer and subscribe it to the observable.
    observable.subscribe(emitter.into());
}
More examples
Hide additional examples
examples/scan_operator.rs (line 76)
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
    observer.on_complete(|| println!("Completed"));

    // Accumulate response strings into a single string.
    // The types of `total` and `n` may differ as long as `total` implements the `From<n>` trait.
    // In this example, `total` is of type `String`, and `n` is of type `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|total, n| format!("{} {}", total, n), None)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/unsubscribe_observable.rs (line 78)
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
fn main() {
    // Create a custom observable that emits values in a separate thread.
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, rx) = std::sync::mpsc::channel();

        // Spawn a new thread to await a signal sent from the unsubscribe logic.
        std::thread::spawn(move || {
            // Attempt to receive a signal sent from the unsubscribe logic.
            if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
                // Update the `done_c` mutex with the received signal.
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Launch a new thread for the Observable's processing and store its handle.
        let join_handle = std::thread::spawn(move || {
            for i in 0..=10000 {
                // If an unsubscribe signal is received, exit the loop and stop emissions.
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                // Emit the value to the subscriber.
                o.next(i);
                // Important. Put an await point after each emit or after some emits.
                // This allows the `take()` operator to function properly.
                std::thread::sleep(Duration::from_millis(1));
            }
            // Signal completion to the subscriber.
            o.complete();
        });

        // Return a new `Subscription` with custom unsubscribe logic.
        Subscription::new(
            // The provided closure defines the behavior of the subscription when it
            // is unsubscribed. In this case, it sends a signal to an asynchronous
            // observable to stop emitting values.
            UnsubscribeLogic::Logic(Box::new(move || {
                if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                    println!("Receiver dropped.");
                }
            })),
            // Store the `JoinHandle` for awaiting completion using the `Subscription`.
            SubscriptionHandle::JoinThread(join_handle),
        )
    });

    // Create the `Subscriber` with a mandatory `next` function, and optional
    // `complete` function. No need for `error` function in this simple example.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // This observable uses OS threads so it will not block the current thread.
    // Observables are cold so if you comment out the statement bellow nothing
    // will be emitted.
    let subscription = observable
        // take utilizes our unsubscribe function to stop background emissions after
        // a specified item count.
        .take(500)
        .map(|v| format!("Mapped {}", v))
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Unsubscribe from the observable to stop emissions.
    subscription.unsubscribe();

    // Allow some time for the main thread to confirm that the observable indeed
    // isn't emitting.
    std::thread::sleep(Duration::from_millis(2000));
    println!("`main` function done")
}
examples/async_observable_tokio.rs (line 80)
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async fn main() {
    // Create a custom observable that emits values in a separate task.
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        // Spawn a new Tokio task to await a signal sent from the unsubscribe logic.
        task::spawn(async move {
            // Attempt to receive a signal sent from the unsubscribe logic.
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                // Update the `done_c` mutex with the received signal.
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Launch a new Tokio task for the Observable's processing and store its handle.
        let join_handle = task::spawn(async move {
            for i in 0..=10000 {
                // If an unsubscribe signal is received, exit the loop and stop emissions.
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                // Emit the value to the subscriber.
                o.next(i);
                // Important. Put an await point after each emit or after some emits.
                // This allows the `take()` operator to function properly.
                time::sleep(time::Duration::from_millis(1)).await;
            }
            // Signal completion to the subscriber.
            o.complete();
        });

        // Return a new `Subscription` with custom unsubscribe logic.
        Subscription::new(
            // The provided closure defines the behavior of the subscription when it
            // is unsubscribed. In this case, it sends a signal to an asynchronous
            // observable to stop emitting values. If your closure requires Tokio
            // tasks or channels to send unsubscribe signals, consider using
            // `UnsubscribeLogic::Future`.
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            // Store the `JoinHandle` for awaiting completion using the `Subscription`.
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    // Create the `Subscriber` with a mandatory `next` function, and optional
    // `complete` function. No need for `error` function in this simple example.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // This observable uses OS threads so it will not block the current thread.
    // Observables are cold so if you comment out the statement bellow nothing
    // will be emitted.
    let subscription = observable
        // take utilizes our unsubscribe function to stop background emissions after
        // a specified item count.
        .take(15)
        .map(|v| format!("Mapped {}", v))
        .delay(1000)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for the subscription to either complete as a Tokio task or join an OS thread.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

fn take_until<U: 'static>( self, notifier: Observable<U>, unsubscribe_notifier: bool ) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Continuously emits the values from the source observable until an event occurs, triggered by an emitted value from a separate notifier observable.

The takeUntil operator subscribes to and starts replicating the behavior of the source observable. Simultaneously, it observes a second observable, referred to as the notifier, provided by the user. When the notifier emits a value, the resulting observable stops replicating the source observable and completes. If the notifier completes without emitting any value, takeUntil will pass all values from the source observable. When the notifier triggers its first emission take_until unsubscribes from ongoing emissions of the source observable.

The take_until operator accepts a second parameter, unsubscribe_notifier, allowing control over whether takeUntil will attempt to unsubscribe from emissions of the notifier observable. When set to true, takeUntil actively attempts to unsubscribe from the notifier’s emissions. When set to false, takeUntil refrains from attempting to unsubscribe from the notifier, allowing the emissions to continue unaffected.

Examples found in repository?
examples/take_until_operator.rs (line 61)
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
async fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Create notifier, it can be observable or one of the Subject variants.
    let (mut emitter, receiver) = Subject::<()>::emitter_receiver();

    // Turning Subject into an observable. To continue using the receiver later,
    // utilize `.clone()`, e.g. `receiver.clone().into()`.
    let subscription = observable
        .take_until(receiver.into(), false)
        .subscribe(observer);

    // Allowing some time for the `take_until` function to register the notifier
    // before emitting a signal. This step is unnecessary if you're not immediately
    // sending a signal.
    time::sleep(time::Duration::from_millis(1)).await;

    // Send signal to stop source observable emitting values.
    emitter.next(());

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
More examples
Hide additional examples
examples/take_until_operator_os.rs (line 64)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, rx) = std::sync::mpsc::channel();

        std::thread::spawn(move || {
            if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = std::thread::spawn(move || {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                std::thread::sleep(Duration::from_millis(1));
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Logic(Box::new(move || {
                if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinThread(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Create notifier, it can be observable or one of the Subject variants.
    let (mut emitter, receiver) = Subject::emitter_receiver();

    // We can chain the Subject, here we use `delay()` to slow down the notifier so
    // source observable has time to emmit some values. Note when we chain the
    // notifier with operators we don't have to use `into()`. To continue using the
    // receiver later, utilize `.clone()`, e.g. `receiver.clone().delay(20)`.
    let subscription = observable
        .take_until(receiver.delay(20), false)
        .subscribe(observer);

    // Allowing some time for the `take_until` function to register the notifier
    // before emitting a signal. This step is unnecessary if you're not immediately
    // sending a signal.
    std::thread::sleep(Duration::from_millis(1));

    // Send signal to stop source observable emitting values.
    emitter.next(());

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join().is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

fn take_while<P>(self, predicate: P) -> Observable<T>
where Self: Sized + Send + Sync + 'static, P: FnOnce(&T) -> bool + Copy + Sync + Send + 'static,

Continues emitting values from the source observable as long as each value meets the specified predicate criteria. The operation concludes immediately upon encountering the first value that doesn’t satisfy the predicate.

Upon subscription, takeWhile starts replicating the source observable. Every emitted value from the source is evaluated by the predicate function, returning a boolean that represents a condition for the source values. The resulting observable continues emitting source values until the predicate yields false. When the specified condition is no longer met, takeWhile ceases mirroring the source, subsequently unsubscribing from the source to stop background emissions.

Examples found in repository?
examples/take_while_operator.rs (line 59)
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Emit values only when they are less than or equal to 40.
    let subscription = observable.take_while(|v| v <= &40).subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

fn take_last(self, count: usize) -> Observable<T>
where Self: Sized + Send + Sync + 'static, T: Send,

Produces an observable that emits, at maximum, the final count values emitted by the source observable.

Utilizing takeLast creates an observable that retains up to ‘count’ values in memory until the source observable completes. Upon completion, it delivers all stored values in their original order to the consumer and signals completion.

In scenarios where the source completes before reaching the specified count in takeLast, it emits all received values up to that point and then signals completion.

§Notes

When applied to an observable that never completes, takeLast yields an observable that doesn’t emit any value.

Examples found in repository?
examples/take_last_operator.rs (line 56)
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
async fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Capture and process only the last 8 values.
    let subscription = observable.take_last(8).subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

fn tap(self, observer: Subscriber<T>) -> Observable<T>
where Self: Sized + Send + Sync + 'static, T: Clone,

The tap operator allows you to intercept the items emitted by an observable and perform side effects on those items without modifying the emitted data or the stream itself.

This operator is used primarily for side effects. It allows you to perform actions or operations on the items emitted by an observable without affecting the stream itself. The tap operator is best used for debugging, logging, or performing actions that don’t change the emitted values but are necessary for monitoring or debugging purposes such as console logging, data inspection, or triggering some external action based on the emitted values.

let log_observer = Subscriber::new(
    |v| println!("Filtered {}", v),
    |e| println!("Filtered error {}", e),
    || println!("Filtered complete")
);

observable
    .tap(Subscriber::on_next(|v| println!("Before filtering: {}", v)))
    .filter(|v| v % 2 == 0)
    .tap(log_observer)
    .subscribe(observer);
Examples found in repository?
examples/connectable_chained_operator.rs (line 63)
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
async fn main() {
    // Make a source observable.
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..10 + 1 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer1 = Subscriber::on_next(|v| println!("Observer 1 emitted {}", v));
    observer1.on_complete(|| println!("Observer 1 completed"));

    let mut observer2 = Subscriber::on_next(|v| println!("Observer 2 emitted {}", v));
    observer2.on_complete(|| println!("Observer 2 completed"));

    // You can use other operators before calling `connectable()` operator.
    let observable = observable.tap(Subscriber::on_next(|v| println!("... emitting {v}")));

    // Make a `Connectable` observable from the source observable.
    let connectable = observable.connectable();

    // If you want to use other operators after calling `connectable()` operator you
    // can do it by cloning first.
    let mut connectable_chained = connectable.clone().map(|v| v + 10).delay(1000);

    // Subscribe observers to chained `Connectable`.
    connectable_chained.subscribe(observer1);
    connectable_chained.subscribe(observer2);

    // Connect `Connectable` to start emitting to all `Subscriber`'s.
    // No emissions happen if `connect()` is not called.
    let connected = connectable.connect();

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for `Connectable` observable to finish before exiting the program.
    // You can also use `connected.unsubscribe();` to stop all emissions.
    if connected.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

fn merge(self, sources: Vec<Observable<T>>) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Merges the current observable with a vector of observables, emitting items from all of them concurrently.

source

fn merge_one(self, source: Observable<T>) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Merges the current observable with another observable, emitting items from both concurrently.

source

fn switch_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static,

Transforms the items emitted by an observable into observables, and flattens the emissions into a single observable, ignoring previous emissions once a new one is encountered. This is similar to map, but switches to a new inner observable whenever a new item is emitted.

§Parameters
  • project: A closure that maps each source item to an observable. The closure returns the observable for each item, and the emissions from these observables are flattened into a single observable.
§Returns

An observable that emits the items from the most recently emitted inner observable.

source

fn merge_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static,

Transforms the items emitted by the source observable into other observables, and merges them into a single observable stream.

This operator applies the provided project function to each item emitted by the source observable. The function returns another observable. The operator subscribes to these inner observables concurrently and merges their emissions into one observable stream.

§Parameters
  • project: A closure that maps each item emitted by the source observable to another observable.
§Returns

An observable that emits items merged from the inner observables produced by the project function.

Examples found in repository?
examples/scan_operator.rs (line 78)
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
    observer.on_complete(|| println!("Completed"));

    // Accumulate response strings into a single string.
    // The types of `total` and `n` may differ as long as `total` implements the `From<n>` trait.
    // In this example, `total` is of type `String`, and `n` is of type `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|total, n| format!("{} {}", total, n), None)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

fn concat_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static,

Transforms the items emitted by the source observable into other observables using a closure, and concatenates them into a single observable stream, waiting for each inner observable to complete before moving to the next.

This operator applies the provided project function to each item emitted by the source observable. The function returns another observable. The operator subscribes to these inner observables sequentially and concatenates their emissions into one observable stream.

§Parameters
  • project: A closure that maps each item emitted by the source observable to another observable.
§Returns

An observable that emits items concatenated from the inner observables produced by the project function.

source

fn exhaust_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static,

Maps each item emitted by the source observable to an inner observable using a closure. It subscribes to these inner observables, ignoring new items until the current inner observable completes.

§Parameters
  • project: A closure that maps each item to an inner observable.
§Returns

An observable that emits inner observables exclusively. Inner observables do not emit and remain ignored if a preceding inner observable is still emitting. The emission of a subsequent inner observable is allowed only after the current one completes its emission.

Implementors§

source§

impl<O, T: 'static> ObservableExt<T> for O
where O: Subscribeable<ObsType = T>,