#![cfg(feature = "integration-tests")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use claude_agents_sdk::{ClaudeAgentOptions, ClaudeClient, PermissionMode};
use tokio::sync::Barrier;
use crate::integration::helpers::{collect_messages, default_options};
#[tokio::test]
async fn test_two_concurrent_queries() {
let options1 = default_options();
let options2 = default_options();
let (result1, result2) = tokio::join!(
collect_messages("What is 2+2? Reply with just the number.", options1),
collect_messages("What is 3+3? Reply with just the number.", options2)
);
let success_count = [&result1, &result2].iter().filter(|r| r.is_ok()).count();
eprintln!(
"Concurrent queries: result1={:?}, result2={:?}",
result1.is_ok(),
result2.is_ok()
);
assert!(
success_count >= 1,
"At least one concurrent query should succeed"
);
}
#[tokio::test]
async fn test_multiple_concurrent_queries() {
let count = 3; let completed = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..count)
.map(|i| {
let completed = Arc::clone(&completed);
tokio::spawn(async move {
let options = default_options();
let prompt = format!("What is {}+{}? Reply with just the number.", i, i);
let result = tokio::time::timeout(
Duration::from_secs(90),
collect_messages(&prompt, options),
)
.await;
match result {
Ok(Ok(_)) => {
completed.fetch_add(1, Ordering::SeqCst);
true
}
Ok(Err(e)) => {
eprintln!("Query {} failed: {}", i, e);
false
}
Err(_) => {
eprintln!("Query {} timed out", i);
false
}
}
})
})
.collect();
for handle in handles {
let _ = handle.await;
}
let completed_count = completed.load(Ordering::SeqCst);
eprintln!(
"Multiple concurrent: {}/{} completed",
completed_count, count
);
assert!(
completed_count >= count / 2,
"At least half of concurrent queries should complete"
);
}
#[tokio::test]
async fn test_two_concurrent_clients() {
let handle1 = tokio::spawn(async {
let options = ClaudeAgentOptions::new()
.with_permission_mode(PermissionMode::Default)
.with_max_turns(1);
let mut client = ClaudeClient::new(Some(options));
if let Err(e) = client.connect().await {
eprintln!("Client 1 connect failed: {}", e);
return false;
}
if let Err(e) = client.query("Say 'client1'").await {
eprintln!("Client 1 query failed: {}", e);
client.disconnect().await.ok();
return false;
}
let result = client.receive_response().await;
client.disconnect().await.ok();
result.is_ok()
});
let handle2 = tokio::spawn(async {
let options = ClaudeAgentOptions::new()
.with_permission_mode(PermissionMode::Default)
.with_max_turns(1);
let mut client = ClaudeClient::new(Some(options));
if let Err(e) = client.connect().await {
eprintln!("Client 2 connect failed: {}", e);
return false;
}
if let Err(e) = client.query("Say 'client2'").await {
eprintln!("Client 2 query failed: {}", e);
client.disconnect().await.ok();
return false;
}
let result = client.receive_response().await;
client.disconnect().await.ok();
result.is_ok()
});
let (result1, result2) = tokio::join!(handle1, handle2);
let success1 = result1.unwrap_or(false);
let success2 = result2.unwrap_or(false);
eprintln!(
"Concurrent clients: client1={}, client2={}",
success1, success2
);
assert!(
success1 || success2,
"At least one concurrent client should succeed"
);
}
#[tokio::test]
async fn test_synchronized_concurrent_clients() {
let barrier = Arc::new(Barrier::new(2));
let completed = Arc::new(AtomicUsize::new(0));
let b1 = Arc::clone(&barrier);
let c1 = Arc::clone(&completed);
let handle1 = tokio::spawn(async move {
let options = ClaudeAgentOptions::new()
.with_permission_mode(PermissionMode::Default)
.with_max_turns(1);
let mut client = ClaudeClient::new(Some(options));
b1.wait().await;
if client.connect().await.is_ok() {
if client.query("Say 'sync1'").await.is_ok() {
if client.receive_response().await.is_ok() {
c1.fetch_add(1, Ordering::SeqCst);
}
}
client.disconnect().await.ok();
}
});
let b2 = Arc::clone(&barrier);
let c2 = Arc::clone(&completed);
let handle2 = tokio::spawn(async move {
let options = ClaudeAgentOptions::new()
.with_permission_mode(PermissionMode::Default)
.with_max_turns(1);
let mut client = ClaudeClient::new(Some(options));
b2.wait().await;
if client.connect().await.is_ok() {
if client.query("Say 'sync2'").await.is_ok() {
if client.receive_response().await.is_ok() {
c2.fetch_add(1, Ordering::SeqCst);
}
}
client.disconnect().await.ok();
}
});
let result = tokio::time::timeout(Duration::from_secs(120), async {
let _ = handle1.await;
let _ = handle2.await;
})
.await;
let completed_count = completed.load(Ordering::SeqCst);
eprintln!("Synchronized concurrent: {}/2 completed", completed_count);
match result {
Ok(_) => {
assert!(
completed_count >= 1,
"At least one synchronized client should complete"
);
}
Err(_) => {
eprintln!("Synchronized test timed out");
}
}
}
#[tokio::test]
async fn test_concurrent_sessions_isolated() {
let session1_response = Arc::new(tokio::sync::Mutex::new(String::new()));
let session2_response = Arc::new(tokio::sync::Mutex::new(String::new()));
let r1 = Arc::clone(&session1_response);
let handle1 = tokio::spawn(async move {
let options = ClaudeAgentOptions::new()
.with_permission_mode(PermissionMode::Default)
.with_max_turns(1);
let mut client = ClaudeClient::new(Some(options));
if client.connect().await.is_err() {
return;
}
if client
.query("The secret code is ALPHA123. Acknowledge with 'Code received: ALPHA123'")
.await
.is_err()
{
client.disconnect().await.ok();
return;
}
if let Ok((response, _)) = client.receive_response().await {
*r1.lock().await = response;
}
client.disconnect().await.ok();
});
let r2 = Arc::clone(&session2_response);
let handle2 = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
let options = ClaudeAgentOptions::new()
.with_permission_mode(PermissionMode::Default)
.with_max_turns(1);
let mut client = ClaudeClient::new(Some(options));
if client.connect().await.is_err() {
return;
}
if client
.query("What is the secret code? If you don't know, say 'No code known'")
.await
.is_err()
{
client.disconnect().await.ok();
return;
}
if let Ok((response, _)) = client.receive_response().await {
*r2.lock().await = response;
}
client.disconnect().await.ok();
});
let _ = tokio::time::timeout(Duration::from_secs(120), async {
let _ = handle1.await;
let _ = handle2.await;
})
.await;
let resp1 = session1_response.lock().await;
let resp2 = session2_response.lock().await;
eprintln!("Session 1 response: {}", *resp1);
eprintln!("Session 2 response: {}", *resp2);
if !resp1.is_empty() && !resp2.is_empty() {
if resp1.contains("ALPHA123") {
let session2_knows = resp2.contains("ALPHA123");
assert!(
!session2_knows,
"Session 2 should not know Session 1's secret (sessions should be isolated)"
);
}
}
}
#[tokio::test]
async fn test_sequential_after_concurrent() {
let (r1, r2) = tokio::join!(
collect_messages("Say 'concurrent1'", default_options()),
collect_messages("Say 'concurrent2'", default_options())
);
eprintln!("Concurrent phase: r1={:?}, r2={:?}", r1.is_ok(), r2.is_ok());
let sequential = collect_messages("Say 'sequential'", default_options()).await;
eprintln!("Sequential phase: {:?}", sequential.is_ok());
assert!(
sequential.is_ok(),
"Sequential query after concurrent should succeed"
);
}
#[tokio::test]
async fn test_rapid_sequential_after_burst() {
let handles: Vec<_> = (0..2)
.map(|i| {
tokio::spawn(async move {
let options = default_options();
collect_messages(&format!("Say 'burst{}'", i), options).await
})
})
.collect();
for handle in handles {
let _ = handle.await;
}
let mut success_count = 0;
for i in 0..3 {
let result = collect_messages(&format!("Say 'seq{}'", i), default_options()).await;
if result.is_ok() {
success_count += 1;
}
}
eprintln!("Rapid sequential: {}/3 succeeded", success_count);
assert!(
success_count >= 2,
"Most rapid sequential queries should succeed"
);
}