use kubemq::{ConnectionState, Event, KubemqClient};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
async fn live_client() -> KubemqClient {
KubemqClient::builder()
.host("localhost")
.port(50000)
.client_id("rust-cov-client")
.check_connection(true)
.build()
.await
.expect("live broker required")
}
#[tokio::test]
#[ignore]
async fn test_client_drop_without_close() {
{
let _client = live_client().await;
}
}
#[tokio::test]
#[ignore]
async fn test_child_token_cancelled_by_close() {
let client = live_client().await;
let received = Arc::new(AtomicUsize::new(0));
let rc = received.clone();
let sub = client
.subscribe_to_events(
"integration.cov-client.child-token",
"",
move |_event| {
let r = rc.clone();
Box::pin(async move {
r.fetch_add(1, Ordering::SeqCst);
})
},
None,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
client.close().await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(sub.is_closed());
}
#[tokio::test]
#[ignore]
async fn test_check_state_after_close() {
let client = live_client().await;
client.close().await.unwrap();
let event = Event::builder()
.channel("test-ch")
.body(b"data".to_vec())
.build();
let result = client.send_event(event).await;
assert!(result.is_err());
}
#[tokio::test]
#[ignore]
async fn test_config_accessors() {
let client = KubemqClient::builder()
.host("localhost")
.port(50000)
.client_id("rust-cov-config-test")
.check_connection(true)
.drain_timeout(Duration::from_secs(3))
.rpc_timeout(Duration::from_secs(10))
.max_receive_message_size(8 * 1024 * 1024)
.max_send_message_size(8 * 1024 * 1024)
.build()
.await
.unwrap();
let config = client.config();
assert_eq!(config.host(), "localhost");
assert_eq!(config.port(), 50000);
assert_eq!(config.client_id(), "rust-cov-config-test");
assert_eq!(config.drain_timeout(), Duration::from_secs(3));
assert_eq!(config.rpc_timeout(), Duration::from_secs(10));
assert!(config.check_connection());
client.close().await.unwrap();
}
#[tokio::test]
#[ignore]
async fn test_close_with_drain_timeout() {
let client = KubemqClient::builder()
.host("localhost")
.port(50000)
.client_id("rust-cov-drain-timeout")
.check_connection(true)
.drain_timeout(Duration::from_millis(100))
.build()
.await
.unwrap();
let _sub = client
.subscribe_to_events(
"integration.cov-client.drain",
"",
|_event| Box::pin(async {}),
None,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
client.close().await.unwrap();
assert_eq!(client.state(), ConnectionState::Closed);
}
#[tokio::test]
#[ignore]
async fn test_sequential_operations() {
let client = live_client().await;
let info = client.ping().await.unwrap();
assert!(!info.host.is_empty());
let event = Event::builder()
.channel("integration.cov-client.sequential")
.body(b"seq-test".to_vec())
.build();
client.send_event(event).await.unwrap();
client
.publish_event(
"integration.cov-client.sequential",
b"convenience",
Some("meta"),
None,
)
.await
.unwrap();
client.close().await.unwrap();
}
#[tokio::test]
#[ignore]
async fn test_state_ready_after_eager_connect() {
let client = live_client().await;
assert_eq!(client.state(), ConnectionState::Ready);
client.ping().await.unwrap();
client.close().await.unwrap();
assert_eq!(client.state(), ConnectionState::Closed);
}
#[tokio::test]
#[ignore]
async fn test_client_clone_operations() {
let client1 = live_client().await;
let client2 = client1.clone();
let client3 = client2.clone();
client1.ping().await.unwrap();
client2.ping().await.unwrap();
client3.ping().await.unwrap();
client1.close().await.unwrap();
assert_eq!(client2.state(), ConnectionState::Closed);
assert_eq!(client3.state(), ConnectionState::Closed);
}
#[tokio::test]
#[ignore]
async fn test_multiple_drops() {
{
let client = live_client().await;
let _clone1 = client.clone();
let _clone2 = client.clone();
}
}