use std::time::Duration;
use std::sync::Arc;
use hyperi_rustlib::transport::grpc::{GrpcConfig, GrpcTransport};
use hyperi_rustlib::transport::{
PayloadFormat, Record, RecordMeta, SendResult, TransportBase, TransportReceiver,
TransportSender,
};
async fn find_available_port() -> u16 {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("failed to bind to ephemeral port");
listener.local_addr().unwrap().port()
}
#[tokio::test]
async fn test_close_frees_port_and_is_idempotent() {
let port = find_available_port().await;
let addr = format!("127.0.0.1:{port}");
let server = GrpcTransport::new(&GrpcConfig::server(&addr))
.await
.expect("first server should bind");
server.close().await.expect("first close");
server.close().await.expect("close is idempotent");
let mut rebound = None;
for _ in 0..40 {
match GrpcTransport::new(&GrpcConfig::server(&addr)).await {
Ok(s) => {
rebound = Some(s);
break;
}
Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
assert!(
rebound.is_some(),
"port {port} not freed within 2s after close() -- the listener did not stop"
);
}
async fn create_pair(port: u16) -> (GrpcTransport, GrpcTransport) {
let addr = format!("127.0.0.1:{port}");
let server_config = GrpcConfig::server(&addr);
let server = GrpcTransport::new(&server_config)
.await
.expect("failed to create server");
let client_config = GrpcConfig::client(&format!("http://{addr}"));
let client = GrpcTransport::new(&client_config)
.await
.expect("failed to create client");
(server, client)
}
#[tokio::test]
async fn test_send_and_receive() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
let result = client
.send("test-topic", bytes::Bytes::from_static(b"hello world"))
.await;
assert!(
matches!(result, SendResult::Ok),
"send should succeed: {result:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
let records = server.recv(10).await.expect("recv should succeed").records;
assert_eq!(records.len(), 1, "should receive exactly one record");
assert_eq!(records[0].payload.as_ref(), b"hello world");
assert_eq!(records[0].key.as_deref(), Some("test-topic"));
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_multiple_messages() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
for i in 0..10u32 {
let payload = format!("message-{i}");
let result = client.send("topic", bytes::Bytes::from(payload)).await;
assert!(
matches!(result, SendResult::Ok),
"send {i} should succeed: {result:?}"
);
}
tokio::time::sleep(Duration::from_millis(100)).await;
let records = server.recv(100).await.expect("recv should succeed").records;
assert_eq!(records.len(), 10, "should receive all 10 records");
for (i, record) in records.iter().enumerate() {
let expected = format!("message-{i}");
assert_eq!(
record.payload,
expected.as_bytes(),
"record {i} payload mismatch"
);
}
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_large_payload() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
let payload = vec![0xABu8; 1024 * 1024];
let result = client.send("large", bytes::Bytes::from(payload)).await;
assert!(
matches!(result, SendResult::Ok),
"large send should succeed: {result:?}"
);
tokio::time::sleep(Duration::from_millis(100)).await;
let records = server.recv(10).await.expect("recv should succeed").records;
assert_eq!(records.len(), 1, "should receive the large record");
assert_eq!(records[0].payload.len(), 1024 * 1024);
assert!(
records[0].payload.iter().all(|&b| b == 0xAB),
"payload should be intact"
);
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_commit_is_noop() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
let _ = client
.send("topic", bytes::Bytes::from_static(b"data"))
.await;
tokio::time::sleep(Duration::from_millis(50)).await;
let batch = server.recv(10).await.expect("recv should succeed");
assert!(!batch.records.is_empty());
let result = server.commit(&batch.commit_tokens).await;
assert!(result.is_ok(), "commit should succeed (no-op): {result:?}");
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_close_prevents_operations() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
client.close().await.expect("close should succeed");
let result = client
.send("topic", bytes::Bytes::from_static(b"data"))
.await;
assert!(
matches!(result, SendResult::Fatal(_)),
"send after close should fail: {result:?}"
);
server.close().await.expect("close should succeed");
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_health_check() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
assert!(server.is_healthy(), "server should be healthy before close");
assert!(client.is_healthy(), "client should be healthy before close");
server.close().await.expect("close should succeed");
assert!(
!server.is_healthy(),
"server should not be healthy after close"
);
client.close().await.expect("close should succeed");
assert!(
!client.is_healthy(),
"client should not be healthy after close"
);
}
#[tokio::test]
async fn test_compression() {
let port = find_available_port().await;
let addr = format!("127.0.0.1:{port}");
let server_config = GrpcConfig::server(&addr).with_compression();
let server = GrpcTransport::new(&server_config)
.await
.expect("failed to create compressed server");
let client_config = GrpcConfig::client(&format!("http://{addr}")).with_compression();
let client = GrpcTransport::new(&client_config)
.await
.expect("failed to create compressed client");
let payload = b"compressed payload test data";
let result = client
.send("compressed", bytes::Bytes::from_static(payload))
.await;
assert!(
matches!(result, SendResult::Ok),
"compressed send should succeed: {result:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
let records = server.recv(10).await.expect("recv should succeed").records;
assert_eq!(records.len(), 1);
assert_eq!(records[0].payload.as_ref(), payload);
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_route_batch_native_transport() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
let records = vec![
Record {
payload: bytes::Bytes::from_static(b"{\"a\":1}"),
key: Some(Arc::from("events")),
headers: vec![("trace".to_string(), b"abc".to_vec())],
metadata: RecordMeta {
timestamp_ms: Some(1_717_000_000_000),
format: PayloadFormat::Json,
},
},
Record {
payload: bytes::Bytes::from_static(&[0x00, 0xff, 0xfe, 0x80, 0x01]),
key: None,
headers: Vec::new(),
metadata: RecordMeta {
timestamp_ms: None,
format: PayloadFormat::Auto,
},
},
Record {
payload: bytes::Bytes::from_static(&[0x81, 0xa1, b'k', 0x07]),
key: Some(Arc::from("metrics")),
headers: Vec::new(),
metadata: RecordMeta {
timestamp_ms: Some(42),
format: PayloadFormat::MsgPack,
},
},
];
let result = client.send_batch(&records).await;
assert!(
matches!(result, SendResult::Ok),
"send_batch should succeed: {result:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
let received = server.recv(100).await.expect("recv should succeed").records;
assert_eq!(received.len(), 3, "should receive all 3 batch records");
assert_eq!(received[0].payload.as_ref(), b"{\"a\":1}");
assert_eq!(received[0].key.as_deref(), Some("events"));
assert_eq!(
received[1].payload.as_ref(),
&[0x00, 0xff, 0xfe, 0x80, 0x01]
);
assert_eq!(received[1].key, None);
assert_eq!(received[2].payload.as_ref(), &[0x81, 0xa1, b'k', 0x07]);
assert_eq!(received[2].key.as_deref(), Some("metrics"));
let _ = client.close().await;
let _ = server.close().await;
}
async fn create_pair_with_capacity(port: u16, capacity: usize) -> (GrpcTransport, GrpcTransport) {
let addr = format!("127.0.0.1:{port}");
let mut server_config = GrpcConfig::server(&addr);
server_config.recv_buffer_size = capacity;
let server = GrpcTransport::new(&server_config)
.await
.expect("failed to create server");
let client_config = GrpcConfig::client(&format!("http://{addr}"));
let client = GrpcTransport::new(&client_config)
.await
.expect("failed to create client");
(server, client)
}
#[tokio::test]
async fn test_route_batch_is_atomic_under_capacity() {
let port = find_available_port().await;
let (server, client) = create_pair_with_capacity(port, 1).await;
let records = vec![
Record {
payload: bytes::Bytes::from_static(b"{\"r\":0}"),
key: Some(Arc::from("events")),
headers: Vec::new(),
metadata: RecordMeta {
timestamp_ms: None,
format: PayloadFormat::Json,
},
},
Record {
payload: bytes::Bytes::from_static(b"{\"r\":1}"),
key: Some(Arc::from("events")),
headers: Vec::new(),
metadata: RecordMeta {
timestamp_ms: None,
format: PayloadFormat::Json,
},
},
];
let result = client.send_batch(&records).await;
assert!(
matches!(result, SendResult::Backpressured),
"over-capacity batch must surface as backpressure, got {result:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
let received = server.recv(10).await.expect("recv should succeed").records;
assert_eq!(
received.len(),
0,
"atomic batch must accept 0 records on rejection, got {} (partial acceptance)",
received.len()
);
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_route_batch_fits_capacity_accepts_all() {
let port = find_available_port().await;
let (server, client) = create_pair_with_capacity(port, 2).await;
let records = vec![
Record {
payload: bytes::Bytes::from_static(b"{\"r\":0}"),
key: Some(Arc::from("events")),
headers: Vec::new(),
metadata: RecordMeta {
timestamp_ms: None,
format: PayloadFormat::Json,
},
},
Record {
payload: bytes::Bytes::from_static(b"{\"r\":1}"),
key: Some(Arc::from("events")),
headers: Vec::new(),
metadata: RecordMeta {
timestamp_ms: None,
format: PayloadFormat::Json,
},
},
];
let result = client.send_batch(&records).await;
assert!(
matches!(result, SendResult::Ok),
"in-capacity batch should succeed: {result:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
let received = server.recv(10).await.expect("recv should succeed").records;
assert_eq!(received.len(), 2, "should receive both records");
let _ = client.close().await;
let _ = server.close().await;
}
#[cfg(feature = "governor")]
#[tokio::test]
async fn test_route_batch_pressure_hold_accepts_nothing() {
use hyperi_rustlib::governor::{
Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure,
};
use hyperi_rustlib::memory::{MemoryGuard, MemoryGuardConfig};
let port = find_available_port().await;
let addr = format!("127.0.0.1:{port}");
let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
limit_bytes: 1000,
pressure_threshold: 0.80,
..Default::default()
}));
guard.add_bytes(950); let pressure = Arc::new(UnifiedPressure::new(
vec![Arc::new(MemoryPressureSource::new(Arc::clone(&guard))) as Arc<dyn PressureSource>],
Hysteresis::new(0.80, 0.65).expect("valid band"),
));
assert!(pressure.should_hold(), "pinned-high governor must hold");
let mut server_config = GrpcConfig::server(&addr);
server_config.recv_buffer_size = 100;
let server = GrpcTransport::with_pressure(&server_config, Some(Arc::clone(&pressure)))
.await
.expect("failed to create server");
let client = GrpcTransport::new(&GrpcConfig::client(&format!("http://{addr}")))
.await
.expect("failed to create client");
let records = vec![
Record {
payload: bytes::Bytes::from_static(b"{\"r\":0}"),
key: Some(Arc::from("events")),
headers: Vec::new(),
metadata: RecordMeta {
timestamp_ms: None,
format: PayloadFormat::Json,
},
},
Record {
payload: bytes::Bytes::from_static(b"{\"r\":1}"),
key: Some(Arc::from("events")),
headers: Vec::new(),
metadata: RecordMeta {
timestamp_ms: None,
format: PayloadFormat::Json,
},
},
];
let result = client.send_batch(&records).await;
assert!(
matches!(result, SendResult::Backpressured),
"batch under pressure must surface as backpressure, got {result:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
let received = server.recv(10).await.expect("recv should succeed").records;
assert_eq!(
received.len(),
0,
"pressure-held batch must accept 0 records, got {}",
received.len()
);
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_route_batch_empty() {
let port = find_available_port().await;
let (server, client) = create_pair(port).await;
let result = client.send_batch(&[]).await;
assert!(
matches!(result, SendResult::Ok),
"empty send_batch should succeed: {result:?}"
);
let _ = client.close().await;
let _ = server.close().await;
}
#[tokio::test]
async fn test_recv_timeout_returns_empty() {
let port = find_available_port().await;
let addr = format!("127.0.0.1:{port}");
let mut server_config = GrpcConfig::server(&addr);
server_config.recv_timeout_ms = 50;
let server = GrpcTransport::new(&server_config)
.await
.expect("failed to create server");
let records = server.recv(10).await.expect("recv should succeed").records;
assert!(
records.is_empty(),
"recv with no messages should return empty, got {} records",
records.len()
);
let _ = server.close().await;
}