use super::*;
use tokio::time::{sleep, Duration};
/// Builds a test `Observable` that emits the values `0..=end` from a
/// dedicated thread, pausing one millisecond after each emission.
///
/// Every subscription spawns two threads:
/// * an emitter thread that calls `next` for each value, then `complete`,
///   and finally invokes `last_emit_assert` with the last value it emitted;
/// * a relay thread that waits for the unsubscribe signal and raises the
///   `done` flag that the emitter polls before each emission.
///
/// # Parameters
/// * `end` - last value (inclusive) the observable tries to emit.
/// * `last_emit_assert` - callback invoked once per subscription, on the
///   emitter thread, with the last emitted value. It receives `0` when the
///   emitter was stopped before emitting anything, because the tracking
///   variable starts at `0`.
///
/// # Panics
/// The spawned threads panic if one of the internal mutexes is poisoned,
/// e.g. because `last_emit_assert` panicked during an earlier subscription.
pub fn make_emit_u32_observable(
    end: u32,
    last_emit_assert: impl FnMut(u32) + Send + Sync + 'static,
) -> Observable<u32> {
    // Shared behind a mutex so that each subscription's emitter thread can
    // call the same `FnMut` callback.
    let last_emit_assert = Arc::new(Mutex::new(last_emit_assert));

    Observable::new(move |mut o: Subscriber<_>| {
        // Stop flag polled by the emitter thread.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, rx) = std::sync::mpsc::channel();

        // Relay thread: forwards the first message sent by the unsubscribe
        // logic into the `done` flag. It exits without touching the flag if
        // the sender is dropped before anything is sent.
        std::thread::spawn(move || {
            if let Ok(i) = rx.recv() {
                *done_c.lock().unwrap() = i;
            }
        });

        let last_emit_assert = Arc::clone(&last_emit_assert);
        let jh = std::thread::spawn(move || {
            let mut last_emit = 0;
            for i in 0..=end {
                // The guard is a temporary of the condition, so the lock is
                // released before `next` is called.
                if *done.lock().unwrap() {
                    break;
                }
                last_emit = i;
                o.next(i);
                std::thread::sleep(Duration::from_millis(1));
            }
            // `complete` is signalled even when the loop was cut short.
            o.complete();
            last_emit_assert.lock().unwrap()(last_emit);
        });

        Subscription::new(
            UnsubscribeLogic::Logic(Box::new(move || {
                // Sending fails only when the relay thread has already
                // finished (it handles at most one message).
                if tx.send(true).is_err() {
                    eprintln!("receiver dropped");
                }
            })),
            SubscriptionHandle::JoinThread(jh),
        )
    })
}
// Checks `exhaust_map` with an outer observable emitting 0..=100 whose values
// are projected to inner observables emitting 0..=10 (both produced by
// `make_emit_u32_observable`).
//
// The test verifies that:
// * the outer observable emitted every value up to `outer_o_max_count`;
// * every inner observable recorded in `should_run` also appears, in the same
//   order, in `did_it_run`;
// * no assertion executed on an inner emitter thread failed (those failures
//   are captured in `global_buffer` and re-raised at the end of the test).
//
// The test is ignored by default because it is timing dependent.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "reason: this test is unreliable"]
async fn exhaust_map_observable() {
// Outer value whose inner observable is currently recorded as active;
// `None` when no inner observable is recorded.
let observable_occupied: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
let observable_occupied2 = Arc::clone(&observable_occupied);
// Held for the whole duration of each projection call (see `_project_guard`).
let project_lock = Arc::new(Mutex::new(true));
// Flag read by the inner callbacks to decide which assertion applies:
// `true` -> the inner observable must have emitted all of its values,
// `false` -> it must have been stopped before its last value.
let should_finish = Arc::new(Mutex::new(true));
let should_finish2 = Arc::clone(&should_finish);
// Last value reported by the outer observable's callback.
let last_emits_count = Arc::new(Mutex::new(0));
let last_emits_count2 = Arc::clone(&last_emits_count);
// Result of the most recent assertion run inside an inner callback. Once it
// holds an `Err`, later callbacks return early so the failure is preserved.
let global_buffer = Arc::new(Mutex::new(Ok(())));
// Inner scope: every clone of `global_buffer` created below has to be dropped
// before the strong-count assertion that follows this block.
{
let global_buffer_clone = Arc::clone(&global_buffer);
// Subscriber for the resulting observable: emitted values and errors are
// ignored; on completion the "occupied" marker is cleared and
// `should_finish` is set back to `true`.
let o = Subscriber::new(
|_| {
},
|_observable_error| {},
move || {
*observable_occupied2.lock().unwrap() = None;
*should_finish2.lock().unwrap() = true;
},
);
let outer_o_max_count = 100;
let inner_o_max_count = 10;
use std::panic::catch_unwind;
// Outer observable. Its callback runs on the emitter thread: it stores the
// last emitted value and asserts that the full range was emitted.
let observable = make_emit_u32_observable(outer_o_max_count, move |last_emit_value| {
*last_emits_count2.lock().unwrap() = last_emit_value;
assert!(
last_emit_value == outer_o_max_count,
"outer observable did not emit all values,
last value emitted {}, expected {}",
last_emit_value,
outer_o_max_count
);
});
// Outer values whose inner observable was recorded as the active one.
let should_run = Arc::new(Mutex::new(Vec::<u32>::new()));
let should_run_clone = Arc::clone(&should_run);
// Outer values whose inner callback took the "should finish" branch.
let did_it_run = Arc::new(Mutex::new(Vec::<u32>::new()));
let did_it_run_clone = Arc::clone(&did_it_run);
// Serializes the bodies of the inner callbacks.
let lock = Arc::new(Mutex::new(true));
let mut observable = observable.exhaust_map(move |v| {
// The guard lives until the closure returns, so projection calls are
// serialized on `project_lock`.
let _project_guard = project_lock.lock().unwrap();
let global_buffer_clone = Arc::clone(&global_buffer_clone);
let should_finish_cl = Arc::clone(&should_finish);
let lock = Arc::clone(&lock);
// If no inner observable is recorded as active, record `v` as the active one
// and note that it should run; otherwise clear `should_finish`.
if observable_occupied.lock().unwrap().is_none() {
*observable_occupied.lock().unwrap() = Some(v);
(*should_run_clone.lock().unwrap()).push(v);
} else {
*should_finish.lock().unwrap() = false;
}
let did_it_run_clone = Arc::clone(&did_it_run_clone);
// Inner observable for outer value `v`. The callback runs on its emitter
// thread after the emit loop ends and `complete` has been called.
make_emit_u32_observable(inner_o_max_count, move |last_emit_inner_value| {
let _guard = lock.lock().unwrap();
// Keep an already captured assertion failure instead of overwriting it.
if global_buffer_clone.lock().unwrap().is_err() {
return;
}
if *should_finish_cl.lock().unwrap() {
// Consume the flag, then require that this inner observable emitted its
// whole range. The panic (if any) is caught and stored, not propagated.
*should_finish_cl.lock().unwrap() = false;
*global_buffer_clone.lock().unwrap() = catch_unwind(|| {
assert!(
last_emit_inner_value == inner_o_max_count,
"exhaust_map should emit all values for this inner observable.
Last emitted inner value is {} but it should have reached {}",
last_emit_inner_value,
inner_o_max_count
);
});
// Recorded whether or not the assertion above passed.
(*did_it_run_clone.lock().unwrap()).push(v);
} else {
// Flag not set: this inner observable must have stopped before reaching
// its last value.
*global_buffer_clone.lock().unwrap() = catch_unwind(|| {
assert!(
last_emit_inner_value < inner_o_max_count,
"exhaust_map did not unsubscribed inner observable properly.
It finished emitting inner observable that should be rejected because other inner observable
was emitting its values. Inner observable reached its last value {} but should have
been rejected. Outer observable is {}.",
last_emit_inner_value,
v
);
});
}
})
});
let s = observable.subscribe(o);
// Re-raise a panic reported by `join` on the test thread.
// NOTE(review): presumably `join` waits for the subscription's emitter
// thread - confirm against `Subscription::join`.
if let Err(e) = s.join() {
std::panic::resume_unwind(e);
};
// Fixed 7 s pause before checking the results.
// NOTE(review): presumably gives the inner emitter threads time to finish -
// confirm; this timing dependence is a likely source of flakiness.
sleep(Duration::from_millis(7000)).await;
assert!(
*last_emits_count.lock().unwrap() == outer_o_max_count,
"Outer observable should have emitted {} times, but emitted {} instead",
outer_o_max_count,
*last_emits_count.lock().unwrap()
);
// `did_it_run` must not contain more entries than `should_run`.
assert!(
!((*should_run.lock().unwrap()).len() < (*did_it_run.lock().unwrap()).len()),
"exhaust_map emitted more values than it should,\nshould = {:?},\ndid = {:?}",
*should_run.lock().unwrap(),
*did_it_run.lock().unwrap(),
);
// Both lists must match element for element.
assert_eq!(
*should_run.lock().unwrap(),
*did_it_run.lock().unwrap(),
"exhaust_map has failed to let some inner observables to run to completion"
);
}
// `Arc::try_unwrap` below succeeds only when this is the last reference, so
// fail with a readable message first if a clone is still alive.
assert!(
Arc::strong_count(&global_buffer) == 1,
"strong count of the global buffer is {} but should be 1",
Arc::strong_count(&global_buffer)
);
let m = Arc::try_unwrap(global_buffer).unwrap();
let m = m.into_inner().unwrap();
// Re-raise, on the test thread, the assertion failure captured in an inner
// callback (if any).
if let Err(e) = m {
std::panic::resume_unwind(e);
};
}