mod generate_observable;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use generate_observable::{generate_delayed_observable, generate_u32_observable};
use rxr::subscribe::Subscriber;
use rxr::{ObservableExt, Subscribeable};
use tokio::time::sleep;
#[test]
fn switch_map_observable() {
let last_emits_count = Arc::new(Mutex::new(0_u32));
let last_emits_count2 = Arc::clone(&last_emits_count);
let inner_emits_cnt = Arc::new(Mutex::new(0_u32));
let inner_emits_cnt2 = Arc::clone(&inner_emits_cnt);
let global_buffer = Arc::new(Mutex::new(Ok(())));
{
let global_buffer_clone = Arc::clone(&global_buffer);
let o = Subscriber::new(
|_| {
},
|_observable_error| {},
|| {},
);
let outer_o_max_count = 100;
let inner_o_max_count = 10;
use std::panic::catch_unwind;
let observable = generate_u32_observable(outer_o_max_count, move |last_emit_value| {
*last_emits_count2.lock().unwrap() = last_emit_value;
assert!(
last_emit_value == outer_o_max_count,
"outer observable did not emit all values,
last value emitted {}, expected {}",
last_emit_value,
outer_o_max_count
);
});
let lock = Arc::new(Mutex::new(true));
let mut observable = observable.switch_map(move |v| {
let global_buffer_clone = Arc::clone(&global_buffer_clone);
let inner_emits_cnt2 = Arc::clone(&inner_emits_cnt2);
let lock = Arc::clone(&lock);
generate_u32_observable(inner_o_max_count, move |last_emit_inner_value| {
let _guard = lock.lock().unwrap();
*inner_emits_cnt2.lock().unwrap() += 1;
if global_buffer_clone.lock().unwrap().is_err() {
return;
}
if v < outer_o_max_count {
*global_buffer_clone.lock().unwrap() = catch_unwind(|| {
assert!(
last_emit_inner_value < inner_o_max_count,
"switch_map did not unsubscribed inner observable properly.
Outer value is {} which is not last value emitted by outer observable. Inner observable reached
{} which is its last value",
v,
last_emit_inner_value
);
});
} else {
*global_buffer_clone.lock().unwrap() = catch_unwind(|| {
assert!(
v == outer_o_max_count,
"switch_map emitted more values
than it should. Expected {}, found {}",
outer_o_max_count,
v
);
assert!(
last_emit_inner_value == inner_o_max_count,
"last inner observable should have emitted all of its values.
Expected {}, found {}",
inner_o_max_count,
last_emit_inner_value
);
});
}
})
});
let s = observable.subscribe(o);
if let Err(e) = s.join() {
std::panic::resume_unwind(e);
};
std::thread::sleep(Duration::from_millis(3000));
assert!(
*last_emits_count.lock().unwrap() == outer_o_max_count,
"switch_map should have emitted {} times, but emitted {} instead",
outer_o_max_count,
*last_emits_count.lock().unwrap()
);
assert!(
*inner_emits_cnt.lock().unwrap() != 0,
"switch_map did not projected any of the inner observables, should project {}",
outer_o_max_count
);
*inner_emits_cnt.lock().unwrap() -= 1;
assert!(
*inner_emits_cnt.lock().unwrap() == outer_o_max_count,
"switch_map did not projected all of the inner observables.
Projected {}, should project {}",
*inner_emits_cnt.lock().unwrap(),
outer_o_max_count
);
}
assert!(
Arc::strong_count(&global_buffer) == 1,
"strong count of the global buffer is {} but should be 1",
Arc::strong_count(&global_buffer)
);
let m = Arc::try_unwrap(global_buffer).unwrap();
let m = m.into_inner().unwrap();
if let Err(e) = m {
std::panic::resume_unwind(e);
};
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn concat_map_observable() {
use std::panic::catch_unwind;
let compare_outer_values = Arc::new(Mutex::new(0_u32));
let compare_outer_values2 = Arc::clone(&compare_outer_values);
let lock = Arc::new(Mutex::new(true));
let ongoing_emissions = Arc::new(Mutex::new(false));
let ongoing_emissions2 = Arc::clone(&ongoing_emissions);
let global_buffer = Arc::new(Mutex::new(Ok(())));
{
let global_buffer_clone = Arc::clone(&global_buffer);
let global_buffer_clone2 = Arc::clone(&global_buffer);
let sequence_guard = Arc::new(Mutex::new(true));
let o = Subscriber::new(
move |v: u32| {
if v == 0 {
let _sequence_guard = sequence_guard.lock().unwrap();
let ongoing_emissions3 = Arc::clone(&ongoing_emissions);
if global_buffer_clone2.lock().unwrap().is_ok() {
*global_buffer_clone2.lock().unwrap() = catch_unwind(move || {
assert!(
*ongoing_emissions3.lock().unwrap() == false,
"concat_map started emitting next inner observable while previous one still not completed"
);
});
*ongoing_emissions.lock().unwrap() = true;
}
}
},
|_observable_error| {},
move || {
*ongoing_emissions2.lock().unwrap() = false;
},
);
let outer_o_max_count = 100;
let inner_o_max_count = 10;
let observable = generate_u32_observable(outer_o_max_count, move |last_emit_value| {
assert!(
last_emit_value == outer_o_max_count,
"outer observable did not emit all values,
last value emitted {}, expected {}",
last_emit_value,
outer_o_max_count
);
});
let mut observable = observable.concat_map(move |v| {
let global_buffer_clone = Arc::clone(&global_buffer_clone);
let compare_outer_values2 = Arc::clone(&compare_outer_values2);
let lock = Arc::clone(&lock);
generate_u32_observable(inner_o_max_count, move |last_emit_inner_value| {
let _guard = lock.lock().unwrap();
let expected_outer_value = *compare_outer_values2.lock().unwrap();
*compare_outer_values2.lock().unwrap() += 1;
if global_buffer_clone.lock().unwrap().is_err() {
return;
}
*global_buffer_clone.lock().unwrap() = catch_unwind(|| {
assert!(
last_emit_inner_value == inner_o_max_count,
"concat_map should emit all values for this inner observable.
Last emitted inner value is {} but it should have reached {}",
last_emit_inner_value,
inner_o_max_count
);
assert!(
expected_outer_value == v,
"concat_map did not finished emitting values in sequential order. Next emitted
outer observable value should have been {}, got {} instead",
expected_outer_value,
v
);
});
})
});
let s = observable.subscribe(o);
if let Err(e) = s.join() {
std::panic::resume_unwind(e);
};
sleep(Duration::from_millis(25000)).await;
assert!(
*compare_outer_values.lock().unwrap() != 0,
"concat_map did not project any of the inner observables, should project {}",
outer_o_max_count
);
let values_emitted_count = *compare_outer_values.lock().unwrap() - 1;
assert!(
!(values_emitted_count > outer_o_max_count),
"concat_map emitted more values than it should. Emitted {}, expected {}",
values_emitted_count,
outer_o_max_count
);
assert_eq!(
values_emitted_count, outer_o_max_count,
"concat_map has failed to project all of the inner observables.
Projected {}, should project {}",
values_emitted_count, outer_o_max_count
);
}
assert!(
Arc::strong_count(&global_buffer) == 1,
"strong count of the global buffer is {} but should be 1",
Arc::strong_count(&global_buffer)
);
let m = Arc::try_unwrap(global_buffer).unwrap();
let m = m.into_inner().unwrap();
if let Err(e) = m {
std::panic::resume_unwind(e);
};
}
#[tokio::test(flavor = "multi_thread")]
async fn merge_map_observable() {
let last_emits_count = Arc::new(Mutex::new(0_u32));
let last_emits_count2 = Arc::clone(&last_emits_count);
let inner_emits_cnt = Arc::new(Mutex::new(0_u32));
let inner_emits_cnt2 = Arc::clone(&inner_emits_cnt);
let global_buffer = Arc::new(Mutex::new(Ok(())));
{
let global_buffer_clone = Arc::clone(&global_buffer);
let o = Subscriber::new(
|_| {
},
|_observable_error| {},
|| {},
);
let outer_o_max_count = 100;
let inner_o_max_count = 10;
use std::panic::catch_unwind;
let observable = generate_u32_observable(outer_o_max_count, move |last_emit_value| {
*last_emits_count2.lock().unwrap() = last_emit_value;
assert!(
last_emit_value == outer_o_max_count,
"outer observable did not emit all values,
last value emitted {}, expected {}",
last_emit_value,
outer_o_max_count
);
});
let lock = Arc::new(Mutex::new(true));
let mut observable = observable.merge_map(move |_| {
let global_buffer_clone = Arc::clone(&global_buffer_clone);
let inner_emits_cnt2 = Arc::clone(&inner_emits_cnt2);
let lock = Arc::clone(&lock);
generate_u32_observable(inner_o_max_count, move |last_emit_inner_value| {
let _guard = lock.lock().unwrap();
*inner_emits_cnt2.lock().unwrap() += 1;
if global_buffer_clone.lock().unwrap().is_err() {
return;
}
*global_buffer_clone.lock().unwrap() = catch_unwind(|| {
assert!(
last_emit_inner_value == inner_o_max_count,
"inner observable should have emitted all of its values.
Expected {}, found {}",
inner_o_max_count,
last_emit_inner_value
);
});
})
});
let s = observable.subscribe(o);
if let Err(e) = s.join() {
std::panic::resume_unwind(e);
};
sleep(Duration::from_millis(10000)).await;
assert!(
*last_emits_count.lock().unwrap() == outer_o_max_count,
"merge_map should have emitted {} times, but emitted {} instead",
outer_o_max_count,
*last_emits_count.lock().unwrap()
);
assert!(
*inner_emits_cnt.lock().unwrap() != 0,
"merge_map did not projected any of the inner observables, should project {}",
outer_o_max_count
);
*inner_emits_cnt.lock().unwrap() -= 1;
assert!(
*inner_emits_cnt.lock().unwrap() == outer_o_max_count,
"merge_map did not projected all of the inner observables.
Projected {}, should project {}",
*inner_emits_cnt.lock().unwrap(),
outer_o_max_count
);
}
assert!(
Arc::strong_count(&global_buffer) == 1,
"strong count of the global buffer is {} but should be 1",
Arc::strong_count(&global_buffer)
);
let m = Arc::try_unwrap(global_buffer).unwrap();
let m = m.into_inner().unwrap();
if let Err(e) = m {
std::panic::resume_unwind(e);
};
}
#[tokio::test(flavor = "multi_thread")]
async fn debounce_map_observable_notify_all() {
let emitted_values_store = Arc::new(Mutex::new(Vec::with_capacity(29)));
let emitted_values_store_cl = Arc::clone(&emitted_values_store);
let all_notifeirs_unsubscribed = Arc::new(Mutex::new(true));
let all_notifeirs_unsubscribed_cl = Arc::clone(&all_notifeirs_unsubscribed);
let o = Subscriber::on_next(move |v: u32| {
emitted_values_store_cl.lock().unwrap().push(v);
});
let end = 28;
let notifier_end = 28;
let outer_observable = generate_delayed_observable(end.try_into().unwrap(), 100, |_| {});
let subscription = outer_observable
.debounce_map(move |_| {
let all_notifeirs_unsubscribed_cl = Arc::clone(&all_notifeirs_unsubscribed_cl);
generate_delayed_observable(notifier_end, 20, move |v| {
if v == notifier_end - 1 {
*all_notifeirs_unsubscribed_cl.lock().unwrap() = false;
}
})
})
.subscribe(o);
let _ = subscription.join_concurrent().await;
sleep(Duration::from_millis(2000)).await;
let stored_cnt = emitted_values_store.lock().unwrap().len();
assert_eq!(
*all_notifeirs_unsubscribed.lock().unwrap(),
true,
"one or more duration observables failed to unsubscribe"
);
assert_eq!(
stored_cnt, end,
"debounced observable should have emitted all {end} values, emitted {stored_cnt}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn debounce_map_observable_notify_none() {
let emitted_values_store = Arc::new(Mutex::new(Vec::with_capacity(29)));
let emitted_values_store_cl = Arc::clone(&emitted_values_store);
let all_notifeirs_unsubscribed = Arc::new(Mutex::new(true));
let all_notifeirs_unsubscribed_cl = Arc::clone(&all_notifeirs_unsubscribed);
let o = Subscriber::on_next(move |v: u32| {
emitted_values_store_cl.lock().unwrap().push(v);
});
let end = 28;
let notifier_end = 28;
let outer_observable = generate_delayed_observable(end.try_into().unwrap(), 20, |_| {});
let subscription = outer_observable
.debounce_map(move |_| {
let all_notifeirs_unsubscribed_cl = Arc::clone(&all_notifeirs_unsubscribed_cl);
generate_delayed_observable(notifier_end, 100, move |v| {
if v == notifier_end - 1 {
*all_notifeirs_unsubscribed_cl.lock().unwrap() = false;
}
})
})
.subscribe(o);
let _ = subscription.join_concurrent().await;
sleep(Duration::from_millis(4000)).await;
let stored_cnt = emitted_values_store.lock().unwrap().len();
assert_eq!(
*all_notifeirs_unsubscribed.lock().unwrap(),
true,
"one or more duration observables failed to unsubscribe"
);
assert_eq!(
stored_cnt, 1,
"debounced observable should have emitted one value upon complete, emitted {stored_cnt}"
);
}