use super::*;
use crate::nodes::{NodeOperators, StreamOperators};
use crate::{Burst, Graph, RunFor, RunMode};
use std::time::Duration;
use testcontainers::{
GenericImage, ImageExt,
core::{Mount, WaitFor},
runners::SyncRunner,
};
const DRIVER_IMAGE: &str = "neomantra/aeron-cpp-debian";
const DRIVER_TAG: &str = "latest";
pub(crate) const AERON_CHANNEL: &str = "aeron:ipc";
pub(crate) const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const AERON_DIR: &str = "/dev/shm/aeron-integration-test";
pub(crate) fn start_media_driver() -> anyhow::Result<impl Drop> {
unsafe { std::env::set_var("AERON_DIR", AERON_DIR) };
let _ = std::fs::remove_dir_all(AERON_DIR);
let uid = unsafe { libc::getuid() };
let gid = unsafe { libc::getgid() };
let container = GenericImage::new(DRIVER_IMAGE, DRIVER_TAG)
.with_wait_for(WaitFor::seconds(1))
.with_mount(Mount::bind_mount("/dev/shm", "/dev/shm"))
.with_env_var("AERON_DIR", AERON_DIR)
.with_user(format!("{uid}:{gid}"))
.start()?;
let cnc = std::path::Path::new(AERON_DIR).join("cnc.dat");
let deadline = std::time::Instant::now() + Duration::from_secs(10);
while !cnc.exists() {
anyhow::ensure!(
std::time::Instant::now() < deadline,
"timed out waiting for aeronmd CNC file: {}",
cnc.display()
);
std::thread::sleep(Duration::from_millis(100));
}
std::thread::sleep(Duration::from_millis(500));
Ok(container)
}
#[test]
fn test_no_driver_connection_fails() {
const NO_DRIVER_DIR: &str = "/tmp/aeron-no-driver-test";
let _ = std::fs::remove_dir_all(NO_DRIVER_DIR);
unsafe { std::env::set_var("AERON_DIR", NO_DRIVER_DIR) };
let result = AeronHandle::connect();
assert!(
result.is_err(),
"expected connection error when no media driver is running"
);
}
#[cfg(feature = "aeron")]
#[test]
fn test_try_claim_zero_copy_roundtrip() -> anyhow::Result<()> {
use crate::adapters::aeron::transport::{AeronPublisherBackend, AeronSubscriberBackend};
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2005i32;
let mut sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let mut pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let mut claim = pub_.try_claim(64).expect("try_claim succeeds");
assert_eq!(claim.len(), 64);
for b in claim.data().iter_mut() {
*b = 0xAB;
}
claim.commit().expect("commit succeeds");
let mut received: Option<Vec<u8>> = None;
let deadline = std::time::Instant::now() + Duration::from_secs(2);
while std::time::Instant::now() < deadline && received.is_none() {
let _ = sub.poll(&mut |bytes: &[u8]| {
received = Some(bytes.to_vec());
});
if received.is_none() {
std::thread::sleep(Duration::from_millis(10));
}
}
let bytes = received.expect("subscription observes committed claim within 2s");
assert_eq!(bytes.len(), 64);
assert!(
bytes.iter().all(|b| *b == 0xAB),
"all 64 bytes equal 0xAB: {bytes:?}"
);
Ok(())
}
#[cfg(feature = "aeron")]
fn wait_for_pub_connected<P>(pub_: &P, timeout: Duration) -> anyhow::Result<()>
where
P: crate::adapters::aeron::transport::AeronPublisherBackend,
{
let deadline = std::time::Instant::now() + timeout;
while !pub_.is_connected() {
anyhow::ensure!(
std::time::Instant::now() < deadline,
"publication did not connect within {timeout:?}"
);
std::thread::sleep(Duration::from_millis(5));
}
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_try_claim_back_pressure_returns_typed_error() -> anyhow::Result<()> {
use crate::adapters::aeron::transport::AeronPublisherBackend;
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2006i32;
let channel = "aeron:ipc?term-length=65536";
let _sub = handle.subscription(channel, stream_id, CONNECT_TIMEOUT)?;
let mut pub_ = handle.publication(channel, stream_id, CONNECT_TIMEOUT)?;
wait_for_pub_connected(&pub_, CONNECT_TIMEOUT)?;
let mut back_pressure_seen = false;
for _ in 0..200 {
match pub_.try_claim(1024) {
Ok(claim) => {
claim.commit().expect("commit succeeds");
}
Err(TransportError::BackPressure) => {
back_pressure_seen = true;
break;
}
Err(other) => panic!("unexpected error from try_claim: {other:?}"),
}
}
assert!(
back_pressure_seen,
"back-pressure never observed within 200 iterations on term-length=65536"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_try_claim_drop_without_commit_aborts() -> anyhow::Result<()> {
use crate::adapters::aeron::transport::AeronPublisherBackend;
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2007i32;
let _sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let mut pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
wait_for_pub_connected(&pub_, CONNECT_TIMEOUT)?;
let claim = pub_.try_claim(64).expect("first try_claim succeeds");
drop(claim);
let claim2 = pub_.try_claim(64).expect("second try_claim succeeds");
claim2.commit().expect("commit succeeds");
Ok(())
}
#[cfg(feature = "aeron-rs-beta")]
#[test]
fn test_aeron_rs_spin_roundtrip() -> anyhow::Result<()> {
let _container = start_media_driver()?;
let handle = AeronRsHandle::connect()?;
let stream_id = 3001i32;
let sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let subscriber = aeron_sub_fragment(sub, parser, AeronSubOptions::default());
let collected = subscriber.collect();
let source = crate::nodes::ticker(Duration::from_millis(10))
.count()
.map(|_| {
let mut b = Burst::new();
b.push(77i64);
b
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![collected.clone().as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let values: Vec<i64> = collected
.peek_value()
.into_iter()
.flat_map(|b| b.value)
.collect();
assert!(
values.contains(&77i64),
"expected 77 via aeron-rs backend, got: {values:?}"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_fragment_limit_caps_burst_size() -> anyhow::Result<()> {
use crate::adapters::aeron::transport::{AeronPublisherBackend, AeronSubscriberBackend};
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2008i32;
let mut sub = handle
.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?
.with_fragment_limit(32);
let mut pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let mut published = 0usize;
let pub_deadline = std::time::Instant::now() + Duration::from_secs(10);
while published < 1000 {
anyhow::ensure!(
std::time::Instant::now() < pub_deadline,
"timed out publishing at {published} of 1000"
);
let bytes = (published as i64).to_le_bytes();
match pub_.offer(&bytes) {
Ok(()) => published += 1,
Err(_) => std::thread::sleep(Duration::from_micros(100)),
}
}
let mut total = 0usize;
let mut saw_capped_burst = false;
let deadline = std::time::Instant::now() + Duration::from_secs(10);
while total < 1000 {
anyhow::ensure!(
std::time::Instant::now() < deadline,
"timed out at {total} of 1000"
);
let count = sub.poll(&mut |_fragment: &[u8]| ())?;
assert!(
count <= 32,
"fragment_limit=32 cap violated: poll returned {count}"
);
if count > 0 {
saw_capped_burst = true;
}
total += count;
}
assert_eq!(total, 1000, "expected exactly 1000 fragments");
assert!(
saw_capped_burst,
"expected at least one non-zero poll within the 32-fragment cap"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_spin_sub_burst_single_message_roundtrip() -> anyhow::Result<()> {
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2009i32;
let sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let subscriber = aeron_sub_fragment(sub, parser, AeronSubOptions::default());
let collected = subscriber.collect();
let source = crate::nodes::ticker(Duration::from_millis(10))
.count()
.map(|_| {
let mut b = Burst::new();
b.push(42i64);
b
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![collected.clone().as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let values: Vec<i64> = collected
.peek_value()
.into_iter()
.flat_map(|b| b.value)
.collect();
assert!(
values.contains(&42i64),
"expected to receive 42, got: {values:?}"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_spin_sub_burst_typed_accumulation() -> anyhow::Result<()> {
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2010i32;
let sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let subscriber = aeron_sub_fragment(
sub,
parser,
AeronSubOptions {
mode: AeronMode::Spin,
fragment_limit: 256,
},
);
let collected = subscriber.collect();
let source = crate::nodes::ticker(Duration::from_millis(10))
.count()
.map(|n| {
let mut burst = Burst::new();
for i in 0..15 {
burst.push(n as i64 * 15 + i);
}
burst
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![collected.clone().as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let total: usize = collected.peek_value().iter().map(|b| b.value.len()).sum();
assert!(
total >= 10,
"expected at least 10 accumulated values, got: {total}"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_threaded_sub_burst_accumulates_across_channel_drain() -> anyhow::Result<()> {
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2011i32;
let sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let subscriber = aeron_sub_fragment(
sub,
parser,
AeronSubOptions {
mode: AeronMode::Threaded,
fragment_limit: 256,
},
);
let collected = subscriber.collect();
let source = crate::nodes::ticker(Duration::from_millis(20))
.count()
.map(|_| {
let mut b = Burst::new();
b.push(99i64);
b
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![collected.clone().as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let values: Vec<i64> = collected
.peek_value()
.into_iter()
.flat_map(|b| b.value)
.collect();
assert!(
!values.is_empty(),
"expected at least one value via threaded burst subscriber"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_collapse_yields_latest_value() -> anyhow::Result<()> {
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2012i32;
let sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let subscriber = aeron_sub_fragment(sub, parser, AeronSubOptions::default());
let collapsed = subscriber.collapse();
let collected = collapsed.collect();
let source = crate::nodes::ticker(Duration::from_millis(10))
.count()
.map(|n| {
let mut burst = Burst::new();
for i in 0..10 {
burst.push(n as i64 * 10 + i);
}
burst
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![collected.clone().as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(1)),
)
.run()
.ok();
let values: Vec<i64> = collected
.peek_value()
.into_iter()
.map(|v| v.value)
.collect();
assert!(
!values.is_empty(),
"expected at least one collapsed value, got: {values:?}"
);
assert!(
values.iter().any(|v| *v % 10 == 9),
"expected at least one tail-of-burst value (== 9 mod 10), got: {values:?}"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_burst_parser_sees_fragment_header_with_real_position() -> anyhow::Result<()> {
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2013i32;
let sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let captured_position = Arc::new(AtomicI64::new(0));
let captured_for_parser = captured_position.clone();
let parser = move |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
let _ = captured_for_parser.compare_exchange(
0,
frag.position(),
Ordering::SeqCst,
Ordering::SeqCst,
);
Ok(Some(frag.position()))
};
let subscriber = aeron_sub_fragment(sub, parser, AeronSubOptions::default());
let collected = subscriber.collect();
let source = crate::nodes::ticker(Duration::from_millis(50))
.count()
.map(|_| {
let mut b = Burst::new();
b.push(1i64);
b
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![collected.clone().as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let observed = captured_position.load(Ordering::SeqCst);
assert!(
observed > 0,
"expected non-zero fragment position from rusteron override, got: {observed}"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_status_stream_emits_on_connect() -> anyhow::Result<()> {
use std::cell::RefCell;
use std::rc::Rc;
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2014i32;
let sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
wait_for_pub_connected(&pub_, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let (data, status_stream) =
aeron_sub_fragment_with_status(sub, parser, AeronSubOptions::default());
let observed: Rc<RefCell<Vec<AeronStatus>>> = Rc::new(RefCell::new(Vec::new()));
let observed_inspect = Rc::clone(&observed);
let inspected = status_stream.clone().inspect(move |burst| {
for s in burst.iter() {
observed_inspect.borrow_mut().push(*s);
}
});
let source = crate::nodes::ticker(Duration::from_millis(50))
.count()
.map(|_| {
let mut b = Burst::new();
b.push(7i64);
b
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![data.as_node(), inspected.as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let observed = observed.borrow();
assert!(
observed.contains(&AeronStatus::Connected),
"expected status stream to record AeronStatus::Connected, got: {observed:?}"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_status_stream_no_emission_when_steady() -> anyhow::Result<()> {
use std::cell::RefCell;
use std::rc::Rc;
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2015i32;
let sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let (_data, status_stream) =
aeron_sub_fragment_with_status(sub, parser, AeronSubOptions::default());
let observed: Rc<RefCell<Vec<AeronStatus>>> = Rc::new(RefCell::new(Vec::new()));
let observed_inspect = Rc::clone(&observed);
let inspected = status_stream.clone().inspect(move |burst| {
for s in burst.iter() {
observed_inspect.borrow_mut().push(*s);
}
});
let source = crate::nodes::ticker(Duration::from_millis(20))
.count()
.map(|_| {
let mut b = Burst::new();
b.push(7i64);
b
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![inspected.as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let observed = observed.borrow();
assert!(
observed.len() <= 1,
"expected ≤1 transition under steady state, observed: {observed:?}"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_threaded_status_stream_emits_on_connect() -> anyhow::Result<()> {
use std::cell::RefCell;
use std::rc::Rc;
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2025i32;
let sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
wait_for_pub_connected(&pub_, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let (data, status_stream) = aeron_sub_fragment_with_status(
sub,
parser,
AeronSubOptions {
mode: AeronMode::Threaded,
fragment_limit: 256,
},
);
let observed: Rc<RefCell<Vec<AeronStatus>>> = Rc::new(RefCell::new(Vec::new()));
let observed_inspect = Rc::clone(&observed);
let inspected = status_stream.clone().inspect(move |burst| {
for s in burst.iter() {
observed_inspect.borrow_mut().push(*s);
}
});
let source = crate::nodes::ticker(Duration::from_millis(50))
.count()
.map(|_| {
let mut b = Burst::new();
b.push(7i64);
b
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![data.as_node(), inspected.as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let observed = observed.borrow();
assert!(
observed.contains(&AeronStatus::Connected),
"expected threaded status stream to record AeronStatus::Connected, got: {observed:?}"
);
Ok(())
}
#[cfg(all(feature = "aeron", feature = "aeron-integration-test"))]
#[test]
fn test_publisher_status_emits_back_pressure() -> anyhow::Result<()> {
use std::cell::RefCell;
use std::rc::Rc;
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2016i32;
let channel = "aeron:ipc?term-length=65536";
let _sub = handle.subscription(channel, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(channel, stream_id, CONNECT_TIMEOUT)?;
wait_for_pub_connected(&pub_, CONNECT_TIMEOUT)?;
let payload: Vec<u8> = vec![0xABu8; 4 * 1024];
let source = crate::nodes::ticker(Duration::from_millis(1))
.count()
.map(move |_| {
let mut b = Burst::new();
b.push(payload.clone());
b
});
let (pub_node, status_stream) = source.aeron_pub_with_status(pub_, |v: &Vec<u8>| v.clone());
let observed: Rc<RefCell<Vec<AeronStatus>>> = Rc::new(RefCell::new(Vec::new()));
let observed_inspect = Rc::clone(&observed);
let inspected = status_stream.clone().inspect(move |burst| {
for s in burst.iter() {
observed_inspect.borrow_mut().push(*s);
}
});
Graph::new(
vec![inspected.as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(3)),
)
.run()
.ok();
let observed = observed.borrow();
assert!(
observed.contains(&AeronStatus::BackPressured),
"expected publisher status to record BackPressured under saturation, got: {observed:?}"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_publisher_no_dedup_publishes_every_burst_item() -> anyhow::Result<()> {
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2019i32;
let sub = handle.subscription(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(AERON_CHANNEL, stream_id, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let subscriber = aeron_sub_fragment(sub, parser, AeronSubOptions::default());
let collected = subscriber.collect();
let source = crate::nodes::ticker(Duration::from_millis(50))
.count()
.map(|n| {
let mut b = Burst::new();
b.push(n as i64);
b
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![collected.clone().as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let values: Vec<i64> = collected
.peek_value()
.into_iter()
.flat_map(|b| b.value)
.collect();
let unique: std::collections::HashSet<i64> = values.iter().copied().collect();
assert!(
unique.len() >= 5,
"expected at least 5 distinct values via plain aeron_pub, got {} unique: {values:?}",
unique.len()
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_channel_uri_ipc_roundtrip() -> anyhow::Result<()> {
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2020i32;
let channel = ChannelUri::ipc();
let sub = handle.subscription(&channel, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(&channel, stream_id, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let subscriber = aeron_sub_fragment(sub, parser, AeronSubOptions::default());
let collected = subscriber.collect();
let source = crate::nodes::ticker(Duration::from_millis(10))
.count()
.map(|_| {
let mut b = Burst::new();
b.push(7i64);
b
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![collected.clone().as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let values: Vec<i64> = collected
.peek_value()
.into_iter()
.flat_map(|b| b.value)
.collect();
assert!(
values.contains(&7i64),
"ChannelUri::ipc() round-trip lost message: {values:?}"
);
Ok(())
}
#[cfg(feature = "aeron")]
#[test]
fn test_channel_uri_mdc_roundtrip() -> anyhow::Result<()> {
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let stream_id = 2021i32;
let pub_channel = ChannelUri::mdc_publication("127.0.0.1:40456")?;
let sub_channel = ChannelUri::mdc_subscription("127.0.0.1:40457", "127.0.0.1:40456")?;
let sub = handle.subscription(&sub_channel, stream_id, CONNECT_TIMEOUT)?;
let pub_ = handle.publication(&pub_channel, stream_id, CONNECT_TIMEOUT)?;
wait_for_pub_connected(&pub_, CONNECT_TIMEOUT)?;
let parser = |frag: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(frag.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let subscriber = aeron_sub_fragment(sub, parser, AeronSubOptions::default());
let collected = subscriber.collect();
let source = crate::nodes::ticker(Duration::from_millis(10))
.count()
.map(|_| {
let mut b = Burst::new();
b.push(99i64);
b
});
let pub_node = source.aeron_pub(pub_, |v: &i64| v.to_le_bytes().to_vec());
Graph::new(
vec![collected.clone().as_node(), pub_node],
RunMode::RealTime,
RunFor::Duration(Duration::from_secs(2)),
)
.run()
.ok();
let values: Vec<i64> = collected
.peek_value()
.into_iter()
.flat_map(|b| b.value)
.collect();
assert!(
values.contains(&99i64),
"ChannelUri::mdc_* round-trip lost message: {values:?}"
);
Ok(())
}