use arc_swap::ArcSwapOption;
use crossbeam_queue::{ArrayQueue, SegQueue};
use never::Never;
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering},
Arc,
};
use tracing::info;
use crate::common::VariantName;
use callbag::{from_iter, Message};
#[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
use wasm_bindgen_test::wasm_bindgen_test;
#[cfg(all(
all(target_arch = "wasm32", not(target_os = "wasi")),
feature = "browser",
))]
use wasm_bindgen_test::wasm_bindgen_test_configure;
pub mod common;
#[cfg(all(
all(target_arch = "wasm32", not(target_os = "wasi")),
feature = "browser",
))]
wasm_bindgen_test_configure!(run_in_browser);
#[tracing::instrument]
#[test_log::test]
#[cfg_attr(
all(target_arch = "wasm32", not(target_os = "wasi")),
wasm_bindgen_test
)]
fn it_sends_array_items_iterable_to_a_puller_sink() {
let source = from_iter([10, 20, 30]);
let downwards_expected_types = ["Handshake", "Data", "Data", "Data", "Terminate"];
let downwards_expected_types = {
let q = ArrayQueue::new(downwards_expected_types.len());
for v in downwards_expected_types {
q.push(v).ok();
}
Arc::new(q)
};
let downwards_expected = [10, 20, 30];
let downwards_expected = {
let q = ArrayQueue::new(downwards_expected.len());
for v in downwards_expected {
q.push(v).ok();
}
Arc::new(q)
};
let talkback = ArcSwapOption::from(None);
source(Message::Handshake(Arc::new(
(move |message: Message<_, Never>| {
info!("down: {message:?}");
{
let et = downwards_expected_types.pop().unwrap();
assert_eq!(
message.variant_name(),
et,
"downwards type is expected: {et}"
);
}
if let Message::Handshake(source) = message {
talkback.store(Some(source));
let talkback = talkback.load();
let talkback = talkback.as_ref().unwrap();
talkback(Message::Pull);
} else if let Message::Data(data) = message {
{
let e = downwards_expected.pop().unwrap();
assert_eq!(data, e, "downwards data is expected: {e}");
}
let talkback = talkback.load();
let talkback = talkback.as_ref().unwrap();
talkback(Message::Pull);
}
})
.into(),
)));
}
#[tracing::instrument]
#[test_log::test]
#[cfg_attr(
all(target_arch = "wasm32", not(target_os = "wasi")),
wasm_bindgen_test
)]
fn it_sends_array_entries_iterator_to_a_puller_sink() {
let source = from_iter(["a", "b", "c"].into_iter().enumerate());
let downwards_expected_types = ["Handshake", "Data", "Data", "Data", "Terminate"];
let downwards_expected_types = {
let q = ArrayQueue::new(downwards_expected_types.len());
for v in downwards_expected_types {
q.push(v).ok();
}
Arc::new(q)
};
let downwards_expected = [(0, "a"), (1, "b"), (2, "c")];
let downwards_expected = {
let q = ArrayQueue::new(downwards_expected.len());
for v in downwards_expected {
q.push(v).ok();
}
Arc::new(q)
};
let talkback = ArcSwapOption::from(None);
source(Message::Handshake(Arc::new(
(move |message: Message<_, Never>| {
info!("down: {message:?}");
{
let et = downwards_expected_types.pop().unwrap();
assert_eq!(
message.variant_name(),
et,
"downwards type is expected: {et}"
);
}
if let Message::Handshake(source) = message {
talkback.store(Some(source));
let talkback = talkback.load();
let talkback = talkback.as_ref().unwrap();
talkback(Message::Pull);
} else if let Message::Data(data) = message {
{
let e = downwards_expected.pop().unwrap();
assert_eq!(data, e, "downwards data is expected: {e:?}");
}
let talkback = talkback.load();
let talkback = talkback.as_ref().unwrap();
talkback(Message::Pull);
}
})
.into(),
)));
}
#[tracing::instrument]
#[test_log::test]
#[cfg_attr(
all(target_arch = "wasm32", not(target_os = "wasi")),
wasm_bindgen_test
)]
fn it_does_not_blow_up_the_stack_when_iterating_something_huge() {
#[derive(Clone)]
struct Gen {
i: Arc<AtomicUsize>,
}
impl Gen {
fn new(i: Arc<AtomicUsize>) -> Self {
Gen { i }
}
}
impl Iterator for Gen {
type Item = usize;
fn next(&mut self) -> Option<Self::Item> {
let i = self.i.load(AtomicOrdering::Acquire);
if i < 1_000_000 {
self.i.fetch_add(1, AtomicOrdering::AcqRel);
Some(i)
} else {
None
}
}
}
let i = Arc::new(AtomicUsize::new(0));
let gen = Gen::new(Arc::clone(&i));
let source = from_iter(gen);
let talkback = ArcSwapOption::from(None);
let iterated = Arc::new(AtomicBool::new(false));
source(Message::Handshake(Arc::new(
{
let iterated = Arc::clone(&iterated);
move |message| {
if let Message::Handshake(source) = message {
talkback.store(Some(source));
let talkback = talkback.load();
let talkback = talkback.as_ref().unwrap();
talkback(Message::Pull);
} else if let Message::Data(_data) = message {
let talkback = talkback.load();
let talkback = talkback.as_ref().unwrap();
talkback(Message::Pull);
} else if let Message::Error(_) | Message::Terminate = message {
assert_eq!(
i.load(AtomicOrdering::Acquire),
1_000_000,
"1 million items were iterated"
);
iterated.store(true, AtomicOrdering::Release);
}
}
}
.into(),
)));
assert!(
iterated.load(AtomicOrdering::Acquire),
"iteration happened synchronously"
);
}
#[tracing::instrument]
#[test_log::test]
#[cfg_attr(
all(target_arch = "wasm32", not(target_os = "wasi")),
wasm_bindgen_test
)]
fn it_stops_sending_after_source_completion() {
let source = from_iter([10, 20, 30]);
let actual = Arc::new(SegQueue::new());
let downwards_expected_types = ["Handshake", "Data"];
let downwards_expected_types = {
let q = ArrayQueue::new(downwards_expected_types.len());
for v in downwards_expected_types {
q.push(v).ok();
}
Arc::new(q)
};
let talkback = ArcSwapOption::from(None);
source(Message::Handshake(Arc::new(
{
let actual = Arc::clone(&actual);
move |message: Message<_, Never>| {
info!("down: {message:?}");
{
let et = downwards_expected_types.pop().unwrap();
assert_eq!(
message.variant_name(),
et,
"downwards type is expected: {et}"
);
}
if let Message::Handshake(source) = message {
talkback.store(Some(source));
let talkback = talkback.load();
let talkback = talkback.as_ref().unwrap();
talkback(Message::Pull);
} else if let Message::Data(data) = message {
actual.push(data);
let talkback = talkback.load();
let talkback = talkback.as_ref().unwrap();
talkback(Message::Terminate);
talkback(Message::Pull);
talkback(Message::Pull);
}
}
}
.into(),
)));
assert_eq!(
&{
let mut v = vec![];
for _i in 0..actual.len() {
v.push(actual.pop().unwrap());
}
v
}[..],
[10]
);
}