#![cfg(all(test, feature = "iceoryx2-integration-test"))]
use super::*;
use crate::nodes::{NodeOperators, StreamOperators};
use crate::types::{Burst, IntoNode};
use crate::{Graph, RunFor, RunMode, ticker};
use iceoryx2::port::update_connections::UpdateConnections;
use iceoryx2::prelude::ZeroCopySend;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
#[repr(C)]
#[derive(Debug, Clone, Copy, Default, ZeroCopySend, PartialEq)]
struct TestData {
value: u64,
}
fn unique_service_name(prefix: &str) -> String {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
format!(
"wingfoil/test/integration/{prefix}/{}/{n}",
std::process::id()
)
}
struct IpcPublisherUpdateConnectionsNode {
publisher: iceoryx2::port::publisher::Publisher<iceoryx2::prelude::ipc::Service, TestData, ()>,
cycles: u64,
}
impl crate::MutableNode for IpcPublisherUpdateConnectionsNode {
fn cycle(&mut self, _state: &mut crate::GraphState) -> anyhow::Result<bool> {
self.cycles += 1;
let _ = self.publisher.update_connections();
std::thread::sleep(Duration::from_millis(5));
Ok(false)
}
fn start(&mut self, state: &mut crate::GraphState) -> anyhow::Result<()> {
state.always_callback();
let _ = self.publisher.update_connections();
Ok(())
}
}
#[test]
fn test_late_joiner_with_history() -> anyhow::Result<()> {
let service_name = unique_service_name("history");
let node = iceoryx2::prelude::NodeBuilder::new().create::<iceoryx2::prelude::ipc::Service>()?;
let service = node
.service_builder(&service_name.as_str().try_into()?)
.publish_subscribe::<TestData>()
.history_size(5)
.subscriber_max_buffer_size(16)
.open_or_create()?;
let publisher = service.publisher_builder().create()?;
publisher.update_connections()?;
for i in 0..3 {
let sample = publisher.loan_uninit()?;
sample.write_payload(TestData { value: i as u64 }).send()?;
}
let sub = iceoryx2_sub_with::<TestData>(&service_name, Iceoryx2ServiceVariant::Ipc);
let collected = sub.fold(Box::new(|acc: &mut Vec<TestData>, burst| {
for item in burst {
acc.push(item);
}
}));
let publisher_update = IpcPublisherUpdateConnectionsNode {
publisher,
cycles: 0,
}
.into_node();
Graph::new(
vec![publisher_update, collected.clone().as_node()],
RunMode::RealTime,
RunFor::Duration(Duration::from_millis(300)),
)
.run()?;
let values = collected.peek_value();
assert_eq!(values.len(), 3, "expected 3 samples from history");
assert_eq!(values[0].value, 0);
assert_eq!(values[1].value, 1);
assert_eq!(values[2].value, 2);
Ok(())
}
#[test]
fn test_ipc_round_trip() -> anyhow::Result<()> {
let service_name = unique_service_name("ipc");
let sub = iceoryx2_sub::<TestData>(&service_name);
let collected = sub.collapse().collect();
let upstream = ticker(Duration::from_millis(10)).produce(|| {
let mut b = Burst::default();
b.push(TestData { value: 12345 });
b
});
let pub_node = iceoryx2_pub(upstream, &service_name);
Graph::new(
vec![pub_node, collected.clone().as_node()],
RunMode::RealTime,
RunFor::Duration(Duration::from_millis(200)),
)
.run()?;
let values = collected.peek_value();
assert!(!values.is_empty());
assert!(values.iter().all(|v| v.value.value == 12345));
Ok(())
}