Struct rxr::subscription::subscribe::Subscriber
source · pub struct Subscriber<NextFnType> { /* private fields */ }
Expand description
A type that acts as an observer, allowing users to handle emitted values, errors,
and completion when subscribing to an `Observable` or `Subject`.
Users can create a `Subscriber` instance using the `new` method and provide
custom functions to handle the `next`, `error`, and `complete` events.
Implementations§
source§impl<NextFnType> Subscriber<NextFnType>
impl<NextFnType> Subscriber<NextFnType>
sourcepub fn new(
next_fn: impl FnMut(NextFnType) + 'static + Send,
error_fn: impl FnMut(Arc<dyn Error + Send + Sync>) + 'static + Send + Sync,
complete_fn: impl FnMut() + 'static + Send + Sync
) -> Self
pub fn new( next_fn: impl FnMut(NextFnType) + 'static + Send, error_fn: impl FnMut(Arc<dyn Error + Send + Sync>) + 'static + Send + Sync, complete_fn: impl FnMut() + 'static + Send + Sync ) -> Self
Creates a new `Subscriber` instance with custom handling functions for emitted
values, errors, and completion.
Examples found in repository?
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
/// Builds a `Subscriber<i32>` whose handlers print to the console, tagging
/// emitted values and the completion message with `subscriber_id`.
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // Name each handler first so its role is obvious at the constructor call.
    let on_value = move |v: i32| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = |_| eprintln!("Error");
    let on_done = move || println!("Completed {}", subscriber_id);

    Subscriber::new(on_value, on_error, on_done)
}
/// Example entry point: walks through `BehaviorSubject` usage with four
/// subscribers registered before, between, and after the `next`/`complete` calls.
pub fn main() {
// Initialize a `BehaviorSubject` with an initial value and obtain
// its emitter and receiver.
let (mut emitter, mut receiver) = BehaviorSubject::emitter_receiver(100);
// Registers `Subscriber` 1 and emits the default value 100 to it.
receiver.subscribe(create_subscriber(1));
emitter.next(101); // Emits 101 to registered `Subscriber` 1.
emitter.next(102); // Emits 102 to registered `Subscriber` 1.
// All Observable operators can be applied to the receiver.
// Registers mapped `Subscriber` 2 and emits (now the default) value 102 to it.
receiver
.clone() // Shallow clone: clones only the pointer to the `BehaviorSubject` object.
.map(|v| format!("mapped {}", v))
.subscribe(Subscriber::new(
move |v| println!("Subscriber #2 emitted: {}", v),
|_| eprintln!("Error"),
|| println!("Completed 2"),
));
// Registers `Subscriber` 3 and emits (now the default) value 102 to it.
receiver.subscribe(create_subscriber(3));
emitter.next(103); // Emits 103 to registered `Subscriber`s 1, 2 and 3.
emitter.complete(); // Calls `complete` on registered `Subscriber`s 1, 2 and 3.
// Subscriber 4: post-completion subscribe, completes immediately.
receiver.subscribe(create_subscriber(4));
emitter.next(104); // Called post-completion, does not emit.
}
More examples
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
/// Builds a `Subscriber<i32>` whose handlers print to the console, tagging
/// emitted values and the completion message with `subscriber_id`.
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // Name each handler first so its role is obvious at the constructor call.
    let on_value = move |v: i32| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = |_| eprintln!("Error");
    let on_done = move || println!("Completed {}", subscriber_id);

    Subscriber::new(on_value, on_error, on_done)
}
/// Example entry point: walks through `ReplaySubject` usage with four
/// subscribers registered before, between, and after the `next`/`complete` calls.
pub fn main() {
// Initialize a `ReplaySubject` with an unbounded buffer size and obtain
// its emitter and receiver.
let (mut emitter, mut receiver) = ReplaySubject::emitter_receiver(BufSize::Unbounded);
// Registers `Subscriber` 1.
receiver.subscribe(create_subscriber(1));
emitter.next(101); // Stores 101 and emits it to registered `Subscriber` 1.
emitter.next(102); // Stores 102 and emits it to registered `Subscriber` 1.
// All Observable operators can be applied to the receiver.
// Registers mapped `Subscriber` 2 and emits buffered values (101, 102) to it.
receiver
.clone() // Shallow clone: clones only the pointer to the `ReplaySubject` object.
.map(|v| format!("mapped {}", v))
.subscribe(Subscriber::new(
move |v| println!("Subscriber #2 emitted: {}", v),
|_| eprintln!("Error"),
|| println!("Completed 2"),
));
// Registers `Subscriber` 3 and emits buffered values (101, 102) to it.
receiver.subscribe(create_subscriber(3));
emitter.next(103); // Stores 103 and emits it to registered `Subscriber`s 1, 2 and 3.
emitter.complete(); // Calls `complete` on registered `Subscriber`s 1, 2 and 3.
// Subscriber 4: post-completion subscribe, emits buffered values (101, 102, 103)
// and completes.
receiver.subscribe(create_subscriber(4));
emitter.next(104); // Called post-completion, does not emit.
}
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
/// Builds a `Subscriber<i32>` whose handlers print to the console, tagging
/// emitted values and the completion message with `subscriber_id`.
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // Name each handler first so its role is obvious at the constructor call.
    let on_value = move |v: i32| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = |_| eprintln!("Error");
    let on_done = move || println!("Completed {}", subscriber_id);

    Subscriber::new(on_value, on_error, on_done)
}
/// Example entry point: walks through `AsyncSubject` usage with four
/// subscribers registered before, between, and after the `next`/`complete` calls.
pub fn main() {
// Initialize an `AsyncSubject` and obtain its emitter and receiver.
let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();
// Registers `Subscriber` 1.
receiver.subscribe(create_subscriber(1));
emitter.next(101); // Stores 101 as the latest value.
emitter.next(102); // Latest value is now 102.
// All Observable operators can be applied to the receiver.
// Registers mapped `Subscriber` 2.
receiver
.clone() // Shallow clone: clones only the pointer to the `AsyncSubject` object.
.map(|v| format!("mapped {}", v))
.subscribe(Subscriber::new(
move |v| println!("Subscriber #2 emitted: {}", v),
|_| eprintln!("Error"),
|| println!("Completed 2"),
));
// Registers `Subscriber` 3.
receiver.subscribe(create_subscriber(3));
emitter.next(103); // Latest value is now 103.
// Emits latest value (103) to registered `Subscriber`s 1, 2 and 3 and calls
// `complete` on each of them.
emitter.complete();
// Subscriber 4: post-completion subscribe, emits latest value (103) and completes.
receiver.subscribe(create_subscriber(4));
emitter.next(104); // Called post-completion, does not emit.
}
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
/// Builds a `Subscriber<i32>` whose handlers print to the console, tagging
/// emitted values and the completion message with `subscriber_id`.
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
    // Name each handler first so its role is obvious at the constructor call.
    let on_value = move |v: i32| println!("Subscriber #{} emitted: {}", subscriber_id, v);
    let on_error = |_| eprintln!("Error");
    let on_done = move || println!("Completed {}", subscriber_id);

    Subscriber::new(on_value, on_error, on_done)
}
/// Example entry point: walks through plain `Subject` usage with four
/// subscribers registered before, between, and after the `next`/`complete` calls.
pub fn main() {
// Initialize a `Subject` and obtain its emitter and receiver.
let (mut emitter, mut receiver) = Subject::emitter_receiver();
// Registers `Subscriber` 1.
receiver.subscribe(create_subscriber(1));
emitter.next(101); // Emits 101 to registered `Subscriber` 1.
emitter.next(102); // Emits 102 to registered `Subscriber` 1.
// All Observable operators can be applied to the receiver.
// Registers mapped `Subscriber` 2.
receiver
.clone() // Shallow clone: clones only the pointer to the `Subject` object.
.map(|v| format!("mapped {}", v))
.subscribe(Subscriber::new(
move |v| println!("Subscriber #2 emitted: {}", v),
|_| eprintln!("Error"),
|| println!("Completed 2"),
));
// Registers `Subscriber` 3.
receiver.subscribe(create_subscriber(3));
emitter.next(103); // Emits 103 to registered `Subscriber`s 1, 2 and 3.
emitter.complete(); // Calls `complete` on registered `Subscriber`s 1, 2 and 3.
// Subscriber 4: post-completion subscribe, completes immediately.
receiver.subscribe(create_subscriber(4));
emitter.next(104); // Called post-completion, does not emit.
}
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
Subscriber::new(
move |v| println!("Subscriber #{} emitted: {}", subscriber_id, v),
move |e| eprintln!("Error: {} {}", e, subscriber_id),
|| println!("Completed"),
)
}
/// Minimal error type for the example: a newtype around the message text.
#[derive(Debug)]
struct AsyncSubjectError(String);

impl Display for AsyncSubjectError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        formatter.write_str(message)
    }
}

// The default `Error` trait methods are sufficient for this example.
impl Error for AsyncSubjectError {}
/// Example entry point: walks through `AsyncSubject` usage when the emitter
/// signals an error instead of completing.
pub fn main() {
// Initialize an `AsyncSubject` and obtain its emitter and receiver.
let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();
// Registers `Subscriber` 1.
receiver.subscribe(create_subscriber(1));
emitter.next(101); // Stores 101 as the latest value.
emitter.next(102); // Latest value is now 102.
// All Observable operators can be applied to the receiver.
// Registers mapped `Subscriber` 2.
receiver
.clone() // Shallow clone: clones only the pointer to the `AsyncSubject` object.
.map(|v| format!("mapped {}", v))
.subscribe(Subscriber::new(
move |v| println!("Subscriber #2 emitted: {}", v),
|e| eprintln!("Error: {} 2", e),
|| println!("Completed"),
));
// Registers `Subscriber` 3.
receiver.subscribe(create_subscriber(3));
emitter.next(103); // Latest value is now 103.
// Calls `error` on registered `Subscriber`s 1, 2 and 3.
emitter.error(Arc::new(AsyncSubjectError(
"AsyncSubject error".to_string(),
)));
// Subscriber 4: subscribed after subject's error call; emits error and
// does not emit further.
receiver.subscribe(create_subscriber(4));
emitter.next(104); // Called after subject's error call, does not emit.
}
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
pub fn create_subscriber(subscriber_id: i32) -> Subscriber<i32> {
Subscriber::new(
move |v| println!("Subscriber #{} emitted: {}", subscriber_id, v),
move |e| eprintln!("Error: {} {}", e, subscriber_id),
|| println!("Completed"),
)
}
/// Minimal error type for the example: a newtype around the message text.
#[derive(Debug)]
struct BehaviorSubjectError(String);

impl Display for BehaviorSubjectError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        formatter.write_str(message)
    }
}

// The default `Error` trait methods are sufficient for this example.
impl Error for BehaviorSubjectError {}
/// Example entry point: walks through `BehaviorSubject` usage when the emitter
/// signals an error instead of completing.
pub fn main() {
// Initialize a `BehaviorSubject` with an initial value and obtain
// its emitter and receiver.
let (mut emitter, mut receiver) = BehaviorSubject::emitter_receiver(100);
// Registers `Subscriber` 1 and emits the default value 100 to it.
receiver.subscribe(create_subscriber(1));
emitter.next(101); // Emits 101 to registered `Subscriber` 1.
emitter.next(102); // Emits 102 to registered `Subscriber` 1.
// All Observable operators can be applied to the receiver.
// Registers mapped `Subscriber` 2 and emits (now the default) value 102 to it.
receiver
.clone() // Shallow clone: clones only the pointer to the `BehaviorSubject` object.
.map(|v| format!("mapped {}", v))
.subscribe(Subscriber::new(
move |v| println!("Subscriber #2 emitted: {}", v),
|e| eprintln!("Error: {} 2", e),
|| println!("Completed"),
));
// Registers `Subscriber` 3 and emits (now the default) value 102 to it.
receiver.subscribe(create_subscriber(3));
emitter.next(103); // Emits 103 to registered `Subscriber`s 1, 2 and 3.
// Calls `error` on registered `Subscriber`s 1, 2 and 3.
emitter.error(Arc::new(BehaviorSubjectError(
"BehaviorSubject error".to_string(),
)));
// Subscriber 4: subscribed after subject's error call; emits error and
// does not emit further.
receiver.subscribe(create_subscriber(4));
emitter.next(104); // Called after subject's error call, does not emit.
}
sourcepub fn on_next(next_fn: impl FnMut(NextFnType) + 'static + Send) -> Self
pub fn on_next(next_fn: impl FnMut(NextFnType) + 'static + Send) -> Self
Create a new `Subscriber` with the provided `next` function.
The `next` closure is called when the observable emits a new item. It takes
a parameter of type `NextFnType`, which is an item emitted by the observable.
Examples found in repository?
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
/// Example entry point: builds a synchronous custom observable that emits
/// 1 through 10, then subscribes an observer created with `Subscriber::on_next`.
fn main() {
// Create a custom observable that emits values from 1 to 10.
let mut emit_10_observable = Observable::new(|mut subscriber| {
let mut i = 1;
while i <= 10 {
// Emit the value to the subscriber.
subscriber.next(i);
i += 1;
}
// Signal completion to the subscriber.
subscriber.complete();
// Return the empty subscription.
Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
});
// Create the `Subscriber` with a mandatory `next` function, and optional
// `complete` function. No need for `error` function in this simple example.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// This observable does not use async or threads so it will block until it is done.
// Observables are cold so if you comment out the line below nothing will be emitted.
emit_10_observable.subscribe(observer);
println!("Custom Observable finished emmiting")
}
More examples
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
/// Example entry point: runs a custom asynchronous observable through
/// `take_last(8)` and prints whatever reaches the observer.
async fn main() {
// Custom observable that emits 0..100 from a spawned task.
let observable = Observable::new(|mut o| {
// Flag shared between the two spawned tasks; starts as `false`.
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
// Channel that carries the unsubscribe signal to the listener task.
let (tx, mut rx) = channel(10);
// Listener task: when `UNSUBSCRIBE_SIGNAL` (a constant defined outside this
// snippet) arrives on the channel, store it in the shared flag.
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Emitter task: sends each value to the subscriber, sleeping 1 ms after each one.
let join_handle = task::spawn(async move {
for i in 0..100 {
// Stop early once the flag equals `UNSUBSCRIBE_SIGNAL`.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
// Reached both when the loop runs to the end and after an early `break`.
o.complete();
});
// Return a subscription whose unsubscribe logic sends the signal over the
// channel and whose handle wraps the emitter task.
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
// Observer with a `next` handler plus a completion handler; no error handler is set.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// Capture and process only the last 8 values.
let subscription = observable.take_last(8).subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
/// Example entry point: runs a custom asynchronous observable through
/// `take_while` and prints whatever reaches the observer.
async fn main() {
// Custom observable that emits 0..100 from a spawned task.
let observable = Observable::new(|mut o| {
// Flag shared between the two spawned tasks; starts as `false`.
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
// Channel that carries the unsubscribe signal to the listener task.
let (tx, mut rx) = channel(10);
// Listener task: when `UNSUBSCRIBE_SIGNAL` (a constant defined outside this
// snippet) arrives on the channel, store it in the shared flag.
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Emitter task: sends each value to the subscriber, sleeping 1 ms after each one.
let join_handle = task::spawn(async move {
for i in 0..100 {
// Stop early once the flag equals `UNSUBSCRIBE_SIGNAL`.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
// Reached both when the loop runs to the end and after an early `break`.
o.complete();
});
// Return a subscription whose unsubscribe logic sends the signal over the
// channel and whose handle wraps the emitter task.
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
// Observer with a `next` handler plus a completion handler; no error handler is set.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// Emit values only when they are less than or equal to 40.
let subscription = observable.take_while(|v| v <= &40).subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/// Example entry point: chains `take`, `delay`, `merge_map` and `scan` on a
/// custom asynchronous observable and prints the accumulated strings.
async fn main() {
// Custom observable that emits 0..100 from a spawned task.
let observable = Observable::new(|mut o| {
// Flag shared between the two spawned tasks; starts as `false`.
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
// Channel that carries the unsubscribe signal to the listener task.
let (tx, mut rx) = channel(10);
// Listener task: when `UNSUBSCRIBE_SIGNAL` (a constant defined outside this
// snippet) arrives on the channel, store it in the shared flag.
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Emitter task: sends each value to the subscriber, sleeping 1 ms after each one.
let join_handle = task::spawn(async move {
for i in 0..100 {
// Stop early once the flag equals `UNSUBSCRIBE_SIGNAL`.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
// Reached both when the loop runs to the end and after an early `break`.
o.complete();
});
// Return a subscription whose unsubscribe logic sends the signal over the
// channel and whose handle wraps the emitter task.
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
// Observer with a `next` handler plus a completion handler; no error handler is set.
let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
observer.on_complete(|| println!("Completed"));
// Accumulate response strings into a single string.
// The types of `total` and `n` may differ as long as `total` implements the `From<n>` trait.
// In this example, `total` is of type `String`, and `n` is of type `&str`.
// NOTE(review): `get_response_observable` is defined outside this snippet, and the
// unit of `delay(100)` is not shown here — confirm against the operator docs.
let subscription = observable
.take(6)
.delay(100)
.merge_map(|_| get_response_observable())
.scan(|total, n| format!("{} {}", total, n), None)
.subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
/// Example entry point: stops a custom asynchronous observable with
/// `take_until`, using a `Subject` as the notifier.
async fn main() {
// Custom observable that emits 0..100 from a spawned task.
let observable = Observable::new(|mut o| {
// Flag shared between the two spawned tasks; starts as `false`.
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
// Channel that carries the unsubscribe signal to the listener task.
let (tx, mut rx) = channel(10);
// Listener task: when `UNSUBSCRIBE_SIGNAL` (a constant defined outside this
// snippet) arrives on the channel, store it in the shared flag.
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Emitter task: sends each value to the subscriber, sleeping 1 ms after each one.
let join_handle = task::spawn(async move {
for i in 0..100 {
// Stop early once the flag equals `UNSUBSCRIBE_SIGNAL`.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
// Reached both when the loop runs to the end and after an early `break`.
o.complete();
});
// Return a subscription whose unsubscribe logic sends the signal over the
// channel and whose handle wraps the emitter task.
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
// Observer with a `next` handler plus a completion handler; no error handler is set.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// Create notifier, it can be observable or one of the Subject variants.
let (mut emitter, receiver) = Subject::<()>::emitter_receiver();
// Turning Subject into an observable. To continue using the receiver later,
// utilize `.clone()`, e.g. `receiver.clone().into()`.
let subscription = observable
.take_until(receiver.into(), false)
.subscribe(observer);
// Allowing some time for the `take_until` function to register the notifier
// before emitting a signal. This step is unnecessary if you're not immediately
// sending a signal.
time::sleep(time::Duration::from_millis(1)).await;
// Send signal to stop source observable emitting values.
emitter.next(());
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
/// Example entry point: stops a thread-based custom observable with
/// `take_until`, using a delayed `Subject` as the notifier.
fn main() {
// Custom observable that emits 0..100 from a spawned OS thread.
let observable = Observable::new(|mut o| {
// Flag shared between the two spawned threads; starts as `false`.
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
// Channel that carries the unsubscribe signal to the listener thread.
let (tx, rx) = std::sync::mpsc::channel();
// Listener thread: when `UNSUBSCRIBE_SIGNAL` (a constant defined outside this
// snippet) arrives on the channel, store it in the shared flag.
std::thread::spawn(move || {
if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Emitter thread: sends each value to the subscriber, sleeping 1 ms after each one.
let join_handle = std::thread::spawn(move || {
for i in 0..100 {
// Stop early once the flag equals `UNSUBSCRIBE_SIGNAL`.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
std::thread::sleep(Duration::from_millis(1));
}
// Reached both when the loop runs to the end and after an early `break`.
o.complete();
});
// Return a subscription whose unsubscribe logic sends the signal over the
// channel and whose handle wraps the emitter thread.
Subscription::new(
UnsubscribeLogic::Logic(Box::new(move || {
if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinThread(join_handle),
)
});
// Observer with a `next` handler plus a completion handler; no error handler is set.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// Create notifier, it can be observable or one of the Subject variants.
let (mut emitter, receiver) = Subject::emitter_receiver();
// We can chain the Subject, here we use `delay()` to slow down the notifier so
// source observable has time to emit some values. Note when we chain the
// notifier with operators we don't have to use `into()`. To continue using the
// receiver later, utilize `.clone()`, e.g. `receiver.clone().delay(20)`.
let subscription = observable
.take_until(receiver.delay(20), false)
.subscribe(observer);
// Allowing some time for the `take_until` function to register the notifier
// before emitting a signal. This step is unnecessary if you're not immediately
// sending a signal.
std::thread::sleep(Duration::from_millis(1));
// Send signal to stop source observable emitting values.
emitter.next(());
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join().is_err() {
// Handle error
}
println!("`main` function done")
}
sourcepub fn on_complete(&mut self, complete_fn: impl FnMut() + 'static + Send + Sync)
pub fn on_complete(&mut self, complete_fn: impl FnMut() + 'static + Send + Sync)
Set the completion function for the Subscriber.
The provided closure will be called when the observable completes its emission sequence.
Examples found in repository?
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
/// Example entry point: builds a synchronous custom observable that emits
/// 1 through 10, then subscribes an observer whose completion handler is set
/// with `on_complete`.
fn main() {
// Create a custom observable that emits values from 1 to 10.
let mut emit_10_observable = Observable::new(|mut subscriber| {
let mut i = 1;
while i <= 10 {
// Emit the value to the subscriber.
subscriber.next(i);
i += 1;
}
// Signal completion to the subscriber.
subscriber.complete();
// Return the empty subscription.
Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
});
// Create the `Subscriber` with a mandatory `next` function, and optional
// `complete` function. No need for `error` function in this simple example.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// This observable does not use async or threads so it will block until it is done.
// Observables are cold so if you comment out the line below nothing will be emitted.
emit_10_observable.subscribe(observer);
println!("Custom Observable finished emmiting")
}
More examples
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
/// Example entry point: runs a custom asynchronous observable through
/// `take_last(8)` and prints whatever reaches the observer.
async fn main() {
// Custom observable that emits 0..100 from a spawned task.
let observable = Observable::new(|mut o| {
// Flag shared between the two spawned tasks; starts as `false`.
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
// Channel that carries the unsubscribe signal to the listener task.
let (tx, mut rx) = channel(10);
// Listener task: when `UNSUBSCRIBE_SIGNAL` (a constant defined outside this
// snippet) arrives on the channel, store it in the shared flag.
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Emitter task: sends each value to the subscriber, sleeping 1 ms after each one.
let join_handle = task::spawn(async move {
for i in 0..100 {
// Stop early once the flag equals `UNSUBSCRIBE_SIGNAL`.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
// Reached both when the loop runs to the end and after an early `break`.
o.complete();
});
// Return a subscription whose unsubscribe logic sends the signal over the
// channel and whose handle wraps the emitter task.
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
// Observer with a `next` handler; the completion handler is attached afterwards.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// Capture and process only the last 8 values.
let subscription = observable.take_last(8).subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
/// Example entry point: runs a custom asynchronous observable through
/// `take_while` and prints whatever reaches the observer.
async fn main() {
// Custom observable that emits 0..100 from a spawned task.
let observable = Observable::new(|mut o| {
// Flag shared between the two spawned tasks; starts as `false`.
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
// Channel that carries the unsubscribe signal to the listener task.
let (tx, mut rx) = channel(10);
// Listener task: when `UNSUBSCRIBE_SIGNAL` (a constant defined outside this
// snippet) arrives on the channel, store it in the shared flag.
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Emitter task: sends each value to the subscriber, sleeping 1 ms after each one.
let join_handle = task::spawn(async move {
for i in 0..100 {
// Stop early once the flag equals `UNSUBSCRIBE_SIGNAL`.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
// Reached both when the loop runs to the end and after an early `break`.
o.complete();
});
// Return a subscription whose unsubscribe logic sends the signal over the
// channel and whose handle wraps the emitter task.
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
// Observer with a `next` handler; the completion handler is attached afterwards.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// Emit values only when they are less than or equal to 40.
let subscription = observable.take_while(|v| v <= &40).subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/// Example entry point: chains `take`, `delay`, `merge_map` and `scan` on a
/// custom asynchronous observable and prints the accumulated strings.
async fn main() {
// Custom observable that emits 0..100 from a spawned task.
let observable = Observable::new(|mut o| {
// Flag shared between the two spawned tasks; starts as `false`.
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
// Channel that carries the unsubscribe signal to the listener task.
let (tx, mut rx) = channel(10);
// Listener task: when `UNSUBSCRIBE_SIGNAL` (a constant defined outside this
// snippet) arrives on the channel, store it in the shared flag.
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Emitter task: sends each value to the subscriber, sleeping 1 ms after each one.
let join_handle = task::spawn(async move {
for i in 0..100 {
// Stop early once the flag equals `UNSUBSCRIBE_SIGNAL`.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
// Reached both when the loop runs to the end and after an early `break`.
o.complete();
});
// Return a subscription whose unsubscribe logic sends the signal over the
// channel and whose handle wraps the emitter task.
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
// Observer with a `next` handler; the completion handler is attached afterwards.
let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
observer.on_complete(|| println!("Completed"));
// Accumulate response strings into a single string.
// The types of `total` and `n` may differ as long as `total` implements the `From<n>` trait.
// In this example, `total` is of type `String`, and `n` is of type `&str`.
// NOTE(review): `get_response_observable` is defined outside this snippet, and the
// unit of `delay(100)` is not shown here — confirm against the operator docs.
let subscription = observable
.take(6)
.delay(100)
.merge_map(|_| get_response_observable())
.scan(|total, n| format!("{} {}", total, n), None)
.subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
/// Example entry point: stops a custom asynchronous observable with
/// `take_until`, using a `Subject` as the notifier.
async fn main() {
// Custom observable that emits 0..100 from a spawned task.
let observable = Observable::new(|mut o| {
// Flag shared between the two spawned tasks; starts as `false`.
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
// Channel that carries the unsubscribe signal to the listener task.
let (tx, mut rx) = channel(10);
// Listener task: when `UNSUBSCRIBE_SIGNAL` (a constant defined outside this
// snippet) arrives on the channel, store it in the shared flag.
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Emitter task: sends each value to the subscriber, sleeping 1 ms after each one.
let join_handle = task::spawn(async move {
for i in 0..100 {
// Stop early once the flag equals `UNSUBSCRIBE_SIGNAL`.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
// Reached both when the loop runs to the end and after an early `break`.
o.complete();
});
// Return a subscription whose unsubscribe logic sends the signal over the
// channel and whose handle wraps the emitter task.
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
// Observer with a `next` handler; the completion handler is attached afterwards.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// Create notifier, it can be observable or one of the Subject variants.
let (mut emitter, receiver) = Subject::<()>::emitter_receiver();
// Turning Subject into an observable. To continue using the receiver later,
// utilize `.clone()`, e.g. `receiver.clone().into()`.
let subscription = observable
.take_until(receiver.into(), false)
.subscribe(observer);
// Allowing some time for the `take_until` function to register the notifier
// before emitting a signal. This step is unnecessary if you're not immediately
// sending a signal.
time::sleep(time::Duration::from_millis(1)).await;
// Send signal to stop source observable emitting values.
emitter.next(());
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
/// Example entry point: stops a thread-based custom observable with
/// `take_until`, using a delayed `Subject` as the notifier.
fn main() {
// Custom observable that emits 0..100 from a spawned OS thread.
let observable = Observable::new(|mut o| {
// Flag shared between the two spawned threads; starts as `false`.
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
// Channel that carries the unsubscribe signal to the listener thread.
let (tx, rx) = std::sync::mpsc::channel();
// Listener thread: when `UNSUBSCRIBE_SIGNAL` (a constant defined outside this
// snippet) arrives on the channel, store it in the shared flag.
std::thread::spawn(move || {
if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Emitter thread: sends each value to the subscriber, sleeping 1 ms after each one.
let join_handle = std::thread::spawn(move || {
for i in 0..100 {
// Stop early once the flag equals `UNSUBSCRIBE_SIGNAL`.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
std::thread::sleep(Duration::from_millis(1));
}
// Reached both when the loop runs to the end and after an early `break`.
o.complete();
});
// Return a subscription whose unsubscribe logic sends the signal over the
// channel and whose handle wraps the emitter thread.
Subscription::new(
UnsubscribeLogic::Logic(Box::new(move || {
if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinThread(join_handle),
)
});
// Observer with a `next` handler; the completion handler is attached afterwards.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// Create notifier, it can be observable or one of the Subject variants.
let (mut emitter, receiver) = Subject::emitter_receiver();
// We can chain the Subject, here we use `delay()` to slow down the notifier so
// source observable has time to emit some values. Note when we chain the
// notifier with operators we don't have to use `into()`. To continue using the
// receiver later, utilize `.clone()`, e.g. `receiver.clone().delay(20)`.
let subscription = observable
.take_until(receiver.delay(20), false)
.subscribe(observer);
// Allowing some time for the `take_until` function to register the notifier
// before emitting a signal. This step is unnecessary if you're not immediately
// sending a signal.
std::thread::sleep(Duration::from_millis(1));
// Send signal to stop source observable emitting values.
emitter.next(());
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join().is_err() {
// Handle error
}
println!("`main` function done")
}
sourcepub fn on_error(
&mut self,
error_fn: impl FnMut(Arc<dyn Error + Send + Sync>) + 'static + Send + Sync
)
pub fn on_error( &mut self, error_fn: impl FnMut(Arc<dyn Error + Send + Sync>) + 'static + Send + Sync )
Set the error-handling function for the `Subscriber`.
The provided closure will be called when the observable encounters an error
during its emission sequence. It takes an `Arc` wrapping a trait object that
implements the `Error`, `Send`, and `Sync` traits as its parameter.