use claude_agent_sdk::{ContentBlock, Message, query_stream};
use futures::stream::StreamExt;
use std::time::{Duration, Instant};
use tokio::time::sleep;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("🚀 Advanced Stream Processing Example\n");
println!("📡 Example 1: Real-time Stream Processing");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
let mut stream = query_stream(
"Count from 1 to 10, but take a breath between each number",
None,
)
.await?;
let mut message_count = 0;
let start_time = Instant::now();
while let Some(result) = stream.next().await {
match result? {
Message::Assistant(msg) => {
for block in &msg.message.content {
if let ContentBlock::Text(text) = block {
print!("{}", text.text);
use std::io::Write;
std::io::stdout().flush()?;
}
}
message_count += 1;
},
Message::Result(result) if result.is_error => {
eprintln!("\n❌ Error result received");
break;
},
_ => {},
}
}
let elapsed = start_time.elapsed();
println!(
"\n✅ Processed {} messages in {:.2}s\n",
message_count,
elapsed.as_secs_f64()
);
println!("📊 Example 2: Stream with Backpressure Control");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
let mut stream = query_stream(
"List 20 Python best practices with brief explanations",
None,
)
.await?;
let mut item_count = 0;
let start_time = Instant::now();
while let Some(result) = stream.next().await {
match result? {
Message::Assistant(msg) => {
sleep(Duration::from_millis(50)).await;
for block in &msg.message.content {
if let ContentBlock::Text(text) = block {
let words: Vec<&str> = text.text.split_whitespace().collect();
for (i, word) in words.iter().enumerate() {
if i % 5 == 0 {
print!("\n "); }
print!("{} ", word);
item_count += 1;
if item_count % 50 == 0 {
sleep(Duration::from_millis(10)).await;
}
}
}
}
},
Message::Result(result) if result.is_error => {
eprintln!("\n❌ Error result received");
break;
},
_ => {},
}
if item_count % 100 == 0 {
let elapsed = start_time.elapsed();
let rate = item_count as f64 / elapsed.as_secs_f64();
println!(
"\n 📈 Progress: {} items ({:.1} items/s)",
item_count, rate
);
}
}
let elapsed = start_time.elapsed();
let rate = item_count as f64 / elapsed.as_secs_f64();
println!(
"\n✅ Processed {} items in {:.2}s ({:.1} items/s)\n",
item_count,
elapsed.as_secs_f64(),
rate
);
println!("🔍 Example 3: Stream Filtering and Transformation");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
let query = "Explain the differences between Rust and Go in terms of:
1. Memory management
2. Concurrency model
3. Error handling
4. Learning curve";
let mut stream = query_stream(query, None).await?;
let mut filtered_content = Vec::new();
let mut total_blocks = 0;
while let Some(result) = stream.next().await {
match result? {
Message::Assistant(msg) => {
for block in &msg.message.content {
if let ContentBlock::Text(text) = block {
total_blocks += 1;
if text.text.len() > 20 {
let transformed =
format!("[{}] {}", filtered_content.len() + 1, text.text);
filtered_content.push(transformed);
println!(
" ✓ Captured block {} ({} chars)",
filtered_content.len(),
text.text.len()
);
} else {
println!(" ⊘ Skipped short block ({} chars)", text.text.len());
}
}
}
},
Message::Result(result) if result.is_error => {
eprintln!("❌ Error result received");
break;
},
_ => {},
}
}
println!("\n📊 Filtering Summary:");
println!(" Total blocks: {}", total_blocks);
println!(" Filtered blocks: {}", filtered_content.len());
println!(
" Filter rate: {:.1}%",
(filtered_content.len() as f64 / total_blocks as f64) * 100.0
);
println!("\n💾 Example 4: Memory-Efficient Aggregation");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
let mut stream = query_stream(
"Generate 5 tips for each: Python, JavaScript, Rust, and Go programming",
None,
)
.await?;
let mut tips_by_language = std::collections::HashMap::new();
let mut current_language = String::new();
while let Some(result) = stream.next().await {
match result? {
Message::Assistant(msg) => {
for block in &msg.message.content {
if let ContentBlock::Text(text) = block {
let text_lower = text.text.to_lowercase();
for lang in ["python", "javascript", "rust", "go"] {
if text_lower.contains(lang) && current_language != lang {
current_language = lang.to_string();
break;
}
}
if !current_language.is_empty() {
*tips_by_language
.entry(current_language.clone())
.or_insert(0) += 1;
}
}
}
},
Message::Result(result) if result.is_error => {
eprintln!("❌ Error result received");
break;
},
_ => {},
}
}
println!("📊 Tips by Language:");
for (lang, count) in &tips_by_language {
println!(" {}: {} tips", lang, count);
}
let separator = "=".repeat(50);
println!("\n{}", separator);
println!("✅ Stream Processing Examples Completed");
println!("{}", separator);
println!("\nKey Takeaways:");
println!(" 🎯 Real-time processing reduces perceived latency");
println!(" 💾 O(1) memory per message vs O(n) for query()");
println!(" 🔄 Backpressure control prevents resource exhaustion");
println!(" 🔍 Stream filtering enables selective processing");
println!(" 📊 Aggregation can be done without storing all data");
Ok(())
}