use clasp_client::Clasp;
use clasp_core::SecurityMode;
use clasp_router::{Router, RouterConfig};
use clasp_test_utils::{find_available_port, wait_for, TestRouter};
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tokio::time::timeout;
/// Connects five named clients to one router and checks that every client
/// reports a session ID that no earlier client reported.
#[tokio::test]
async fn test_session_unique_id() {
    let router = TestRouter::start().await;

    let mut seen = HashSet::new();
    let mut connected = Vec::new();

    for idx in 0..5 {
        let client_name = format!("Client{}", idx);
        let connection = match Clasp::builder(&router.url())
            .name(&client_name)
            .connect()
            .await
        {
            Ok(connection) => connection,
            Err(_) => panic!("Client {} connect failed", idx),
        };

        let id = match connection.session_id() {
            Some(id) => id,
            None => panic!("Client {} has no session ID", idx),
        };

        // `insert` returns false when the ID was already in the set.
        let is_new = seen.insert(id.clone());
        assert!(is_new, "Duplicate session ID: {}", id);

        // Keep the client alive so all five sessions coexist.
        connected.push(connection);
    }

    assert_eq!(seen.len(), 5, "Expected 5 unique session IDs");

    for connection in connected {
        connection.close().await;
    }
}
/// Checks that a client's session ID has the textual layout of a hyphenated
/// UUID: 36 characters long, only hex digits and hyphens, with the four
/// hyphens at character positions 8, 13, 18 and 23.
#[tokio::test]
async fn test_session_id_format() {
    let router = TestRouter::start().await;
    let client = Clasp::connect_to(&router.url())
        .await
        .expect("Connect failed");
    let id = client.session_id().expect("No session ID");

    assert_eq!(id.len(), 36, "Session ID length should be 36");

    let hyphen_total = id.chars().filter(|&ch| ch == '-').count();
    assert_eq!(hyphen_total, 4, "Session ID should have 4 hyphens");

    // Every character must be a hex digit or a separator.
    for (pos, ch) in id.chars().enumerate() {
        assert!(
            ch == '-' || ch.is_ascii_hexdigit(),
            "Invalid char '{}' at position {}",
            ch,
            pos
        );
    }

    let hyphen_positions: Vec<usize> = id
        .chars()
        .enumerate()
        .filter_map(|(pos, ch)| if ch == '-' { Some(pos) } else { None })
        .collect();
    assert_eq!(
        hyphen_positions,
        vec![8, 13, 18, 23],
        "UUID hyphen positions incorrect"
    );

    client.close().await;
}
/// Reads the session ID of a single connected client repeatedly and checks
/// that every read returns the same value as the first one.
#[tokio::test]
async fn test_session_id_persistence() {
    let router = TestRouter::start().await;
    let client = Clasp::connect_to(&router.url())
        .await
        .expect("Connect failed");

    let initial = client.session_id().expect("No session ID");

    // Ten further reads must all agree with the initial value.
    (0..10).for_each(|_| {
        let current = client.session_id().expect("Session ID became None");
        assert_eq!(current, initial, "Session ID changed");
    });

    client.close().await;
}
/// Closes a client, waits briefly, then connects a fresh client and checks
/// that the fresh client's session ID differs from the closed client's.
#[tokio::test]
async fn test_session_cleanup_on_disconnect() {
    let router = TestRouter::start().await;

    // First connection: remember its ID, then shut it down gracefully.
    let first = Clasp::connect_to(&router.url())
        .await
        .expect("Connect failed");
    let original_session = first.session_id().expect("No session ID");
    first.close().await;

    // Pause before reconnecting; presumably this lets the router notice the
    // disconnect.
    let settle = Duration::from_millis(200);
    tokio::time::sleep(settle).await;

    // Second connection must be issued a different ID.
    let second = Clasp::connect_to(&router.url())
        .await
        .expect("Reconnect failed");
    let new_session = second.session_id().expect("No new session ID");

    assert_ne!(
        new_session, original_session,
        "New client got same session ID: {} == {}",
        new_session, original_session
    );

    second.close().await;
}
/// Performs five connect/close cycles against one router, one after another,
/// and checks that each cycle is given a session ID not seen in any earlier
/// cycle.
#[tokio::test]
async fn test_session_multiple_reconnects() {
    let router = TestRouter::start().await;
    let mut seen = HashSet::new();

    for round in 0..5 {
        let client = match Clasp::connect_to(&router.url()).await {
            Ok(client) => client,
            Err(_) => panic!("Connect {} failed", round),
        };
        let id = match client.session_id() {
            Some(id) => id,
            None => panic!("No session ID on connect {}", round),
        };

        // `insert` returns false when the ID was already recorded.
        let first_time = seen.insert(id.clone());
        assert!(
            first_time,
            "Duplicate session ID on reconnect {}: {}",
            round,
            id
        );

        client.close().await;
        // Short pause between cycles.
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    assert_eq!(
        seen.len(),
        5,
        "Expected 5 unique sessions across reconnects"
    );
}
/// Exercises both ways a client can go away:
///
/// * graceful: `close()` is called and the client must then report that it is
///   no longer connected;
/// * abrupt: the client value is dropped without calling `close()`, and a
///   client connected afterwards must receive a different session ID.
#[tokio::test]
async fn test_graceful_vs_abrupt_disconnect() {
    let router = TestRouter::start().await;

    // Graceful path.
    let client1 = Clasp::connect_to(&router.url())
        .await
        .expect("Connect 1 failed");
    assert!(client1.is_connected(), "Client1 not connected");
    client1.close().await;
    assert!(
        !client1.is_connected(),
        "Client1 still connected after close"
    );

    // Abrupt path: drop without closing.
    let client2 = Clasp::connect_to(&router.url())
        .await
        .expect("Connect 2 failed");
    // A missing session ID is treated as a failure (as the other tests in
    // this file do); otherwise the comparison below would be skipped and the
    // test would pass without checking anything.
    let session2 = client2.session_id().expect("Client2 has no session ID");
    drop(client2);

    let client3 = Clasp::connect_to(&router.url())
        .await
        .expect("Connect 3 failed");
    let session3 = client3.session_id().expect("Client3 has no session ID");

    assert_ne!(
        session2, session3,
        "Sessions should differ after abrupt disconnect"
    );

    client3.close().await;
}
/// Starts a router configured with `max_sessions: 3`, attempts five
/// connections (each bounded by a 2 s timeout) and tallies how many ended up
/// with a session ID.
///
/// NOTE(review): the only assertion is that at least one connection succeeds;
/// the test does not check that attempts beyond the configured limit are
/// rejected.
#[tokio::test]
async fn test_max_sessions_limit() {
    let limited_config = RouterConfig {
        name: "Limited Router".to_string(),
        max_sessions: 3,
        session_timeout: 60,
        features: vec!["param".to_string()],
        security_mode: SecurityMode::Open,
        max_subscriptions_per_session: 1000,
        gesture_coalescing: true,
        gesture_coalesce_interval_ms: 0,
        max_messages_per_second: 0,
        rate_limiting_enabled: false,
        ..Default::default()
    };
    let router = TestRouter::start_with_config(limited_config).await;

    let mut clients = Vec::new();
    let mut connect_success = 0;
    let mut connect_failed = 0;

    for _ in 0..5 {
        let attempt = timeout(Duration::from_secs(2), Clasp::connect_to(&router.url())).await;

        // An attempt counts as accepted only if it neither timed out nor
        // errored, and the resulting client carries a session ID.
        let accepted = match attempt {
            Ok(Ok(client)) if client.session_id().is_some() => Some(client),
            _ => None,
        };

        match accepted {
            Some(client) => {
                connect_success += 1;
                clients.push(client);
            }
            None => connect_failed += 1,
        }
    }

    assert!(
        connect_success >= 1,
        "Should have at least 1 successful connection, got {} success, {} failed",
        connect_success,
        connect_failed
    );

    for client in clients {
        client.close().await;
    }
}
/// Keeps two clients connected to the same router at once and checks that
/// they hold different session IDs and both report being connected.
#[tokio::test]
async fn test_concurrent_session_state() {
    let router = TestRouter::start().await;

    let first = Clasp::connect_to(&router.url())
        .await
        .expect("Client1 connect failed");
    let second = Clasp::connect_to(&router.url())
        .await
        .expect("Client2 connect failed");

    let first_id = first.session_id().expect("Client1 no session");
    let second_id = second.session_id().expect("Client2 no session");

    assert_ne!(
        first_id, second_id,
        "Concurrent clients have same session: {} == {}",
        first_id, second_id
    );

    let first_up = first.is_connected();
    assert!(first_up, "Client1 not connected");
    let second_up = second.is_connected();
    assert!(second_up, "Client2 not connected");

    first.close().await;
    second.close().await;
}
/// Opens and closes `ATTEMPTS` named connections against one router and
/// requires that at least 80% of them yield a distinct session ID.
///
/// Despite the name, the connections are made one after another: each client
/// is closed before the next one is opened.
#[tokio::test]
async fn test_high_concurrency() {
    /// Number of connections attempted; also the denominator of the success
    /// rate, so the two can no longer drift apart.
    const ATTEMPTS: usize = 20;

    let router = TestRouter::start().await;
    let url = router.url();
    let mut sessions = HashSet::new();
    let mut errors = Vec::new();

    for i in 0..ATTEMPTS {
        match Clasp::builder(&url)
            .name(&format!("Concurrent{}", i))
            .connect()
            .await
        {
            Ok(client) => {
                // `session_id()` already hands back an owned value, so no
                // extra clone of the temporary is needed.
                match client.session_id() {
                    Some(session) => {
                        if !sessions.insert(session.clone()) {
                            errors.push(format!("Duplicate session at index {}: {}", i, session));
                        }
                    }
                    None => errors.push(format!("No session at index {}", i)),
                }
                client.close().await;
            }
            Err(e) => errors.push(format!("Client {} failed: {}", i, e)),
        }
    }

    let success_rate = sessions.len() as f64 / ATTEMPTS as f64;
    assert!(
        success_rate >= 0.8,
        "Low success rate: {:.0}% ({} sessions, errors: {:?})",
        success_rate * 100.0,
        sessions.len(),
        // Only the first recorded error is shown, to keep the message short.
        errors.first()
    );
}
#[tokio::test]
async fn test_session_subscription_isolation() {
let router = TestRouter::start().await;
let client1 = Clasp::connect_to(&router.url())
.await
.expect("Client1 failed");
let client1_received = Arc::new(AtomicU32::new(0));
let client1_wrong = Arc::new(AtomicBool::new(false)); let client1_received_clone = client1_received.clone();
let client1_wrong_clone = client1_wrong.clone();
let notify1 = Arc::new(Notify::new());
let notify1_clone = notify1.clone();
let _ = client1
.subscribe("/client1/**", move |_, addr| {
if addr.starts_with("/client1/") {
client1_received_clone.fetch_add(1, Ordering::SeqCst);
} else {
client1_wrong_clone.store(true, Ordering::SeqCst);
}
notify1_clone.notify_one();
})
.await
.expect("Client1 subscribe failed");
let client2 = Clasp::connect_to(&router.url())
.await
.expect("Client2 failed");
let client2_received = Arc::new(AtomicU32::new(0));
let client2_wrong = Arc::new(AtomicBool::new(false));
let client2_received_clone = client2_received.clone();
let client2_wrong_clone = client2_wrong.clone();
let notify2 = Arc::new(Notify::new());
let notify2_clone = notify2.clone();
let _ = client2
.subscribe("/client2/**", move |_, addr| {
if addr.starts_with("/client2/") {
client2_received_clone.fetch_add(1, Ordering::SeqCst);
} else {
client2_wrong_clone.store(true, Ordering::SeqCst);
}
notify2_clone.notify_one();
})
.await
.expect("Client2 subscribe failed");
tokio::time::sleep(Duration::from_millis(50)).await;
client1
.set("/client1/value", 1.0)
.await
.expect("Client1 set failed");
client2
.set("/client2/value", 2.0)
.await
.expect("Client2 set failed");
let _ = timeout(Duration::from_secs(2), notify1.notified()).await;
let _ = timeout(Duration::from_secs(2), notify2.notified()).await;
let c1_count = client1_received.load(Ordering::SeqCst);
let c2_count = client2_received.load(Ordering::SeqCst);
let c1_wrong = client1_wrong.load(Ordering::SeqCst);
let c2_wrong = client2_wrong.load(Ordering::SeqCst);
assert!(
c1_count >= 1,
"Client1 received {} values (expected >= 1)",
c1_count
);
assert!(
c2_count >= 1,
"Client2 received {} values (expected >= 1)",
c2_count
);
assert!(
!c1_wrong,
"Client1 received wrong address (isolation violated)"
);
assert!(
!c2_wrong,
"Client2 received wrong address (isolation violated)"
);
client1.close().await;
client2.close().await;
}
#[tokio::test]
async fn test_session_value_isolation() {
let router = TestRouter::start().await;
let client1 = Clasp::connect_to(&router.url())
.await
.expect("Client1 connect failed");
let client2 = Clasp::connect_to(&router.url())
.await
.expect("Client2 connect failed");
let client1_values = Arc::new(std::sync::Mutex::new(Vec::<f64>::new()));
let client2_values = Arc::new(std::sync::Mutex::new(Vec::<f64>::new()));
let notify = Arc::new(Notify::new());
let c1_values = client1_values.clone();
let n1 = notify.clone();
client1
.subscribe("/shared/counter", move |val, _| {
if let Some(v) = val.as_f64() {
c1_values.lock().unwrap().push(v);
}
n1.notify_one();
})
.await
.expect("Client1 subscribe failed");
let c2_values = client2_values.clone();
let n2 = notify.clone();
client2
.subscribe("/shared/counter", move |val, _| {
if let Some(v) = val.as_f64() {
c2_values.lock().unwrap().push(v);
}
n2.notify_one();
})
.await
.expect("Client2 subscribe failed");
tokio::time::sleep(Duration::from_millis(50)).await;
client1
.set("/shared/counter", 100.0)
.await
.expect("Set 1 failed");
client2
.set("/shared/counter", 200.0)
.await
.expect("Set 2 failed");
for _ in 0..4 {
let _ = timeout(Duration::from_millis(500), notify.notified()).await;
}
let v1 = client1_values.lock().unwrap().clone();
let v2 = client2_values.lock().unwrap().clone();
assert!(!v1.is_empty(), "Client1 received no values");
assert!(!v2.is_empty(), "Client2 received no values");
client1.close().await;
client2.close().await;
}
/// Attempts to connect to `ws://127.0.0.1:1` with a 3 s time limit and fails
/// only if the connection succeeds; both a connect error and a timeout are
/// accepted outcomes.
#[tokio::test]
async fn test_connect_to_nonexistent_server() {
    let time_limit = Duration::from_secs(3);
    let outcome = timeout(time_limit, Clasp::connect_to("ws://127.0.0.1:1")).await;

    // The only unacceptable outcome is a completed, successful connect.
    if let Ok(Ok(_)) = outcome {
        panic!("Should have failed to connect to nonexistent server");
    }
}
/// Tries to connect to several malformed or unsupported URLs, each with a
/// 2 s time limit, and fails if any of them produces a connected client.
#[tokio::test]
async fn test_connect_invalid_url() {
    let invalid_urls = ["not-a-url", "http://localhost:7330", "", "ws://"];

    for &url in invalid_urls.iter() {
        let outcome = timeout(Duration::from_secs(2), Clasp::connect_to(url)).await;

        // A connect error and a timeout are both acceptable; success is not.
        if let Ok(Ok(_)) = outcome {
            panic!("Should have failed for invalid URL: {}", url);
        }
    }
}
/// Closes a client, checks that it reports being disconnected, then calls
/// `set` on it anyway. The result of that `set` is discarded, so the test
/// only fails there if the call panics.
#[tokio::test]
async fn test_operations_after_close() {
    let router = TestRouter::start().await;
    let client = Clasp::connect_to(&router.url())
        .await
        .expect("Connect failed");

    client.close().await;

    let still_connected = client.is_connected();
    assert!(!still_connected, "Should not be connected after close");

    // Outcome intentionally ignored.
    let _ = client.set("/test", 1.0).await;
}
/// Calls `close()` twice on the same client and checks that it reports being
/// disconnected afterwards.
#[tokio::test]
async fn test_double_close() {
    let router = TestRouter::start().await;
    let client = Clasp::connect_to(&router.url())
        .await
        .expect("Connect failed");

    // Two consecutive closes on the same client.
    for _ in 0..2 {
        client.close().await;
    }

    let still_connected = client.is_connected();
    assert!(!still_connected, "Should not be connected");
}
/// Runs twenty back-to-back connect/close cycles, each connect bounded by a
/// 2 s timeout, and requires at least eighteen of them to connect.
#[tokio::test]
async fn test_rapid_connect_disconnect() {
    let router = TestRouter::start().await;
    let mut success = 0;

    for _ in 0..20 {
        let attempt = timeout(Duration::from_secs(2), Clasp::connect_to(&router.url())).await;

        // Timeouts and connect errors are skipped without being counted.
        let client = match attempt {
            Ok(Ok(client)) => client,
            _ => continue,
        };

        client.close().await;
        success += 1;
    }

    assert!(
        success >= 18,
        "Only {}/20 rapid connect/disconnect succeeded",
        success
    );
}
/// Starts a router on a fixed port, records a client's session ID, stops the
/// router, starts a second router on the same port, and checks that a client
/// of the second router receives a different session ID.
#[tokio::test]
async fn test_session_after_server_restart() {
    let port = find_available_port().await;
    let addr = format!("127.0.0.1:{}", port);
    let url = format!("ws://127.0.0.1:{}", port);

    // ---- first router instance ----
    let router = Router::new(RouterConfig::default());
    let handle = tokio::spawn({
        let addr = addr.clone();
        async move {
            let _ = router.serve_websocket(&addr).await;
        }
    });
    // Poll until the port accepts TCP connections. The result is ignored
    // because the `expect` on the connect below reports a router that never
    // came up.
    let _ = wait_for(
        || async move {
            tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
                .await
                .is_ok()
        },
        Duration::from_millis(10),
        Duration::from_secs(5),
    )
    .await;

    let client1 = Clasp::connect_to(&url).await.expect("Connect 1 failed");
    // A missing session ID is a failure; otherwise the comparison at the end
    // would be skipped and the test would pass without checking anything.
    let session1 = client1.session_id().expect("Client 1 has no session ID");
    client1.close().await;

    // ---- stop the first router ----
    // `abort()` only requests cancellation. Awaiting the handle waits until
    // the serving task has actually ended before the same port is bound
    // again. The join result (a cancellation error) is expected and ignored.
    handle.abort();
    let _ = handle.await;
    tokio::time::sleep(Duration::from_millis(100)).await;

    // ---- second router instance on the same address ----
    let router2 = Router::new(RouterConfig::default());
    let handle2 = tokio::spawn({
        let addr = addr.clone();
        async move {
            let _ = router2.serve_websocket(&addr).await;
        }
    });
    let _ = wait_for(
        || async move {
            tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
                .await
                .is_ok()
        },
        Duration::from_millis(10),
        Duration::from_secs(5),
    )
    .await;

    let client2 = Clasp::connect_to(&url).await.expect("Connect 2 failed");
    let session2 = client2.session_id().expect("Client 2 has no session ID");

    assert_ne!(
        session1, session2,
        "Session persisted across server restart: {} == {}",
        session1, session2
    );

    client2.close().await;
    handle2.abort();
}