#![cfg(feature = "inproc")]
mod helpers;
use rustzmq2::__async_rt as async_rt;
use rustzmq2::prelude::*;
use std::error::Error;
#[cfg(test)]
mod test {
use super::*;
/// REQ/REP exchange over the inproc transport.
///
/// A REP socket is bound first; a REQ client is then spawned on a background
/// task and connects to the same address. Both sides are driven by the
/// routines in the `helpers` module (not shown in this file) with the same
/// message count.
#[async_rt::test]
async fn test_inproc_req_rep() -> Result<(), Box<dyn Error>> {
    pretty_env_logger::try_init().ok();

    let mut server = rustzmq2::RepSocket::new();
    server.bind("inproc://test-req-rep").await?;

    let message_count = 10;

    // Client half. The task is detached, so a failure in it only shows up as
    // a panic inside that task (every fallible step is unwrapped).
    let client = async move {
        let mut requester = rustzmq2::ReqSocket::new();
        requester.connect("inproc://test-req-rep").await.unwrap();
        helpers::run_req_client(requester, message_count)
            .await
            .unwrap();
    };
    async_rt::task::spawn(client);

    // Server half runs on the test task; its error is propagated to the caller.
    helpers::run_rep_server(server, message_count).await?;
    Ok(())
}
/// PUB/SUB fan-out over the inproc transport.
///
/// One PUB socket is bound, then `num_subscribers` SUB sockets are spawned on
/// background tasks. Each subscriber connects, subscribes with an empty
/// prefix, receives `num_messages` messages and reports how many it got over
/// an mpsc channel. The test passes when every subscriber reports exactly
/// `num_messages`.
#[async_rt::test]
async fn test_inproc_pub_sub() -> Result<(), Box<dyn Error>> {
    use futures::StreamExt;

    pretty_env_logger::try_init().ok();

    let mut pub_socket = rustzmq2::PubSocket::new();
    pub_socket.bind("inproc://test-pub-sub").await?;

    let num_subscribers = 3usize;
    let num_messages = 5usize;

    // One slot per subscriber so the single `try_send` in each task cannot
    // fail for lack of capacity.
    let (tx, rx) = futures::channel::mpsc::channel::<usize>(num_subscribers);
    for _ in 0..num_subscribers {
        let mut result_tx = tx.clone();
        async_rt::task::spawn(async move {
            let mut sub = rustzmq2::SubSocket::new();
            sub.connect("inproc://test-pub-sub").await.unwrap();
            sub.subscribe("").await.unwrap();

            let mut count = 0;
            while count < num_messages {
                sub.recv().await.unwrap();
                count += 1;
            }
            result_tx.try_send(count).unwrap();
        });
    }
    // Drop the original sender so the receiver stream ends once every
    // subscriber task has dropped its clone.
    drop(tx);

    // Presumably gives the spawned subscribers time to connect and subscribe
    // before anything is published — TODO confirm the delay is sufficient.
    async_rt::task::sleep(std::time::Duration::from_millis(100)).await;

    for i in 0..num_messages {
        pub_socket.send(format!("message {i}")).await?;
    }

    let results: Vec<usize> = rx.collect().await;
    assert_eq!(
        results.len(),
        num_subscribers,
        "every subscriber should report a result"
    );
    for count in results {
        assert_eq!(
            count, num_messages,
            "subscriber reported an unexpected message count"
        );
    }
    Ok(())
}
/// PAIR/PAIR one-way stream over the inproc transport.
///
/// The bound side receives `num_messages` messages sent by a connected peer
/// running on a background task, and checks that the first frame of each one
/// is the text `msg <index>` in sending order.
#[async_rt::test]
async fn test_inproc_pair() -> Result<(), Box<dyn Error>> {
    pretty_env_logger::try_init().ok();

    let mut receiver = rustzmq2::PairSocket::new();
    receiver.bind("inproc://test-pair").await?;

    let num_messages = 20u32;

    // Sending half, detached; send/connect errors panic inside the task.
    let sender_task = async move {
        let mut sender = rustzmq2::PairSocket::new();
        sender.connect("inproc://test-pair").await.unwrap();
        for i in 0..num_messages {
            sender.send(format!("msg {i}")).await.unwrap();
        }
    };
    async_rt::task::spawn(sender_task);

    // Receiving half: decode the first frame as UTF-8 and compare in order.
    for i in 0..num_messages {
        let received = receiver.recv().await?;
        let first_frame = received.get(0).unwrap().to_vec();
        let text = String::from_utf8(first_frame).unwrap();
        assert_eq!(text, format!("msg {i}"));
    }
    Ok(())
}
/// An inproc address can be bound again after its first owner unbinds it.
///
/// The first socket is still alive (only unbound, not dropped) when the second
/// socket binds the same address.
#[async_rt::test]
async fn test_inproc_address_reuse_after_unbind() -> Result<(), Box<dyn Error>> {
    const ADDRESS: &str = "inproc://test-reuse";

    pretty_env_logger::try_init().ok();

    // First owner: bind, then release the endpoint returned by `bind`.
    let mut first_owner = rustzmq2::RepSocket::new();
    let bound_endpoint = first_owner.bind(ADDRESS).await?;
    first_owner.unbind(bound_endpoint).await?;

    // Second owner: binding the same address must now succeed.
    let mut second_owner = rustzmq2::RepSocket::new();
    second_owner.bind(ADDRESS).await?;
    Ok(())
}
/// Connecting an incompatible socket type over inproc is rejected.
///
/// A PUB socket is bound; a REQ socket connecting to it must fail with an
/// error whose text contains "socket type not compatible", while a SUB socket
/// connecting to the same endpoint must succeed.
#[async_rt::test]
async fn test_inproc_rejects_incompatible_socket_types() {
    pretty_env_logger::try_init().ok();

    let mut publisher = rustzmq2::PubSocket::new();
    let endpoint = publisher.bind("inproc://test-incompat").await.unwrap();
    // Reuse the string form of the endpoint reported by `bind` for both peers.
    let bound = endpoint.to_string();

    // Incompatible pairing: REQ -> PUB.
    let mut requester = rustzmq2::ReqSocket::new();
    let connect_result = requester.connect(&bound).await;
    let err = connect_result.expect_err("REQ→PUB should be rejected");
    let msg = err.to_string();
    assert!(
        msg.contains("socket type not compatible"),
        "unexpected error: {msg}"
    );

    // Compatible pairing: SUB -> PUB.
    let mut subscriber = rustzmq2::SubSocket::new();
    subscriber
        .connect(&bound)
        .await
        .expect("SUB→PUB should succeed");
}
/// A ROUTER bound on inproc sees the routing id configured on a DEALER.
///
/// The DEALER is built with peer identity `worker-7` and sends `hello`; the
/// ROUTER must receive a message whose first frame is that identity and whose
/// second frame is the payload.
#[async_rt::test]
async fn test_inproc_router_honors_dealer_routing_id() -> Result<(), Box<dyn Error>> {
    use rustzmq2::PeerIdentity;
    use std::str::FromStr;

    pretty_env_logger::try_init().ok();

    let mut router = rustzmq2::RouterSocket::new();
    router.bind("inproc://test-router-id").await?;

    let routing_id = PeerIdentity::from_str("worker-7")?;
    let mut dealer = rustzmq2::DealerSocket::builder()
        .peer_identity(routing_id)
        .build();
    dealer.connect("inproc://test-router-id").await?;

    // NOTE(review): this is the only direct `tokio` call in this file; every
    // other runtime call goes through `async_rt` — confirm this is intended.
    tokio::task::yield_now().await;

    dealer.send("hello").await?;

    let msg = router.recv().await?;
    let identity = msg.get(0).expect("identity frame");
    let payload = msg.get(1).expect("payload frame");
    assert_eq!(
        identity.as_ref(),
        b"worker-7",
        "ROUTER should see dealer's configured routing id, got {:?}",
        identity.as_ref()
    );
    assert_eq!(payload.as_ref(), b"hello");
    Ok(())
}
/// A PUB socket's configured hello message is the first thing a SUB receives.
///
/// The publisher is built with `hello_msg("welcome-aboard")` and never sends
/// anything explicitly; the subscriber subscribes with an empty prefix before
/// connecting, and its first `recv` must yield that hello message.
#[async_rt::test]
async fn test_inproc_hello_msg_delivered() -> Result<(), Box<dyn Error>> {
    const ADDRESS: &str = "inproc://test-hello";

    pretty_env_logger::try_init().ok();

    let mut publisher = rustzmq2::PubSocket::builder()
        .hello_msg("welcome-aboard")
        .build();
    publisher.bind(ADDRESS).await?;

    // Order matters here as written: subscribe first, connect second.
    let mut subscriber = rustzmq2::SubSocket::new();
    subscriber.subscribe("").await?;
    subscriber.connect(ADDRESS).await?;

    let first_message = subscriber.recv().await?;
    assert_eq!(
        first_message.get(0).unwrap().as_ref(),
        b"welcome-aboard",
        "SUB's first recv over inproc should be hello_msg"
    );
    Ok(())
}
/// A REQ socket may connect to an inproc address before anything is bound.
///
/// The client task connects, sends `ping` and expects `pong`. The test task
/// waits 50 ms before binding the REP socket (presumably so the connect is
/// issued first — timing-based, not guaranteed), answers the ping, and then
/// joins the client task.
#[async_rt::test]
async fn test_inproc_connect_before_bind() -> Result<(), Box<dyn Error>> {
    pretty_env_logger::try_init().ok();

    let client_task = async_rt::task::spawn(async move {
        let mut req = rustzmq2::ReqSocket::new();
        req.connect("inproc://test-pending").await.unwrap();
        req.send("ping").await.unwrap();
        let reply = req.recv().await.unwrap();
        assert_eq!(reply.get(0).unwrap().as_ref(), b"pong");
    });

    async_rt::task::sleep(std::time::Duration::from_millis(50)).await;

    let mut rep = rustzmq2::RepSocket::new();
    rep.bind("inproc://test-pending").await?;

    let msg = rep.recv().await?;
    assert_eq!(msg.get(0).unwrap().as_ref(), b"ping");
    rep.send("pong").await?;

    // The join result must not be discarded: a panic inside the client task
    // (an unwrap failure or the `pong` assertion) is only reported through it.
    assert!(
        client_task.await.is_ok(),
        "client task did not complete successfully"
    );
    Ok(())
}
/// Options that are ignored on inproc are reported through the monitor.
///
/// A REP socket is built with `curve_server(..)`, a monitor stream is taken,
/// and the socket is bound on inproc. The monitor is then drained until it
/// ends or stays silent for 50 ms; the collected `OptionIgnoredOnTransport`
/// events must include both `"curve_*"` and `"mechanism"`.
#[cfg(feature = "curve")]
#[async_rt::test]
async fn test_inproc_ignored_options_emit_monitor_event() -> Result<(), Box<dyn Error>> {
    use futures::StreamExt;

    pretty_env_logger::try_init().ok();

    // Fixed dummy key material passed to `curve_server`.
    let pk = [0u8; 32];
    let sk = [1u8; 32];
    let mut rep = rustzmq2::RepSocket::builder().curve_server(pk, sk).build();

    // Take the monitor before binding so events emitted by `bind` are seen.
    let mut events = rep.monitor();
    rep.bind("inproc://test-ignored-opts").await?;

    let per_event = std::time::Duration::from_millis(50);
    let mut collected: Vec<&'static str> = Vec::new();

    // Keep draining while events arrive; stop on stream end or on timeout.
    while let Ok(Some(event)) = async_rt::task::timeout(per_event, events.next()).await {
        if let rustzmq2::SocketEvent::OptionIgnoredOnTransport { option, .. } = event {
            collected.push(option);
        }
    }

    assert!(
        collected.contains(&"curve_*"),
        "expected curve_* in ignored events, got {:?}",
        collected
    );
    assert!(
        collected.contains(&"mechanism"),
        "expected mechanism in ignored events, got {:?}",
        collected
    );
    Ok(())
}
}