#![cfg(all(feature = "unstable", feature = "shared-memory"))]
mod common;

use std::{
    sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};

use zenoh::{
    qos::{CongestionControl, Reliability},
    shm::{
        AllocAlignment, BlockOn, GarbageCollect, MemoryLayout, PosixShmProviderBackend,
        ShmProviderBuilder,
    },
    Session, Wait,
};
use zenoh_buffers::ZBuf;
use zenoh_core::ztimeout;
use zenoh_shm::api::buffer::traits::OwnedShmBuf;

use crate::common::TestSessions;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const MSG_COUNT: usize = 100;
const MSG_SIZE: [usize; 2] = [1_024, 100_000];
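
/// Opens a TCP listener/connector session pair with the SHM message size
/// threshold lowered to 1 byte, so that every payload is eligible for the
/// SHM transport optimization. When `NO_SHM_FOR_SECOND_PEER` is true, SHM
/// is disabled entirely on the connector (publishing) side.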
async fn setup_shm_unicast<const NO_SHM_FOR_SECOND_PEER: bool>(
test_context: &mut TestSessions,
) -> (Session, Session) {
let mut config = test_context.get_listener_config("tcp/127.0.0.1:0", 1);
config
.transport
.shared_memory
.transport_optimization
.set_message_size_threshold(1)
.unwrap();
let peer01 = test_context.open_listener_with_cfg(config).await;
let mut config = test_context.get_connector_config();
config
.transport
.shared_memory
.transport_optimization
.set_message_size_threshold(1)
.unwrap();
config
.transport
.shared_memory
.set_enabled(!NO_SHM_FOR_SECOND_PEER)
.unwrap();
let peer02 = test_context.open_connector_with_cfg(config).await;
(peer01, peer02)
}
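
/// Opens two sessions listening on the same UDP multicast locator, both with
/// the SHM message size threshold lowered to 1 byte.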
async fn setup_shm_multicast(test_context: &mut TestSessions) -> (Session, Session) {
let mut config = test_context.get_listener_config("udp/224.0.0.1:0", 1);
config
.transport
.shared_memory
.transport_optimization
.set_message_size_threshold(1)
.unwrap();
let peer01 = test_context.open_listener_with_cfg(config).await;
let locator = peer01
.info()
.locators()
.await
.into_iter()
.find(|l| l.protocol().as_str() == "udp")
.expect("Expected at least one UDP locator")
.to_string();
let mut config = test_context.get_listener_config(&locator, 1);
config
.transport
.shared_memory
.transport_optimization
.set_message_size_threshold(1)
.unwrap();
let peer02 = test_context.open_listener_with_cfg(config).await;
(peer01, peer02)
}
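
/// How the allocated SHM buffer is transformed before being published:
/// publish it as allocated (`NoResize`), shrink it to half its size with
/// `try_resize` or `try_relayout`, or copy it into a regular non-SHM buffer
/// (`NonSHM`).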
#[derive(Clone, Copy, PartialEq, Eq)]
enum ResizeBuffer {
NoResize,
Resize,
Relayout,
NonSHM,
}
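
/// Declares a subscriber on `peer01` and publishes payloads from `peer02` for
/// each size in `MSG_SIZE`, checking that every message arrives with the
/// expected length and SHM-ness, and that the SHM provider eventually reclaims
/// all allocated buffers.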
async fn test_session_pubsub<const NO_SHM_FOR_SECOND_PEER: bool>(
peer01: &Session,
peer02: &Session,
reliability: Reliability,
resize_buffer: ResizeBuffer,
) {
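    // Best-effort transports may drop messages, so only a single message
    // per size is sent in that case.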
let msg_count = match reliability {
Reliability::Reliable => MSG_COUNT,
Reliability::BestEffort => 1,
};
let msgs = Arc::new(AtomicUsize::new(0));
for size in MSG_SIZE {
let key_expr = format!("shm{size}");
msgs.store(0, Ordering::SeqCst);
println!("[PS][01b] Subscribing on peer01 session");
let c_msgs = msgs.clone();
let _sub = ztimeout!(peer01
.declare_subscriber(&key_expr)
.callback(move |sample| {
                let expected_size = match resize_buffer {
                    ResizeBuffer::Resize | ResizeBuffer::Relayout => size / 2,
                    ResizeBuffer::NoResize | ResizeBuffer::NonSHM => size,
                };
let len = sample.payload().len();
assert_eq!(len, expected_size);
if NO_SHM_FOR_SECOND_PEER {
assert!(sample.payload().as_shm().is_none());
} else {
assert!(sample.payload().as_shm().is_some());
}
c_msgs.fetch_add(1, Ordering::Relaxed);
}))
.unwrap();
tokio::time::sleep(SLEEP).await;
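        // Create a POSIX SHM backend with a segment sized at ten times the
        // current payload size (size * MSG_COUNT / 10).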
let backend = PosixShmProviderBackend::builder(size * MSG_COUNT / 10)
.wait()
.unwrap();
let shm01 = ShmProviderBuilder::backend(backend).wait();
let shm_segment_size = shm01.available();
let layout = shm01.alloc_layout(size).unwrap();
println!("[PS][03b] Putting on peer02 session. {MSG_COUNT} msgs of {size} bytes.");
for c in 0..msg_count {
let mut sbuf =
ztimeout!(layout.alloc().with_policy::<BlockOn<GarbageCollect>>()).unwrap();
            match resize_buffer {
                ResizeBuffer::Relayout => sbuf
                    .try_relayout(MemoryLayout::new(size / 2, AllocAlignment::default()).unwrap())
                    .unwrap(),
                ResizeBuffer::Resize => sbuf.try_resize((size / 2).try_into().unwrap()).unwrap(),
                ResizeBuffer::NoResize | ResizeBuffer::NonSHM => {}
            }
println!("{c} created");
let buf = if resize_buffer == ResizeBuffer::NonSHM {
let r = sbuf.as_ref().to_vec();
ZBuf::from(r)
} else {
ZBuf::from(sbuf)
};
ztimeout!(peer02
.put(&key_expr, buf)
.congestion_control(CongestionControl::Block))
.unwrap();
println!("{c} putted");
}
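        // Wait until the subscriber has received every published message.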
ztimeout!(async {
loop {
let cnt = msgs.load(Ordering::Relaxed);
println!("[PS][03b] Received {cnt}/{msg_count}.");
if cnt != msg_count {
tokio::time::sleep(SLEEP).await;
} else {
break;
}
}
});
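        // Keep garbage-collecting until the provider reports the whole segment
        // as available again, i.e. all SHM buffers have been released.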
ztimeout!(async {
loop {
shm01.garbage_collect();
let available = shm01.available();
println!("[PS][03b] SHM available {available}/{shm_segment_size}");
if available != shm_segment_size {
tokio::time::sleep(SLEEP).await;
} else {
break;
}
}
});
}
}
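
/// Checks that a session can be opened with the SHM subsystem eagerly
/// initialized at startup (`ShmInitMode::Init`).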
#[test]
fn zenoh_shm_startup_init() {
let mut config = zenoh::Config::default();
config
.transport
.shared_memory
.set_mode(zenoh_config::ShmInitMode::Init)
.unwrap();
tokio::runtime::Runtime::new().unwrap().block_on(async {
let _session = ztimeout!(zenoh::open(config)).unwrap();
});
}
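
/// Basic SHM pub/sub over a unicast (TCP) link: payloads should be received as
/// SHM buffers.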
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_shm_unicast() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (peer01, peer02) = setup_shm_unicast::<false>(&mut test_context).await;
test_session_pubsub::<false>(
&peer01,
&peer02,
Reliability::Reliable,
ResizeBuffer::NoResize,
)
.await;
test_context.close().await;
}
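
/// The publishing peer has transport SHM disabled, so its SHM-allocated
/// payloads must be received as regular (non-SHM) buffers.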
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_shm_unicast_to_non_shm() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (peer01, peer02) = setup_shm_unicast::<true>(&mut test_context).await;
test_session_pubsub::<true>(
&peer01,
&peer02,
Reliability::Reliable,
ResizeBuffer::NoResize,
)
.await;
test_context.close().await;
}
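
/// With the SHM optimization threshold at 1 byte, even payloads published from
/// regular (non-SHM) buffers should be delivered through shared memory once the
/// implicit optimization has kicked in.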
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_shm_unicast_implicit_optimization() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (peer01, peer02) = setup_shm_unicast::<false>(&mut test_context).await;
{
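        // Warm up: keep publishing small payloads until the subscriber observes
        // an SHM-backed sample, i.e. the implicit SHM optimization is active.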
let key = "warmup";
let shm_works = Arc::new(AtomicBool::new(false));
let c_shm_works = shm_works.clone();
let _sub = peer01
.declare_subscriber(key)
.callback(move |sample| {
if sample.payload().as_shm().is_some() {
c_shm_works.store(true, Ordering::Relaxed);
}
})
.wait()
.unwrap();
while !shm_works.load(Ordering::Relaxed) {
peer02.put(key, "test").wait().unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
test_session_pubsub::<false>(
&peer01,
&peer02,
Reliability::Reliable,
ResizeBuffer::NonSHM,
)
.await;
test_context.close().await;
}
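
// The following tests shrink the allocated SHM buffer from `size` to `size / 2`
// (via `try_relayout` or `try_resize`) before publishing; the subscriber must
// observe the shrunken length.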
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_shm_unicast_with_buffer_shrink_relayout() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (peer01, peer02) = setup_shm_unicast::<false>(&mut test_context).await;
test_session_pubsub::<false>(
&peer01,
&peer02,
Reliability::Reliable,
ResizeBuffer::Relayout,
)
.await;
test_context.close().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_shm_unicast_with_buffer_shrink_resize() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (peer01, peer02) = setup_shm_unicast::<false>(&mut test_context).await;
test_session_pubsub::<false>(
&peer01,
&peer02,
Reliability::Reliable,
ResizeBuffer::Resize,
)
.await;
test_context.close().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_shm_unicast_with_buffer_shrink_to_non_shm_relayout() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (peer01, peer02) = setup_shm_unicast::<true>(&mut test_context).await;
test_session_pubsub::<true>(
&peer01,
&peer02,
Reliability::Reliable,
ResizeBuffer::Relayout,
)
.await;
test_context.close().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_shm_unicast_with_buffer_shrink_to_non_shm_resize() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (peer01, peer02) = setup_shm_unicast::<true>(&mut test_context).await;
test_session_pubsub::<true>(
&peer01,
&peer02,
Reliability::Reliable,
ResizeBuffer::Resize,
)
.await;
test_context.close().await;
}
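
/// Multicast variants run over a UDP multicast locator with best-effort
/// reliability, so only a single message per payload size is exchanged.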
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_shm_multicast() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (peer01, peer02) = setup_shm_multicast(&mut test_context).await;
test_session_pubsub::<false>(
&peer01,
&peer02,
Reliability::BestEffort,
ResizeBuffer::NoResize,
)
.await;
test_context.close().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_shm_multicast_with_buffer_shrink_relayout() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (peer01, peer02) = setup_shm_multicast(&mut test_context).await;
test_session_pubsub::<false>(
&peer01,
&peer02,
Reliability::BestEffort,
ResizeBuffer::Relayout,
)
.await;
test_context.close().await;
}