take_until_operator_os/
take_until_operator_os.rs1use std::{
9 sync::{Arc, Mutex},
10 time::Duration,
11};
12
13use rxr::{
14 subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
15 Observable, ObservableExt, Observer, Subject, Subscribeable,
16};
17
18const UNSUBSCRIBE_SIGNAL: bool = true;
19
20fn main() {
21 let observable = Observable::new(|mut o| {
22 let done = Arc::new(Mutex::new(false));
23 let done_c = Arc::clone(&done);
24 let (tx, rx) = std::sync::mpsc::channel();
25
26 std::thread::spawn(move || {
27 if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
28 *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
29 }
30 });
31
32 let join_handle = std::thread::spawn(move || {
33 for i in 0..100 {
34 if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
35 break;
36 }
37 o.next(i);
38 std::thread::sleep(Duration::from_millis(1));
39 }
40 o.complete();
41 });
42
43 Subscription::new(
44 UnsubscribeLogic::Logic(Box::new(move || {
45 if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
46 println!("Receiver dropped.");
47 }
48 })),
49 SubscriptionHandle::JoinThread(join_handle),
50 )
51 });
52
53 let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
54 observer.on_complete(|| println!("Completed"));
55
56 let (mut emitter, receiver) = Subject::emitter_receiver();
58
59 let subscription = observable
64 .take_until(receiver.delay(20), false)
65 .subscribe(observer);
66
67 std::thread::sleep(Duration::from_millis(1));
71
72 emitter.next(());
74
75 println!("Do something while Observable is emitting.");
77
78 if subscription.join().is_err() {
80 }
82
83 println!("`main` function done")
84}