use anyhow::Result;
use claude_agent_sdk::{Message, query, query_stream};
use futures::stream::{StreamExt, TryStreamExt};
use std::time::Instant;
/// Runs the four demo strategies one after another, timing each, and then
/// prints how each concurrent strategy compares with the sequential baseline.
///
/// # Errors
///
/// Returns the first error produced by any of the demo functions.
#[tokio::main]
async fn main() -> Result<()> {
    println!("=== Concurrent Queries Example ===\n");

    println!("1. Sequential Queries:");
    let timer = Instant::now();
    run_sequential_queries().await?;
    let sequential_time = timer.elapsed();
    println!(" Total time: {:?}\n", sequential_time);

    println!("2. Concurrent Queries (join!):");
    let timer = Instant::now();
    run_concurrent_queries_join().await?;
    let concurrent_time = timer.elapsed();
    println!(" Total time: {:?}\n", concurrent_time);

    println!("3. Concurrent Queries (TaskPool):");
    let timer = Instant::now();
    run_concurrent_queries_taskpool().await?;
    let taskpool_time = timer.elapsed();
    println!(" Total time: {:?}\n", taskpool_time);

    println!("4. Concurrent Streaming Queries:");
    let timer = Instant::now();
    run_concurrent_streams().await?;
    let stream_time = timer.elapsed();
    println!(" Total time: {:?}\n", stream_time);

    // Ratio of the sequential baseline to the given strategy's wall time.
    let speedup =
        |elapsed: std::time::Duration| sequential_time.as_secs_f64() / elapsed.as_secs_f64();

    println!("=== Performance Comparison ===");
    println!("Sequential: {:?}", sequential_time);
    let comparisons = [
        ("Concurrent", concurrent_time),
        ("TaskPool", taskpool_time),
        ("Streaming", stream_time),
    ];
    for &(label, elapsed) in comparisons.iter() {
        println!("{}: {:?} ({:.1}x faster)", label, elapsed, speedup(elapsed));
    }

    Ok(())
}
/// Issues three fixed queries strictly one after another, awaiting each
/// before starting the next, and prints how long each one took.
///
/// # Errors
///
/// Stops at and returns the first query error.
async fn run_sequential_queries() -> Result<()> {
    let prompts = [
        "What is 2 + 2?",
        "What is the capital of France?",
        "Explain Rust ownership",
    ];

    // Pair every prompt with its 1-based position for the log output.
    for (number, prompt) in (1..).zip(prompts.iter()) {
        println!(" Query {}: {}", number, prompt);
        let began = Instant::now();
        let _messages = query(prompt.to_string(), None).await?;
        println!(" Completed in {:?}", began.elapsed());
    }

    Ok(())
}
/// Runs three fixed queries concurrently with `join_all` and reports how many
/// completed.
///
/// All futures are driven to completion before any error is inspected, so a
/// failing query does not cut the others short.
///
/// # Errors
///
/// Returns the first error (in question order) once every query has finished.
async fn run_concurrent_queries_join() -> Result<()> {
    use futures::future::join_all;

    let questions = vec![
        "What is 2 + 2?",
        "What is the capital of France?",
        "Explain Rust ownership",
    ];

    let pending: Vec<_> = questions
        .into_iter()
        .map(|q| {
            // Own the prompt so the future does not borrow from `questions`.
            let q = q.to_string();
            async move {
                let start = Instant::now();
                // `q` is already an owned `String`; hand it over without cloning.
                let messages = query(q, None).await?;
                println!(" Query completed in {:?}", start.elapsed());
                Ok::<Vec<Message>, anyhow::Error>(messages)
            }
        })
        .collect();

    // Wait for every query, then surface the first failure (if any).
    let results = join_all(pending)
        .await
        .into_iter()
        .collect::<Result<Vec<_>>>()?;

    println!(" All {} queries completed", results.len());
    Ok(())
}
/// Runs five fixed queries as spawned tasks in a `JoinSet`, keeping at most
/// three tasks in flight at any time.
///
/// Before each spawn the function waits until the set has a free slot; after
/// the last spawn it drains the remaining tasks.
///
/// # Errors
///
/// Returns early with the first task join error or query error encountered.
async fn run_concurrent_queries_taskpool() -> Result<()> {
    use tokio::task::JoinSet;

    let questions = vec![
        "What is 2 + 2?",
        "What is the capital of France?",
        "Explain Rust ownership",
        "What is a closure?",
        "Explain async/await",
    ];
    // Captured up front because `questions` is consumed by the loop below.
    let total = questions.len();
    let max_concurrent = 3;
    let mut join_set = JoinSet::new();
    let mut completed = 0;

    for (i, question) in questions.into_iter().enumerate() {
        // Back-pressure: wait for a slot before spawning the next query.
        while join_set.len() >= max_concurrent {
            if let Some(result) = join_set.join_next().await {
                // Outer `?` handles the join error, inner `?` the query error.
                result??;
                completed += 1;
                // Report progress against the real total, not the loop index.
                println!(" Completed {}/{}", completed, total);
            }
        }

        println!(" Starting query {}: {}", i + 1, question);
        join_set.spawn(async move {
            let start = Instant::now();
            let messages = query(question.to_string(), None).await?;
            println!(" Query {} finished in {:?}", i + 1, start.elapsed());
            Ok::<Vec<Message>, anyhow::Error>(messages)
        });
    }

    // Drain whatever is still running once everything has been spawned.
    while let Some(result) = join_set.join_next().await {
        result??;
        completed += 1;
    }

    println!(" All {} queries completed", completed);
    Ok(())
}
/// Opens a message stream for each of three fixed prompts, consuming up to
/// three streams concurrently, and prints the message count and elapsed time
/// of every stream.
///
/// # Errors
///
/// Returns the first error raised while opening or reading any stream.
async fn run_concurrent_streams() -> Result<()> {
    let prompts = vec![
        "What is 2 + 2?",
        "What is the capital of France?",
        "Explain Rust ownership",
    ];

    let pending = futures::stream::iter(prompts).map(|prompt| {
        let prompt = prompt.to_string();
        async move {
            let began = Instant::now();
            let mut messages = query_stream(prompt, None).await?;

            // Count the items, bailing out on the first failed one.
            let mut received = 0;
            while let Some(item) = messages.next().await {
                item?;
                received += 1;
            }

            println!(
                " Stream completed in {:?}, {} messages",
                began.elapsed(),
                received
            );
            Ok::<(), anyhow::Error>(())
        }
    });

    // Drive at most three streams at once; only success or failure matters.
    let _: Vec<()> = pending.buffer_unordered(3).try_collect().await?;
    Ok(())
}
/// Sends one query per `(item, category)` pair, with up to three queries in
/// flight, then indexes the processed pairs by item name.
///
/// The query responses themselves are discarded; only the input pairs are
/// kept.
///
/// # Errors
///
/// Returns the first query error encountered.
// Not called from `main` in the visible code.
#[allow(dead_code)]
async fn batch_processing_example() -> Result<()> {
    use std::collections::HashMap;

    let data = vec![
        ("apple", "fruit"),
        ("carrot", "vegetable"),
        ("chicken", "meat"),
        ("bread", "grain"),
        ("cheese", "dairy"),
    ];
    println!(" Processing {} items concurrently", data.len());

    let results: Vec<_> = futures::stream::iter(data)
        .map(|(item, category)| async move {
            let question = format!("What is {}? It's a {}", item, category);
            // `question` is already an owned `String`; pass it without cloning.
            let _messages = query(question, None).await?;
            Ok::<(String, String), anyhow::Error>((item.to_string(), category.to_string()))
        })
        .buffer_unordered(3)
        .try_collect()
        .await?;
    println!(" Processed {} items", results.len());

    // Item -> category lookup; built for demonstration and not read afterwards.
    let _by_item: HashMap<String, String> = results.into_iter().collect();
    Ok(())
}
/// Runs three queries concurrently and reports each success or failure
/// individually, so one failing query does not abort the others.
///
/// Always returns `Ok(())`; query failures are only logged and counted.
#[allow(dead_code)]
async fn error_isolation_example() -> Result<()> {
    let prompts = vec![
        "Valid query 1",
        "Invalid query that might fail",
        "Valid query 2",
    ];
    let total_queries = prompts.len();

    let outcomes: Vec<_> = futures::stream::iter(prompts)
        .map(|prompt| async move {
            let outcome = query(prompt.to_string(), None).await;
            // Turn the error into `None` so it stays local to this query.
            match outcome {
                Err(err) => {
                    println!(" ✗ Query failed: {} - {}", prompt, err);
                    None
                }
                Ok(messages) => {
                    println!(" ✓ Query succeeded: {}", prompt);
                    Some(messages)
                }
            }
        })
        .buffer_unordered(3)
        .collect()
        .await;

    // Every `Some` entry is a query that succeeded.
    let successful = outcomes.iter().flatten().count();
    println!(" {}/{} queries succeeded", successful, total_queries);
    Ok(())
}
#[allow(dead_code)]
async fn rate_limited_concurrent() -> Result<()> {
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Semaphore;
use tokio::time::{Duration, sleep};
let queries = vec!["Query 1", "Query 2", "Query 3", "Query 4", "Query 5"];
let queries_count = queries.len();
let rate_limit_ms = 500u64; let last_call = Arc::new(AtomicU64::new(0));
let semaphore = Arc::new(Semaphore::new(2));
println!(" Processing {} queries with rate limit", queries_count);
let results: Vec<_> = futures::stream::iter(queries)
.map(|q| {
let semaphore = semaphore.clone();
let last_call = last_call.clone();
async move {
let _permit = semaphore.acquire().await.unwrap();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = last_call.load(Ordering::SeqCst);
if last > 0 {
let elapsed = now.saturating_sub(last);
if elapsed < rate_limit_ms {
sleep(Duration::from_millis(rate_limit_ms - elapsed)).await;
}
}
last_call.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
Ordering::SeqCst,
);
println!(" Executing: {}", q);
query(q.to_string(), None).await
}
})
.buffer_unordered(2) .collect()
.await;
let successful = results.iter().filter(|r| r.is_ok()).count();
println!(" {}/{} queries succeeded", successful, queries_count);
Ok(())
}
/// Runs three labelled queries concurrently, giving each one ten seconds, and
/// reports which ones completed, failed, or timed out.
///
/// Always returns `Ok(())`; failures and timeouts are only logged and counted.
#[allow(dead_code)]
async fn concurrent_with_timeout() -> Result<()> {
    use std::time::Duration;

    let labelled = vec![
        ("Quick query", "What is 2 + 2?"),
        ("Longer query", "Explain quantum computing"),
        ("Medium query", "What is Rust?"),
    ];
    let queries_count = labelled.len();
    let limit = Duration::from_secs(10);

    let outcomes: Vec<_> = futures::stream::iter(labelled)
        .map(|(label, prompt)| async move {
            let began = Instant::now();
            let attempt = tokio::time::timeout(limit, query(prompt.to_string(), None)).await;
            // Outer `Err` means the time limit elapsed; inner `Err` means the
            // query itself failed.
            match attempt {
                Err(_) => {
                    println!(" ✗ {} timed out after {:?}", label, limit);
                    None
                }
                Ok(Err(err)) => {
                    println!(" ✗ {} failed: {}", label, err);
                    None
                }
                Ok(Ok(messages)) => {
                    println!(" ✓ {} completed in {:?}", label, began.elapsed());
                    Some((label, messages))
                }
            }
        })
        .buffer_unordered(3)
        .collect()
        .await;

    // Every `Some` entry is a query that finished inside the limit.
    let completed = outcomes.iter().flatten().count();
    println!(
        " {}/{} queries completed within timeout",
        completed, queries_count
    );
    Ok(())
}