use std::time::Duration;
use hyperi_rustlib::transport::grpc::{GrpcConfig, GrpcTransport};
use hyperi_rustlib::transport::{SendResult, TransportBase, TransportReceiver, TransportSender};
async fn find_available_port() -> u16 {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("failed to bind to ephemeral port");
listener.local_addr().unwrap().port()
}
async fn create_pair(port: u16) -> (GrpcTransport, GrpcTransport) {
let addr = format!("127.0.0.1:{port}");
let server_config = GrpcConfig::server(&addr);
let server = GrpcTransport::new(&server_config)
.await
.expect("failed to create server");
tokio::time::sleep(Duration::from_millis(100)).await;
let client_config = GrpcConfig::client(&format!("http://{addr}"));
let client = GrpcTransport::new(&client_config)
.await
.expect("failed to create client");
(server, client)
}
#[tokio::test]
async fn test_send_and_receive() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
let result = client.send("test-topic", b"hello world").await;
assert!(
matches!(result, SendResult::Ok),
"send should succeed: {result:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
let messages = server.recv(10).await.expect("recv should succeed");
assert_eq!(messages.len(), 1, "should receive exactly one message");
assert_eq!(messages[0].payload, b"hello world");
assert_eq!(messages[0].key.as_deref(), Some("test-topic"));
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_multiple_messages() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
for i in 0..10u32 {
let payload = format!("message-{i}");
let result = client.send("topic", payload.as_bytes()).await;
assert!(
matches!(result, SendResult::Ok),
"send {i} should succeed: {result:?}"
);
}
tokio::time::sleep(Duration::from_millis(100)).await;
let messages = server.recv(100).await.expect("recv should succeed");
assert_eq!(messages.len(), 10, "should receive all 10 messages");
for (i, msg) in messages.iter().enumerate() {
let expected = format!("message-{i}");
assert_eq!(
msg.payload,
expected.as_bytes(),
"message {i} payload mismatch"
);
}
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_large_payload() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
let payload = vec![0xABu8; 1024 * 1024];
let result = client.send("large", &payload).await;
assert!(
matches!(result, SendResult::Ok),
"large send should succeed: {result:?}"
);
tokio::time::sleep(Duration::from_millis(100)).await;
let messages = server.recv(10).await.expect("recv should succeed");
assert_eq!(messages.len(), 1, "should receive the large message");
assert_eq!(messages[0].payload.len(), 1024 * 1024);
assert!(
messages[0].payload.iter().all(|&b| b == 0xAB),
"payload should be intact"
);
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_commit_is_noop() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
let _ = client.send("topic", b"data").await;
tokio::time::sleep(Duration::from_millis(50)).await;
let messages = server.recv(10).await.expect("recv should succeed");
assert!(!messages.is_empty());
let tokens: Vec<_> = messages.iter().map(|m| m.token.clone()).collect();
let result = server.commit(&tokens).await;
assert!(result.is_ok(), "commit should succeed (no-op): {result:?}");
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_close_prevents_operations() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
client.close().await.expect("close should succeed");
let result = client.send("topic", b"data").await;
assert!(
matches!(result, SendResult::Fatal(_)),
"send after close should fail: {result:?}"
);
server.close().await.expect("close should succeed");
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_health_check() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
assert!(server.is_healthy(), "server should be healthy before close");
assert!(client.is_healthy(), "client should be healthy before close");
server.close().await.expect("close should succeed");
assert!(
!server.is_healthy(),
"server should not be healthy after close"
);
client.close().await.expect("close should succeed");
assert!(
!client.is_healthy(),
"client should not be healthy after close"
);
}
#[tokio::test]
async fn test_compression() {
let port = find_available_port().await;
let addr = format!("127.0.0.1:{port}");
let server_config = GrpcConfig::server(&addr).with_compression();
let server = GrpcTransport::new(&server_config)
.await
.expect("failed to create compressed server");
tokio::time::sleep(Duration::from_millis(100)).await;
let client_config = GrpcConfig::client(&format!("http://{addr}")).with_compression();
let client = GrpcTransport::new(&client_config)
.await
.expect("failed to create compressed client");
let payload = b"compressed payload test data";
let result = client.send("compressed", payload).await;
assert!(
matches!(result, SendResult::Ok),
"compressed send should succeed: {result:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
let messages = server.recv(10).await.expect("recv should succeed");
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].payload, payload);
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_recv_timeout_returns_empty() {
let port = find_available_port().await;
let addr = format!("127.0.0.1:{port}");
let mut server_config = GrpcConfig::server(&addr);
server_config.recv_timeout_ms = 50;
let server = GrpcTransport::new(&server_config)
.await
.expect("failed to create server");
tokio::time::sleep(Duration::from_millis(100)).await;
let messages = server.recv(10).await.expect("recv should succeed");
assert!(
messages.is_empty(),
"recv with no messages should return empty, got {} messages",
messages.len()
);
let _ = server.close().await;
}