mod common;
use rzmq::{
Socket, SocketType, ZmqError,
socket::{LINGER, PLAIN_PASSWORD, PLAIN_SERVER, PLAIN_USERNAME, SocketEvent},
};
use serial_test::serial; use std::time::Duration;
use tokio::{task::JoinHandle, time::timeout};
const TEST_TIMEOUT: Duration = Duration::from_secs(5);
async fn setup_server(
ctx: &rzmq::Context,
bind_addr: &str,
enable_plain: bool,
expected_username: Option<&str>,
expected_password: Option<&str>,
) -> Result<Socket, ZmqError> {
let server = ctx.socket(SocketType::Rep)?;
if enable_plain {
server.set_option(PLAIN_SERVER, true).await?;
if let Some(user) = expected_username {
server.set_option(PLAIN_USERNAME, user).await?;
}
if let Some(pass) = expected_password {
server.set_option(PLAIN_PASSWORD, pass).await?;
}
}
server.set_option(LINGER, 0).await?;
server.bind(bind_addr).await?;
Ok(server)
}
async fn setup_client(
ctx: &rzmq::Context,
connect_addr: &str,
enable_plain: bool,
username: Option<&str>,
password: Option<&str>,
) -> Result<(Socket, fibre::mpmc::AsyncReceiver<SocketEvent>), ZmqError> {
let client = ctx.socket(SocketType::Req)?;
if enable_plain {
let user_to_set = username.unwrap_or("");
let pass_to_set = password.unwrap_or("");
client
.set_option_raw(PLAIN_USERNAME, user_to_set.as_bytes())
.await?;
client
.set_option_raw(PLAIN_PASSWORD, pass_to_set.as_bytes())
.await?;
}
client.set_option_raw(LINGER, &0u32.to_ne_bytes()).await?;
let client_monitor = client.monitor_default().await.unwrap();
client.connect(connect_addr).await?;
Ok((client, client_monitor))
}
#[tokio::test]
#[serial]
async fn test_plain_successful_handshake() {
let ctx = common::test_context();
let addr = "tcp://127.0.0.1:15501";
let server_socket = setup_server(&ctx, addr, true, Some("user"), Some("pass"))
.await
.expect("Server setup failed");
let (client_socket, client_monitor) = setup_client(&ctx, addr, true, Some("user"), Some("pass"))
.await
.expect("Client setup failed");
let mut handshake_succeeded = false;
tokio::spawn(async move {
let req = server_socket.recv().await.expect("Server failed to recv");
assert_eq!(req.data().unwrap(), b"hello");
server_socket
.send(rzmq::Msg::from_static(b"world"))
.await
.expect("Server failed to send reply");
});
client_socket
.send(rzmq::Msg::from_static(b"hello"))
.await
.expect("Client failed to send request");
let reply = client_socket
.recv()
.await
.expect("Client failed to recv reply");
assert_eq!(reply.data().unwrap(), b"world");
loop {
match timeout(Duration::from_millis(200), client_monitor.recv()).await {
Ok(Ok(SocketEvent::HandshakeSucceeded { endpoint })) => {
if endpoint.contains("15501") {
handshake_succeeded = true;
break;
}
}
Ok(Ok(event)) => {
tracing::debug!("Client monitor event: {:?}", event);
}
Ok(Err(_)) => {
break;
}
Err(_) => {
break;
}
}
}
assert!(
handshake_succeeded,
"Client did not receive HandshakeSucceeded event for PLAIN"
);
client_socket.close().await.unwrap();
ctx.term().await.unwrap();
}
#[tokio::test]
#[serial]
async fn test_plain_client_no_credentials() {
let ctx = common::test_context();
let addr = "tcp://127.0.0.1:15504";
let server_socket = setup_server(&ctx, addr, true, Some("admin"), Some("secret")).await.expect("Server setup failed");
tokio::spawn(async move {
let _ = server_socket.recv().await;
});
let (client_socket, client_monitor) = setup_client(&ctx, addr, true, None, None).await.expect("Client setup failed");
let _ = client_socket.send(rzmq::Msg::from_static(b"empty_req")).await;
let mut failure_observed = false;
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
if tokio::time::Instant::now() > deadline { break; }
match timeout(Duration::from_millis(200), client_monitor.recv()).await {
Ok(Ok(event)) => {
if matches!(event, SocketEvent::HandshakeFailed { .. } | SocketEvent::Disconnected { .. }) {
failure_observed = true;
break;
}
}
Ok(Ok(_)) => {}, _ => {}
}
}
assert!(
failure_observed,
"Client with no credentials should have been rejected by server expecting 'admin'."
);
client_socket.close().await.unwrap();
ctx.term().await.unwrap();
}
#[tokio::test]
async fn test_plain_security_enforcement() -> Result<(), ZmqError> {
println!("--- Starting test_plain_security_enforcement ---");
let ctx = common::test_context();
let endpoint = "tcp://127.0.0.1:5998";
let server = ctx.socket(SocketType::Rep)?;
server.set_option(PLAIN_SERVER, true).await?;
server.set_option(PLAIN_USERNAME, "admin").await?;
server.set_option(PLAIN_PASSWORD, "secret").await?;
server.bind(endpoint).await?;
let client = ctx.socket(SocketType::Req)?;
client.set_option(PLAIN_USERNAME, "hacker").await?;
client.set_option(PLAIN_PASSWORD, "wrongpass").await?;
let mut monitor = client.monitor_default().await?;
client.connect(endpoint).await?;
let result = common::wait_for_monitor_event(
&monitor,
Duration::from_secs(2),
Duration::from_millis(50),
|e| {
matches!(
e,
SocketEvent::HandshakeFailed { .. } | SocketEvent::Disconnected { .. }
)
},
)
.await;
match result {
Ok(_) => {
println!("Success: Connection correctly rejected.");
Ok(())
}
Err(_) => {
panic!(
"Regression: PLAIN auth did not report failure for bad credentials. (Server likely accepted connection)."
);
}
}
}
#[tokio::test]
#[serial]
async fn test_plain_client_to_null_server_fails() {
let ctx = common::test_context();
let addr = "tcp://127.0.0.1:15502";
let server_socket = setup_server(&ctx, addr, false, None, None)
.await
.expect("Server setup failed");
let server_task = tokio::spawn(async move {
let _ = server_socket.recv().await; server_socket.close().await.ok();
});
let client_setup_result = setup_client(&ctx, addr, true, Some("user"), Some("pass")).await;
if let Ok((client_socket, mut client_monitor)) = client_setup_result {
let mut failure_event_received = false;
for _ in 0..10 {
match timeout(Duration::from_millis(500), client_monitor.recv()).await {
Ok(Ok(event)) => {
tracing::info!(
"[Test Monitor - PLAIN Client to NULL Server] Event: {:?}",
event
);
match event {
SocketEvent::HandshakeFailed { .. }
| SocketEvent::ConnectFailed { .. }
| SocketEvent::Disconnected { .. }
| SocketEvent::Closed { .. } => {
failure_event_received = true;
break;
}
_ => {}
}
}
Ok(Err(_)) => {
tracing::info!("[Test Monitor - PLAIN Client to NULL Server] Monitor closed.");
failure_event_received = true;
break;
} Err(_) => {
tracing::info!("[Test Monitor - PLAIN Client to NULL Server] Timeout.");
} }
if failure_event_received {
break;
}
}
assert!(
failure_event_received,
"PLAIN client to NULL server did not result in a failure-related monitor event."
);
client_socket.close().await.ok();
} else {
tracing::info!(
"Client setup/connect failed as expected for PLAIN client to NULL server: {:?}",
client_setup_result.as_ref().err().unwrap()
);
assert!(client_setup_result.is_err());
}
server_task.abort(); ctx.term().await.unwrap();
}
#[tokio::test]
#[serial]
async fn test_null_client_to_plain_server_fails() {
let ctx = common::test_context();
let addr = "tcp://127.0.0.1:15503";
let server_socket = setup_server(&ctx, addr, true, Some("u"), Some("p"))
.await
.expect("Server setup failed");
let _server_task: JoinHandle<()> = tokio::spawn(async move {
match server_socket.recv().await {
Ok(_) => tracing::info!("[Test Server {}] Unexpectedly received a message.", addr),
Err(e) => tracing::info!(
"[Test Server {}] Recv ended (expected for failed handshake): {}",
addr,
e
),
}
tracing::info!("[Test Server {}] Exiting task.", addr);
});
let client_setup_result = setup_client(&ctx, addr, false, None, None).await;
assert!(
client_setup_result.is_ok(),
"Client setup failed unexpectedly: {:?}",
client_setup_result.err()
);
let (client_socket, client_monitor) = client_setup_result.unwrap();
let client_task: JoinHandle<Result<(), ZmqError>> = tokio::spawn(async move {
let send_res = client_socket.send(rzmq::Msg::from_static(b"hello")).await;
if send_res.is_ok() {
println!("OK (client task)"); let recv_res = client_socket.recv().await;
println!("Client task recv result: {:?}", recv_res);
if recv_res.is_ok() {
println!("(client task) - recv succeeded unexpectedly");
return Err(ZmqError::Internal(
"Client recv() succeeded when handshake should have failed".into(),
));
} else {
match recv_res.as_ref().err().unwrap() {
ZmqError::ConnectionClosed
| ZmqError::HostUnreachable(_)
| ZmqError::SecurityError(_)
| ZmqError::ResourceLimitReached => {
println!(
"Client task recv() failed as expected: {:?}",
recv_res.as_ref().err().unwrap()
);
}
e => {
println!(
"NOOKSEND (client task) - recv failed with unexpected error: {}",
e
);
return Err(e.clone()); }
}
}
} else {
println!(
"NOOKSEND (client task) - send failed directly: {:?}",
send_res.as_ref().err().unwrap()
);
match send_res.as_ref().err().unwrap() {
ZmqError::SecurityError(_)
| ZmqError::HostUnreachable(_)
| ZmqError::ResourceLimitReached => {
println!("Client task send() failed as expected due to handshake issue.");
}
e => return Err(e.clone()), }
}
let close_res = client_socket.close().await;
println!("Client task: client_socket.close() result: {:?}", close_res);
Ok(())
});
let mut handshake_failed_event_observed = false;
let mut connect_failed_event_observed = false;
let mut disconnected_event_observed = false;
println!("Main test: Waiting for failure indication on client monitor...");
let monitor_check_deadline = tokio::time::Instant::now() + Duration::from_secs(3);
loop {
if tokio::time::Instant::now() > monitor_check_deadline {
tracing::warn!("Main test: Timeout waiting for specific failure monitor event.");
break;
}
match timeout(Duration::from_millis(200), client_monitor.recv()).await {
Ok(Ok(event)) => {
tracing::debug!("Main test: Client monitor event: {:?}", event);
match event {
SocketEvent::HandshakeFailed { .. } => {
handshake_failed_event_observed = true;
println!("Main test: Monitor observed HandshakeFailed.");
break;
}
SocketEvent::ConnectFailed { .. } => {
connect_failed_event_observed = true;
println!("Main test: Monitor observed ConnectFailed.");
break;
}
SocketEvent::Disconnected { .. } => {
disconnected_event_observed = true;
println!("Main test: Monitor observed Disconnected.");
break;
}
_ => {} }
}
Ok(Err(e)) => {
tracing::warn!("Main test: Monitor channel closed or error: {:?}", e);
break;
}
Err(_) => {} }
}
assert!(
handshake_failed_event_observed || connect_failed_event_observed || disconnected_event_observed,
"NULL client to PLAIN server did not result in HandshakeFailed, ConnectFailed, or Disconnected monitor event"
);
println!("Main test: Proceeding to ctx.term()...");
ctx.term().await.unwrap(); println!("Main test: ctx.term() completed.");
println!("Main test: Waiting for client_task to join...");
match timeout(Duration::from_secs(2), client_task).await {
Ok(Ok(Ok(()))) => {
println!(
"Main test: Client task completed successfully (meaning it handled send/recv failure correctly)."
)
}
Ok(Ok(Err(e))) => panic!("Main test: Client task failed with ZmqError: {}", e),
Ok(Err(e)) => panic!("Main test: Client task panicked: {:?}", e),
Err(_) => panic!("Main test: Timed out waiting for client task to join!"),
}
println!("Test test_null_client_to_plain_server_fails finished.");
}