Struct rxr::observable::Observable

source ·
pub struct Observable<T> { /* private fields */ }
Expand description

The Observable struct represents a source of values that can be observed and transformed.

This struct serves as the foundation for creating, transforming, and working with observables. It provides methods for applying operators, subscribing to emitted values, and creating new observables.

§Example: basic synchronous Observable

This simple Observable emits values and completes. It returns an empty Subscription, making it unable to be unsubscribed from. Some operators like take, switch_map, merge_map, concat_map, and exhaust_map require unsubscribe functionality to work correctly.

Additionally, this is a synchronous Observable, so it blocks the current thread until it completes emission.

use rxr::subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic};
use rxr::{Observable, Observer, Subscribeable};

// Create a custom observable that emits values from 1 to 10.
let mut emit_10_observable = Observable::new(|mut subscriber| {
    let mut i = 1;

    while i <= 10 {
        // Emit the value to the subscriber.
        subscriber.next(i);
        i += 1;
    }
    // Signal completion to the subscriber.
    subscriber.complete();

    // Return the empty subscription.
    Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
});

// Create the `Subscriber` with a mandatory `next` function, and optional
// `complete` function. No need for `error` function in this simple example.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));

// This observable blocks until completion since it doesn't use async or
// threads. If you comment out the line below, no emissions will occur
// because observables are cold.
emit_10_observable.subscribe(observer);

println!("Custom Observable finished emmiting")

§Example: basic asynchronous Observable

Emits values and completes, returning an empty Subscription, making it unable to be unsubscribed from. Some operators like take, switch_map, merge_map, concat_map, and exhaust_map require unsubscribe functionality to work correctly.

Utilizes an OS thread for asynchronous processing, preventing it from blocking the current thread.

use std::time::Duration;

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
    Observable, ObservableExt, Observer, Subscribeable,
};

// Create a custom observable that emits values from 1 to 10 in separate thread.
let observable = Observable::new(|mut o| {
    // Launch a new thread for the Observable's processing and store its handle.
    let join_handle = std::thread::spawn(move || {
        for i in 0..=15 {
            // Emit the value to the subscriber.
            o.next(i);
            // Important. Put an await point after each emit or after some emits.
            // This allows the `take()` operator to function properly.
            // Not required in this example.
            std::thread::sleep(Duration::from_millis(1));
        }
        // Signal completion to the subscriber.
        o.complete();
    });

    // Return the subscription.
    Subscription::new(
        // In this example, we omit the unsubscribe functionality. Without it, we
        // can't unsubscribe, which prevents the `take()` operator, as well as
        // higher-order operators like `switch_map`, `merge_map`, `concat_map`,
        // and `exhaust_map`, from functioning as expected.
        UnsubscribeLogic::Nil,
        // Store the `JoinHandle` to enable waiting functionality using the
        // `Subscription` for this Observable thread to complete.
        SubscriptionHandle::JoinThread(join_handle),
    )
});

// Create the `Subscriber` with a mandatory `next` function, and optional
// `complete` function. No need for `error` function in this simple example.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));

// This observable uses OS threads so it will not block the current thread.
// Observables are cold so if you comment out the statement bellow nothing
// will be emitted.
let subscription = observable
    .filter(|&v| v <= 10)
    .map(|v| format!("Mapped {}", v))
    .subscribe(observer);

// Do something else here.
println!("Do something while Observable is emitting.");

// Because the subscription creates a new thread, we can utilize the `Subscription`
// to wait for its completion. This ensures that the main thread won't terminate
// prematurely and stop all child threads.
if subscription.join().is_err() {
    // Handle error
}

println!("Custom Observable finished emmiting")

§Example: asynchronous Observable with unsubscribe

Emits values and completes, returning a Subscription that can be unsubscribed from, enabling all operators to function correctly. Utilizes an OS thread for asynchronous processing, preventing it from blocking the current thread.

use std::{
    sync::{Arc, Mutex},
    time::Duration,
};

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic, Unsubscribeable},
    Observable, ObservableExt, Observer, Subscribeable,
};

const UNSUBSCRIBE_SIGNAL: bool = true;

// Create a custom observable that emits values in a separate thread.
let observable = Observable::new(|mut o| {
    let done = Arc::new(Mutex::new(false));
    let done_c = Arc::clone(&done);
    let (tx, rx) = std::sync::mpsc::channel();

    // Spawn a new thread to await a signal sent from the unsubscribe logic.
    std::thread::spawn(move || {
        // Attempt to receive a signal sent from the unsubscribe logic.
        if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
            // Update the `done_c` mutex with the received signal.
            *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
        }
    });

    // Launch a new thread for the Observable's processing and store its handle.
    let join_handle = std::thread::spawn(move || {
        for i in 0..=10000 {
            // If an unsubscribe signal is received, exit the loop and stop emissions.
            if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                break;
            }
            // Emit the value to the subscriber.
            o.next(i);
            // Important. Put an await point after each emit or after some emits.
            // This allows the `take()` operator to function properly.
            std::thread::sleep(Duration::from_millis(1));
        }
        // Signal completion to the subscriber.
        o.complete();
    });

    // Return a new `Subscription` with custom unsubscribe logic.
    Subscription::new(
        // The provided closure defines the behavior of the subscription when it
        // is unsubscribed. In this case, it sends a signal to an asynchronous
        // observable to stop emitting values.
        UnsubscribeLogic::Logic(Box::new(move || {
            if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                println!("Receiver dropped.");
            }
        })),
        // Store the `JoinHandle` for awaiting completion using the `Subscription`.
        SubscriptionHandle::JoinThread(join_handle),
    )
});

// Create the `Subscriber` with a mandatory `next` function, and optional
// `complete` function. No need for `error` function in this simple example.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));

// This observable uses OS threads so it will not block the current thread.
// Observables are cold so if you comment out the statement bellow nothing
// will be emitted.
let subscription = observable
    // `take` utilizes our unsubscribe function to stop background emissions
    // after a specified item count.
    .take(500)
    .map(|v| format!("Mapped {}", v))
    .subscribe(observer);

// Do something else here.
println!("Do something while Observable is emitting.");

// Unsubscribe from the observable to stop emissions.
subscription.unsubscribe();

// Allow some time for the main thread to confirm that the observable indeed
// isn't emitting.
std::thread::sleep(Duration::from_millis(2000));
println!("`main` function done")

§Example: asynchronous Observable with Tokio

Emits values and completes, returning a Subscription that can be unsubscribed from, enabling all operators to function correctly. Utilizes Tokio tasks for asynchronous processing, preventing it from blocking the current thread.

 use std::sync::{Arc, Mutex};

 use rxr::{
     subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
     Observable, ObservableExt, Observer, Subscribeable,
 };

 use tokio::{task, time, sync::mpsc::channel};

 const UNSUBSCRIBE_SIGNAL: bool = true;

 #[tokio::main()]
 async fn main() {
     // Create a custom observable that emits values in a separate task.
     let observable = Observable::new(|mut o| {
         let done = Arc::new(Mutex::new(false));
         let done_c = Arc::clone(&done);
         let (tx, mut rx) = channel(10);

         // Spawn a new Tokio task to await a signal sent from the unsubscribe logic.
         task::spawn(async move {
             // Attempt to receive a signal sent from the unsubscribe logic.
             if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                 // Update the `done_c` mutex with the received signal.
                 *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
             }
         });

         // Launch a new Tokio task for the Observable's processing and store its handle.
         let join_handle = task::spawn(async move {
             for i in 0..=10000 {
                 // If an unsubscribe signal is received, exit the loop and stop emissions.
                 if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                     break;
                 }
                 // Emit the value to the subscriber.
                 o.next(i);
                 // Important. Put an await point after each emit or after some emits.
                 // This allows the `take()` operator to function properly.
                 time::sleep(time::Duration::from_millis(1)).await;
             }
             // Signal completion to the subscriber.
             o.complete();
         });

         // Return a new `Subscription` with custom unsubscribe logic.
         Subscription::new(
             // The provided closure defines the behavior of the subscription when it
             // is unsubscribed. In this case, it sends a signal to an asynchronous
             // observable to stop emitting values. If your closure requires Tokio
             // tasks or channels to send unsubscribe signals, use `UnsubscribeLogic::Future`.
             UnsubscribeLogic::Future(Box::pin(async move {
                 if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                     println!("Receiver dropped.");
                 }
             })),
             // Store the `JoinHandle` for awaiting completion using the `Subscription`.
             SubscriptionHandle::JoinTask(join_handle),
         )
     });

     // Create the `Subscriber` with a mandatory `next` function, and optional
     // `complete` function. No need for `error` function in this simple example.
     let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
     observer.on_complete(|| println!("Completed"));

     // This observable uses Tokio tasks so it will not block the current thread.
     // Observables are cold so if you comment out the statement bellow nothing
     // will be emitted.
     let subscription = observable
         // `take` utilizes our unsubscribe function to stop background emissions
         // after a specified item count.
         .take(15)
         .map(|v| format!("Mapped {}", v))
         .delay(1000)
         .subscribe(observer);

     // Do something else here.
     println!("Do something while Observable is emitting.");

     // Wait for the subscription to either complete as a Tokio task or join an OS thread.
     if subscription.join_concurrent().await.is_err() {
         // Handle error
     }

     println!("`main` function done")
 }

§Example: Observable with error handling

Waits for user input and emits both a value and a completion signal upon success. In case of any errors, it signals them to the attached Observer.

Ensure errors are wrapped in an Arc before passing them to the Observer’s error function.

 use std::{error::Error, fmt::Display, io, sync::Arc};

 use rxr::{subscribe::*, Observable, Observer, Subscribeable};

 #[derive(Debug)]
 struct MyErr(i32);

 impl Display for MyErr {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "number should be less than 100, you entered {}", self.0)
     }
 }

 impl Error for MyErr {}

 // Creates an `Observable<i32>` that processes user input and emits or signals errors.
 pub fn get_less_than_100() -> Observable<i32> {
     Observable::new(|mut observer| {
         let mut input = String::new();

         println!("Please enter an integer (less than 100):");

         if let Err(e) = io::stdin().read_line(&mut input) {
             // Send input error to the observer.
             observer.error(Arc::new(e));
             return Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil);
         }

         match input.trim().parse::<i32>() {
             Err(e) => {
                 // Send parsing error to the observer.
                 observer.error(Arc::new(e));
             }
             Ok(num) if num > 100 => {
                 // Send custom error to the observer.
                 observer.error(Arc::new(MyErr(num)))
             }
             Ok(num) => {
                 // Emit the parsed value to the observer.
                 observer.next(num);
             }
         }

         // Signal completion if there are no errors.
         // Note: `complete` does not affect the outcome if `error` was called before it.
         observer.complete();

         Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
     })
 }

 let observer = Subscriber::new(
     |input| println!("You entered: {}", input),
     |e| eprintln!("{}", e),
     || println!("User input handled"),
 );

 let mut observable = get_less_than_100();

 observable.subscribe(observer);

Implementations§

source§

impl<T> Observable<T>

source

pub fn new( sf: impl FnMut(Subscriber<T>) -> Subscription + Send + Sync + 'static ) -> Self

Creates a new Observable with the provided subscribe function.

This method allows you to define custom behavior for the Observable by providing a subscribe function (sf), a closure that defines the behavior of the Observable when subscribed. When the Observable is subscribed to, the sf function is invoked to manage the delivery of values to the Subscriber. It should also return a Subscription that enables unsubscribing and can be used for awaiting Tokio tasks or joining OS threads when the Observable is asynchronous.

Examples found in repository?
examples/scan_operator.rs (lines 25-32)
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
fn get_response_observable() -> Observable<&'static str> {
    Observable::new(|mut o| {
        task::spawn(async move {
            o.next("response");
            time::sleep(time::Duration::from_millis(1)).await;
            o.complete();
        });
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    })
}

#[tokio::main]
async fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
    observer.on_complete(|| println!("Completed"));

    // Accumulate response strings into a single string.
    // The types of `total` and `n` may differ as long as `total` implements the `From<n>` trait.
    // In this example, `total` is of type `String`, and `n` is of type `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|total, n| format!("{} {}", total, n), None)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
More examples
Hide additional examples
examples/basic_observable.rs (lines 16-31)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
fn main() {
    // Create a custom observable that emits values from 1 to 10.
    let mut emit_10_observable = Observable::new(|mut subscriber| {
        let mut i = 1;

        while i <= 10 {
            // Emit the value to the subscriber.
            subscriber.next(i);

            i += 1;
        }

        // Signal completion to the subscriber.
        subscriber.complete();

        // Return the empty subscription.
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Create the `Subscriber` with a mandatory `next` function, and optional
    // `complete` function. No need for `error` function in this simple example.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // This observable does not use async or threads so it will block until it is done.
    // Observables are cold so if you comment out the line bellow nothing will be emitted.
    emit_10_observable.subscribe(observer);

    println!("Custom Observable finished emmiting")
}
examples/handle_error_observable.rs (lines 26-57)
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
pub fn get_less_than_100() -> Observable<i32> {
    Observable::new(|mut observer| {
        let mut input = String::new();

        println!("Please enter an integer (less than 100):");

        if let Err(e) = io::stdin().read_line(&mut input) {
            // Send input error to the observer.
            observer.error(Arc::new(e));
            return Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil);
        }

        match input.trim().parse::<i32>() {
            Err(e) => {
                // Send parsing error to the observer.
                observer.error(Arc::new(e));
            }
            Ok(num) if num > 100 => {
                // Send custom error to the observer.
                observer.error(Arc::new(MyErr(num)))
            }
            Ok(num) => {
                // Emit the parsed value to the observer.
                observer.next(num);
            }
        }

        // Signal completion if there are no errors.
        // Note: `complete` does not affect the outcome if `error` was called before it.
        observer.complete();

        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    })
}
examples/subject_as_observer.rs (lines 22-29)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
pub fn main() {
    // Make an Observable.
    let mut observable = Observable::new(move |mut o: Subscriber<_>| {
        for i in 0..10 + 1 {
            o.next(i);
            std::thread::sleep(Duration::from_millis(1));
        }
        o.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Initialize a `Subject` and obtain its emitter and receiver.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // Register `Subscriber` 2.
    receiver
        // We're cloning the receiver so we can use it again.
        // Shallow clone: clones only the pointer to the `Subject`.
        .clone()
        .take(7) // For performance, prioritize placing `take()` as the first operator.
        .delay(1000)
        .map(|v| format!("mapped {}", v))
        .subscribe(create_subscriber(2));

    // Register `Subscriber` 3.
    receiver
        .filter(|v| v % 2 == 0)
        .map(|v| format!("filtered {}", v))
        .subscribe(create_subscriber(3));

    // Convert the emitter into an observer and subscribe it to the observable.
    observable.subscribe(emitter.into());
}
examples/take_last_operator.rs (lines 20-50)
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
async fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Capture and process only the last 8 values.
    let subscription = observable.take_last(8).subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
examples/take_while_operator.rs (lines 23-53)
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async fn main() {
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        let join_handle = task::spawn(async move {
            for i in 0..100 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Emit values only when they are less than or equal to 40.
    let subscription = observable.take_while(|v| v <= &40).subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
source

pub fn empty() -> Self

Creates an empty observable.

The resulting observable does not emit any values and immediately completes upon subscription. It serves as a placeholder or a base case for some observable operations.

source

pub fn fuse(self) -> Self

Fuse the observable, allowing it to complete at most once.

If complete() is called on a fused observable, any subsequent emissions will have no effect. This ensures that the observable is closed after the first completion call.

By default, observables are not fused, allowing them to emit values even after calling complete() and permitting multiple calls to complete(). When an observable emits an error, it is considered closed and will no longer emit any further values, regardless of being fused or not.

§Notes

fuse() does not unsubscribe ongoing emissions from the observable; it simply ignores them after the first complete() call, ensuring that no more values are emitted.

source

pub fn defuse(self) -> Self

Defuse the observable, allowing it to complete and emit values after calling complete().

Observables are defused by default, enabling them to emit values even after completion and allowing multiple calls to complete(). Calling defuse() is not necessary unless the observable has been previously fused using fuse(). Once an observable is defused, it can emit values and call complete() multiple times on its observers.

§Notes

Defusing an observable does not allow it to emit an error after the first error emission. Once an error is emitted, the observable is considered closed and will not emit any further values, regardless of being defused or not.

Trait Implementations§

source§

impl<T: Clone> Clone for Observable<T>

source§

fn clone(&self) -> Observable<T>

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl<T: Clone + Send + Sync + 'static> From<AsyncSubjectReceiver<T>> for Observable<T>

source§

fn from(value: AsyncSubjectReceiver<T>) -> Self

Converts to this type from the input type.
source§

impl<T: Clone + Send + Sync + 'static> From<BehaviorSubjectReceiver<T>> for Observable<T>

source§

fn from(value: BehaviorSubjectReceiver<T>) -> Self

Converts to this type from the input type.
source§

impl<T: Clone + Send + Sync + 'static> From<ReplaySubjectReceiver<T>> for Observable<T>

source§

fn from(value: ReplaySubjectReceiver<T>) -> Self

Converts to this type from the input type.
source§

impl<T: Clone + Send + Sync + 'static> From<SubjectReceiver<T>> for Observable<T>

source§

fn from(value: SubjectReceiver<T>) -> Self

Converts to this type from the input type.
source§

impl<T: 'static> Subscribeable for Observable<T>

§

type ObsType = T

The type of items emitted by the observable stream.
source§

fn subscribe(&mut self, v: Subscriber<Self::ObsType>) -> Subscription

Subscribes to the observable stream and specifies how to handle emitted values. Read more
source§

fn is_subject(&self) -> bool

Checks if the object implementing this trait is a variant of a Subject. Read more

Auto Trait Implementations§

§

impl<T> Freeze for Observable<T>

§

impl<T> RefUnwindSafe for Observable<T>

§

impl<T> Send for Observable<T>

§

impl<T> Sync for Observable<T>

§

impl<T> Unpin for Observable<T>

§

impl<T> UnwindSafe for Observable<T>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<O, T> ObservableExt<T> for O
where T: 'static, O: Subscribeable<ObsType = T>,

source§

fn map<U, F>(self, f: F) -> Observable<U>
where Self: Sized + Send + Sync + 'static, F: FnOnce(T) -> U + Copy + Sync + Send + 'static, U: 'static,

Transforms the items emitted by the observable using a transformation function. Read more
source§

fn filter<P>(self, predicate: P) -> Observable<T>
where Self: Sized + Send + Sync + 'static, P: FnOnce(&T) -> bool + Copy + Sync + Send + 'static,

Filters the items emitted by the observable based on a predicate function. Read more
source§

fn skip(self, n: usize) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Skips the first n items emitted by the observable and then emits the rest. Read more
source§

fn delay(self, num_of_ms: u64) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Delays the emissions from the observable by the specified number of milliseconds. Read more
source§

fn scan<U, F>(self, acc: F, seed: Option<U>) -> Observable<U>
where Self: Sized + Send + Sync + 'static, F: FnOnce(U, T) -> U + Copy + Sync + Send + 'static, U: From<T> + Clone + Send + Sync + 'static,

Accumulates values emitted by an observable over time, producing an accumulated result based on an accumulator function applied to each emitted value. Read more
source§

fn connectable(self) -> Connectable<T>
where Self: Send + Sync + Sized + 'static, T: Send + Sync + Clone,

Creates a connectable observable from the source observable. Read more
source§

fn first<F>(self, predicate: F, default_value: Option<T>) -> Observable<T>
where Self: Sized + Send + Sync + 'static, F: FnOnce(T, usize) -> bool + Copy + Send + Sync + 'static, T: Clone + Send + Sync,

Emits only the first item emitted by the source observable that satisfies the provided predicate, optionally applying a default value if no items match the predicate. Read more
source§

fn zip(self, observable_inputs: Vec<Observable<T>>) -> Observable<Vec<T>>
where Self: Clone + Sized + Send + Sync + 'static, T: Clone + Send,

Zips the values emitted by multiple observables into a single observable. Read more
source§

fn take(self, n: usize) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Emits at most the first n items emitted by the observable, then unsubscribes. Read more
source§

fn take_until<U: 'static>( self, notifier: Observable<U>, unsubscribe_notifier: bool ) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Continuously emits the values from the source observable until an event occurs, triggered by an emitted value from a separate notifier observable. Read more
source§

fn take_while<P>(self, predicate: P) -> Observable<T>
where Self: Sized + Send + Sync + 'static, P: FnOnce(&T) -> bool + Copy + Sync + Send + 'static,

Continues emitting values from the source observable as long as each value meets the specified predicate criteria. The operation concludes immediately upon encountering the first value that doesn’t satisfy the predicate. Read more
source§

fn take_last(self, count: usize) -> Observable<T>
where Self: Sized + Send + Sync + 'static, T: Send,

Produces an observable that emits, at maximum, the final count values emitted by the source observable. Read more
source§

fn tap(self, observer: Subscriber<T>) -> Observable<T>
where Self: Sized + Send + Sync + 'static, T: Clone,

The tap operator allows you to intercept the items emitted by an observable and perform side effects on those items without modifying the emitted data or the stream itself. Read more
source§

fn merge(self, sources: Vec<Observable<T>>) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Merges the current observable with a vector of observables, emitting items from all of them concurrently.
source§

fn merge_one(self, source: Observable<T>) -> Observable<T>
where Self: Sized + Send + Sync + 'static,

Merges the current observable with another observable, emitting items from both concurrently.
source§

fn switch_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static,

Transforms the items emitted by an observable into observables, and flattens the emissions into a single observable, ignoring previous emissions once a new one is encountered. This is similar to map, but switches to a new inner observable whenever a new item is emitted. Read more
source§

fn merge_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static,

Transforms the items emitted by the source observable into other observables, and merges them into a single observable stream. Read more
source§

fn concat_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static,

Transforms the items emitted by the source observable into other observables using a closure, and concatenates them into a single observable stream, waiting for each inner observable to complete before moving to the next. Read more
source§

fn exhaust_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static, F: FnMut(T) -> Observable<R> + Sync + Send + 'static,

Maps each item emitted by the source observable to an inner observable using a closure. It subscribes to these inner observables, ignoring new items until the current inner observable completes. Read more
source§

impl<T> ToOwned for T
where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.