use super::{ZeroMqPub, ZmqStatus, zmq_sub};
use crate::{Graph, Node, NodeOperators, RunFor, RunMode, StreamOperators, ticker};
use log::Level::Info;
use std::rc::Rc;
use std::time::Duration;
#[test]
fn zmq_deserialization_error_propagates() {
_ = env_logger::try_init();
let port = 5562;
std::thread::spawn(move || {
let ctx = zmq::Context::new();
let sock = ctx.socket(zmq::PUB).unwrap();
sock.bind(&format!("tcp://127.0.0.1:{port}")).unwrap();
std::thread::sleep(Duration::from_millis(200));
for _ in 0..20 {
sock.send("not valid bincode".as_bytes(), 0).unwrap();
std::thread::sleep(Duration::from_millis(50));
}
});
let (data, _status) =
zmq_sub::<u64>(format!("tcp://127.0.0.1:{port}")).expect("zmq_sub failed");
let result = data
.as_node()
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(3)));
assert!(
result.is_err(),
"expected deserialization error to propagate"
);
}
fn sender(period: Duration, port: u16) -> Rc<dyn Node> {
ticker(period).count().logged("pub", Info).zmq_pub(port, ())
}
fn sender_with_delay(period: Duration, port: u16) -> Rc<dyn Node> {
ticker(period)
.count()
.delay(Duration::from_millis(200))
.logged("pub", Info)
.zmq_pub(port, ())
}
fn receiver(address: &str) -> Rc<dyn Node> {
let (data, _status) = zmq_sub::<u64>(address).expect("zmq_sub direct address should not fail");
data.logged("sub", Info).collect().finally(|res, _| {
let values: Vec<u64> = res.into_iter().flat_map(|item| item.value).collect();
println!("{values:?}");
assert!(
values.len() >= 5,
"expected at least 5 items, got {}",
values.len()
);
for window in values.windows(2) {
assert_eq!(window[1], window[0] + 1, "expected consecutive integers");
}
Ok(())
})
}
#[test]
fn zmq_same_thread() {
_ = env_logger::try_init();
let period = Duration::from_millis(50);
let port = 5556;
let address = format!("tcp://127.0.0.1:{port}");
let run_for = RunFor::Duration(period * 10);
Graph::new(
vec![sender(period, port), receiver(&address)],
RunMode::RealTime,
run_for,
)
.print()
.run()
.unwrap();
}
#[test]
fn zmq_separate_threads() {
_ = env_logger::try_init();
let period = Duration::from_millis(50);
let port = std::net::TcpListener::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap()
.port();
let address = format!("tcp://127.0.0.1:{port}");
let run_for = RunFor::Duration(Duration::from_secs(2));
let rf_send = run_for;
let rf_rec = run_for;
let rec = std::thread::spawn(move || receiver(&address).run(RunMode::RealTime, rf_rec));
let send =
std::thread::spawn(move || sender_with_delay(period, port).run(RunMode::RealTime, rf_send));
send.join().unwrap().unwrap();
rec.join().unwrap().unwrap();
}
#[test]
fn zmq_pub_historical_mode_fails() {
use crate::NanoTime;
let result = sender(Duration::from_millis(10), 5558)
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever);
let err = result.expect_err("expected historical mode to fail for zmq sender");
let err_msg = format!("{err:?}");
assert!(
err_msg.contains("real-time"),
"expected error to mention real-time, got: {err_msg}"
);
}
#[test]
fn zmq_sub_historical_mode_fails() {
use crate::NanoTime;
let result = zmq_sub::<u64>("tcp://127.0.0.1:5559")
.unwrap()
.0
.as_node()
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever);
let err = result.expect_err("expected historical mode to fail for zmq receiver");
let err_msg = format!("{err:?}");
assert!(
err_msg.contains("real-time"),
"expected error to mention real-time, got: {err_msg}"
);
}
#[test]
fn zmq_first_message_not_dropped() {
_ = env_logger::try_init();
let period = Duration::from_millis(50);
let port = 5560;
let address = format!("tcp://127.0.0.1:{port}");
let run_for = RunFor::Duration(period * 15);
let (data, _status) = zmq_sub::<u64>(&address).unwrap();
let recv_node = data.collect().finally(|res, _| {
let values: Vec<u64> = res.into_iter().flat_map(|item| item.value).collect();
assert!(!values.is_empty(), "no values received");
assert_eq!(values[0], 1, "first message dropped: got {}", values[0]);
Ok(())
});
Graph::new(
vec![sender_with_delay(period, port), recv_node],
RunMode::RealTime,
run_for,
)
.run()
.unwrap();
}
#[test]
fn zmq_first_message_not_dropped_no_delay() {
_ = env_logger::try_init();
let period = Duration::from_millis(50);
let port = std::net::TcpListener::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap()
.port();
let address = format!("tcp://127.0.0.1:{port}");
let run_for = RunFor::Duration(period * 15);
let (data, _status) = zmq_sub::<u64>(&address).unwrap();
let recv_node = data.collect().finally(|res, _| {
let values: Vec<u64> = res.into_iter().flat_map(|item| item.value).collect();
assert!(!values.is_empty(), "no values received");
assert_eq!(values[0], 1, "first message dropped: got {}", values[0]);
Ok(())
});
Graph::new(
vec![sender(period, port), recv_node],
RunMode::RealTime,
run_for,
)
.run()
.unwrap();
}
#[test]
fn zmq_reports_connected_status() {
_ = env_logger::try_init();
let period = Duration::from_millis(50);
let port = 5561;
let address = format!("tcp://127.0.0.1:{port}");
let run_for = RunFor::Duration(period * 10);
let (data, status) = zmq_sub::<u64>(&address).unwrap();
let data_node = data.collect().finally(|res, _| {
let values: Vec<u64> = res.into_iter().flat_map(|item| item.value).collect();
assert!(!values.is_empty(), "no data received");
Ok(())
});
let status_node = status.collect().finally(|statuses, _| {
let vs: Vec<ZmqStatus> = statuses.into_iter().map(|item| item.value).collect();
assert!(
vs.contains(&ZmqStatus::Connected),
"expected Connected, got: {vs:?}"
);
Ok(())
});
Graph::new(
vec![sender(period, port), data_node, status_node],
RunMode::RealTime,
run_for,
)
.run()
.unwrap();
}
#[test]
fn zmq_sub_stops_cleanly_without_publisher_endofstream() {
_ = env_logger::try_init();
let port = 5563;
let address = format!("tcp://127.0.0.1:{port}");
std::thread::spawn(move || {
let ctx = zmq::Context::new();
let sock = ctx.socket(zmq::PUB).unwrap();
sock.bind(&format!("tcp://127.0.0.1:{port}")).unwrap();
std::thread::sleep(Duration::from_millis(500));
});
let (data, _status) = zmq_sub::<u64>(&address).unwrap();
let start = std::time::Instant::now();
data.as_node()
.run(
RunMode::RealTime,
RunFor::Duration(Duration::from_millis(300)),
)
.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_secs(2),
"subscriber took too long to stop: {elapsed:?}"
);
}
#[cfg(feature = "zmq-etcd-integration-test")]
fn start_etcd_container() -> anyhow::Result<(impl Drop, String)> {
use testcontainers::{GenericImage, ImageExt, core::WaitFor, runners::SyncRunner};
let container = GenericImage::new("gcr.io/etcd-development/etcd", "v3.5.0")
.with_wait_for(WaitFor::message_on_stderr(
"now serving peer/client/metrics",
))
.with_env_var("ETCD_LISTEN_CLIENT_URLS", "http://0.0.0.0:2379")
.with_env_var("ETCD_ADVERTISE_CLIENT_URLS", "http://0.0.0.0:2379")
.start()?;
let port = container.get_host_port_ipv4(2379)?;
let endpoint = format!("http://127.0.0.1:{port}");
Ok((container, endpoint))
}
#[cfg(feature = "zmq-etcd-integration-test")]
fn wait_for_etcd_key(endpoint: &str, key: &str, timeout: Duration) -> anyhow::Result<String> {
use etcd_client::Client;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let deadline = std::time::Instant::now() + timeout;
loop {
let val = rt.block_on(async {
let mut client = Client::connect([endpoint], None).await?;
let resp = client.get(key, None).await?;
anyhow::Ok(
resp.kvs()
.first()
.and_then(|kv| kv.value_str().ok())
.map(|s| s.to_string()),
)
})?;
if let Some(v) = val {
return Ok(v);
}
if std::time::Instant::now() >= deadline {
anyhow::bail!("key '{}' not found in etcd within {:?}", key, timeout);
}
std::thread::sleep(Duration::from_millis(100));
}
}
#[cfg(feature = "zmq-cross-lang-test")]
mod cross_lang_tests {
use super::*;
use std::process::{Command, Stdio};
fn require_python() {
let ok = Command::new("python3")
.args(["-c", "import wingfoil"])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false);
assert!(
ok,
"zmq-cross-lang-test feature is enabled but `import wingfoil` failed. \
Run `maturin develop` in wingfoil-python/ first."
);
}
fn run_python(script: &str) -> std::process::Output {
Command::new("python3")
.args(["-c", script])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.output()
.expect("failed to execute python")
}
fn assert_python_ok(output: &std::process::Output, label: &str) {
assert!(
output.status.success(),
"{label} failed (exit {}):\nstdout: {}\nstderr: {}",
output.status,
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr),
);
}
#[test]
fn zmq_rust_pub_python_sub_direct() {
require_python();
_ = env_logger::try_init();
let port = 5580u16;
let pub_handle = std::thread::spawn(move || {
ticker(Duration::from_millis(50))
.count()
.map(|n: u64| format!("{n}").into_bytes())
.zmq_pub(port, ())
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(2)))
});
std::thread::sleep(Duration::from_millis(500));
let script = format!(
r#"
import wingfoil as wf
items = []
data, _status = wf.zmq_sub("tcp://127.0.0.1:{port}")
data.inspect(lambda msgs: items.extend(msgs)).run(realtime=True, duration=1.0)
assert len(items) >= 3, f"expected >= 3 items, got {{len(items)}}"
nums = [int(b) for b in items]
for a, b in zip(nums, nums[1:]):
assert b == a + 1, f"non-consecutive: {{a}}, {{b}}"
"#
);
let output = run_python(&script);
assert_python_ok(&output, "rust_pub_python_sub_direct");
pub_handle.join().unwrap().unwrap();
}
#[test]
fn zmq_python_pub_rust_sub_direct() {
require_python();
_ = env_logger::try_init();
let port = 5581u16;
let script = format!(
r#"
import wingfoil as wf
(
wf.ticker(0.05)
.count()
.map(lambda n: str(n).encode())
.zmq_pub({port})
.run(realtime=True, duration=2.0)
)
"#
);
let child = Command::new("python3")
.args(["-c", &script])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("failed to spawn python publisher");
std::thread::sleep(Duration::from_millis(500));
let address = format!("tcp://127.0.0.1:{port}");
let (data, _status) = zmq_sub::<Vec<u8>>(&address).expect("zmq_sub failed");
let recv_node = data.collect().finally(|res, _| {
let values: Vec<String> = res
.into_iter()
.flat_map(|burst| burst.value)
.map(|b| String::from_utf8(b).expect("invalid utf8"))
.collect();
assert!(
values.len() >= 3,
"expected >= 3 items, got {}",
values.len()
);
let nums: Vec<u64> = values.iter().map(|s| s.parse().unwrap()).collect();
for w in nums.windows(2) {
assert_eq!(w[1], w[0] + 1, "non-consecutive: {} {}", w[0], w[1]);
}
Ok(())
});
recv_node
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(1)))
.unwrap();
let output = child.wait_with_output().expect("failed to wait on python");
assert_python_ok(&output, "python_pub_rust_sub_direct");
}
#[cfg(feature = "zmq-cross-lang-etcd-test")]
mod etcd {
use super::*;
use crate::adapters::etcd::EtcdConnection;
use crate::adapters::zmq::EtcdRegistry;
#[test]
fn zmq_rust_pub_python_sub_etcd() {
require_python();
_ = env_logger::try_init();
let (_container, endpoint) = super::super::start_etcd_container().unwrap();
let port = 5582u16;
let service = "cross-lang/rust-pub";
let ep = endpoint.clone();
let pub_handle = std::thread::spawn(move || {
let conn = EtcdConnection::new(ep);
ticker(Duration::from_millis(50))
.count()
.map(|n: u64| format!("{n}").into_bytes())
.zmq_pub(port, (service, EtcdRegistry::new(conn)))
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(3)))
});
super::super::wait_for_etcd_key(&endpoint, service, Duration::from_secs(5)).unwrap();
let script = format!(
r#"
import wingfoil as wf
items = []
data, _status = wf.zmq_sub_etcd("{service}", "{endpoint}")
data.inspect(lambda msgs: items.extend(msgs)).run(realtime=True, duration=1.0)
assert len(items) >= 3, f"expected >= 3 items, got {{len(items)}}"
nums = [int(b) for b in items]
for a, b in zip(nums, nums[1:]):
assert b == a + 1, f"non-consecutive: {{a}}, {{b}}"
"#
);
let output = run_python(&script);
assert_python_ok(&output, "rust_pub_python_sub_etcd");
pub_handle.join().unwrap().unwrap();
}
#[test]
fn zmq_python_pub_rust_sub_etcd() {
require_python();
_ = env_logger::try_init();
let (_container, endpoint) = super::super::start_etcd_container().unwrap();
let port = 5583u16;
let service = "cross-lang/python-pub";
let script = format!(
r#"
import wingfoil as wf
(
wf.ticker(0.05)
.count()
.map(lambda n: str(n).encode())
.zmq_pub_etcd("{service}", {port}, "{endpoint}")
.run(realtime=True, duration=3.0)
)
"#
);
let child = Command::new("python3")
.args(["-c", &script])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("failed to spawn python publisher");
super::super::wait_for_etcd_key(&endpoint, service, Duration::from_secs(5)).unwrap();
let conn = EtcdConnection::new(&endpoint);
let (data, _status) = zmq_sub::<Vec<u8>>((service, EtcdRegistry::new(conn)))
.expect("zmq_sub_etcd failed");
let recv_node = data.collect().finally(|res, _| {
let values: Vec<String> = res
.into_iter()
.flat_map(|burst| burst.value)
.map(|b| String::from_utf8(b).expect("invalid utf8"))
.collect();
assert!(
values.len() >= 3,
"expected >= 3 items, got {}",
values.len()
);
let nums: Vec<u64> = values.iter().map(|s| s.parse().unwrap()).collect();
for w in nums.windows(2) {
assert_eq!(w[1], w[0] + 1, "non-consecutive: {} {}", w[0], w[1]);
}
Ok(())
});
recv_node
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(1)))
.unwrap();
let output = child.wait_with_output().expect("failed to wait on python");
assert_python_ok(&output, "python_pub_rust_sub_etcd");
}
}
}
#[cfg(feature = "zmq-etcd-integration-test")]
mod etcd_tests {
use super::*;
use crate::adapters::etcd::EtcdConnection;
use crate::adapters::zmq::EtcdRegistry;
use etcd_client::Client;
fn start_etcd() -> anyhow::Result<(impl Drop, EtcdConnection)> {
let (container, endpoint) = super::start_etcd_container()?;
Ok((container, EtcdConnection::new(endpoint)))
}
fn read_key(conn: &EtcdConnection, key: &str) -> anyhow::Result<Option<String>> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(async {
let mut client = Client::connect(&conn.endpoints, None).await?;
let resp = client.get(key, None).await?;
Ok(resp
.kvs()
.first()
.and_then(|kv| kv.value_str().ok())
.map(|s| s.to_string()))
})
}
#[test]
fn zmq_sub_etcd_no_etcd_returns_error() {
let conn = EtcdConnection::new("http://127.0.0.1:59999");
let result = zmq_sub::<u64>(("anything", EtcdRegistry::new(conn)));
assert!(result.is_err(), "expected error when etcd is unreachable");
}
#[test]
fn zmq_sub_etcd_name_not_found() {
let (_container, conn) = start_etcd().unwrap();
let result = zmq_sub::<u64>(("nonexistent-key", EtcdRegistry::new(conn)));
assert!(result.is_err(), "expected error for absent key");
let msg = result.unwrap_err().to_string();
assert!(
msg.contains("no publisher named"),
"unexpected error message: {msg}"
);
}
#[test]
fn zmq_pub_etcd_registers_address() {
_ = env_logger::try_init();
let (_container, conn) = start_etcd().unwrap();
let port = 5596u16;
let conn_clone = conn.clone();
let handle = std::thread::spawn(move || {
ticker(Duration::from_millis(50))
.count()
.zmq_pub(port, ("etcd-quotes", EtcdRegistry::new(conn_clone)))
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(2)))
});
let val =
super::wait_for_etcd_key(&conn.endpoints[0], "etcd-quotes", Duration::from_secs(5))
.unwrap();
assert!(val.contains("5596"), "address should contain port 5596");
handle.join().unwrap().unwrap();
}
#[test]
fn zmq_sub_etcd_end_to_end() {
_ = env_logger::try_init();
let (_container, conn) = start_etcd().unwrap();
let port = 5597u16;
let conn_pub = conn.clone();
std::thread::spawn(move || {
ticker(Duration::from_millis(50))
.count()
.zmq_pub(port, ("etcd-data", EtcdRegistry::new(conn_pub)))
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(3)))
});
super::wait_for_etcd_key(&conn.endpoints[0], "etcd-data", Duration::from_secs(5)).unwrap();
let (data, _status) = zmq_sub::<u64>(("etcd-data", EtcdRegistry::new(conn))).unwrap();
let recv_node = data.collect().finally(|res, _| {
let values: Vec<u64> = res.into_iter().flat_map(|item| item.value).collect();
assert!(!values.is_empty(), "no data received via etcd discovery");
Ok(())
});
recv_node
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(1)))
.unwrap();
}
#[test]
fn zmq_pub_etcd_lease_revoked_on_stop() {
_ = env_logger::try_init();
let (_container, conn) = start_etcd().unwrap();
let port = 5598u16;
let conn_clone = conn.clone();
let handle = std::thread::spawn(move || {
ticker(Duration::from_millis(50))
.count()
.zmq_pub(port, ("etcd-lease-key", EtcdRegistry::new(conn_clone)))
.run(
RunMode::RealTime,
RunFor::Duration(Duration::from_millis(200)),
)
});
handle.join().unwrap().unwrap();
std::thread::sleep(Duration::from_millis(200));
let val = read_key(&conn, "etcd-lease-key").unwrap();
assert!(
val.is_none(),
"key should be gone after publisher stop, got: {val:?}"
);
}
#[test]
fn zmq_pub_etcd_historical_mode_fails() {
use crate::NanoTime;
let conn = EtcdConnection::new("http://127.0.0.1:59999");
let result = ticker(Duration::from_millis(10))
.count()
.zmq_pub(5599, ("test-hist", EtcdRegistry::new(conn)))
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever);
let err = result.expect_err("expected historical mode to fail");
assert!(
format!("{err:?}").contains("real-time"),
"expected error to mention real-time"
);
}
}