use futures::StreamExt;
use sage_core::{error::SageResult, llm::StreamChunk, llm::streaming::stream_utils};
use std::io::{self, Write};
/// Entry point of the streaming demo.
///
/// Initializes the `tracing_subscriber` formatter, prints a banner, runs
/// `demonstrate_streaming_concepts`, and closes with a short list of
/// streaming benefits.
///
/// # Errors
///
/// Propagates any error returned by `demonstrate_streaming_concepts`.
#[tokio::main]
async fn main() -> SageResult<()> {
    // Closing summary lines, printed verbatim one per line.
    const BENEFITS: [&str; 4] = [
        " ⢠Real-time user feedback",
        " ⢠Better perceived performance",
        " ⢠Ability to process partial responses",
        " ⢠Improved user experience for long responses",
    ];

    tracing_subscriber::fmt::init();

    println!("š Sage Agent Streaming Demo");
    println!("============================\n");
    println!("š 1. Setting up streaming client");

    demonstrate_streaming_concepts().await?;

    println!("\nš Streaming demo completed!");
    println!("š” Key benefits of streaming:");
    for benefit in BENEFITS.iter() {
        println!("{}", benefit);
    }

    Ok(())
}
/// Simulates a streamed response on stdout, then runs the remaining demos.
///
/// A fixed list of mock chunks is printed piece by piece, pausing 100 ms after
/// each piece of content. For the final chunk, the token usage and finish
/// reason are printed when present. Afterwards `demonstrate_stream_utilities`
/// and `demonstrate_sse_conversion` are awaited in that order.
///
/// # Errors
///
/// Propagates any error returned by `demonstrate_stream_utilities` or
/// `demonstrate_sse_conversion`.
///
/// # Panics
///
/// Panics if flushing stdout fails.
async fn demonstrate_streaming_concepts() -> SageResult<()> {
    // Text fragments of the mock response, in emission order.
    const FRAGMENTS: [&str; 11] = [
        "Hello",
        " there!",
        " I'm",
        " a",
        " streaming",
        " response.",
        " This",
        " allows",
        " for",
        " real-time",
        " feedback!",
    ];

    println!("š 2. Demonstrating streaming concepts");

    // Content chunks first, then the terminating chunk carrying usage data.
    let mut mock_chunks: Vec<_> = FRAGMENTS
        .iter()
        .map(|fragment| StreamChunk::content(*fragment))
        .collect();
    mock_chunks.push(StreamChunk::final_chunk(
        Some(sage_core::types::LlmUsage {
            prompt_tokens: 20,
            completion_tokens: 15,
            total_tokens: 35,
            cost_usd: Some(0.001),
            cache_creation_input_tokens: None,
            cache_read_input_tokens: None,
        }),
        Some("stop".to_string()),
    ));

    println!("\nš” Simulating streaming response:");
    print!("Response: ");
    io::stdout().flush().unwrap();

    let pause = std::time::Duration::from_millis(100);
    for chunk in mock_chunks {
        if let Some(text) = chunk.content.as_ref() {
            print!("{}", text);
            // `print!` does not end the line, so flush to make the piece visible now.
            io::stdout().flush().unwrap();
            tokio::time::sleep(pause).await;
        }

        if !chunk.is_final {
            continue;
        }

        println!("\n");
        if let Some(usage) = chunk.usage {
            let cost = usage.cost_usd.unwrap_or(0.0);
            println!(
                "š Final usage: {} tokens (${:.4})",
                usage.total_tokens, cost
            );
        }
        if let Some(reason) = chunk.finish_reason {
            println!("š Finished: {}", reason);
        }
    }

    println!("\nš ļø 3. Stream utility functions");
    demonstrate_stream_utilities().await?;

    println!("\nš 4. Server-Sent Events (SSE) support");
    demonstrate_sse_conversion().await?;

    Ok(())
}
/// Demonstrates two helpers from `stream_utils` on in-memory streams.
///
/// First, a stream of four content chunks plus a final chunk is passed to
/// `stream_utils::collect_stream` and the `content` of the result is printed.
/// Second, a stream mixing content chunks, an empty tool-call chunk and a
/// final chunk is passed through `stream_utils::content_only`; every item
/// that carries content is printed, and error items are printed rather than
/// propagated.
///
/// # Errors
///
/// Propagates any error returned by `stream_utils::collect_stream`.
async fn demonstrate_stream_utilities() -> SageResult<()> {
    use futures::stream;

    // Part 1: gather a whole stream into one response.
    let collect_input = vec![
        Ok(StreamChunk::content("First ")),
        Ok(StreamChunk::content("chunk ")),
        Ok(StreamChunk::content("of ")),
        Ok(StreamChunk::content("content.")),
        Ok(StreamChunk::final_chunk(None, Some("stop".to_string()))),
    ];
    let collect_stream_input = Box::pin(stream::iter(collect_input));

    println!("š Collecting stream into complete response...");
    let complete_response = stream_utils::collect_stream(collect_stream_input).await?;
    // The literal below contains a raw line break; it is kept exactly as-is so
    // the printed output does not change.
    println!("ā
Complete response: {}", complete_response.content);

    // Part 2: run a mixed stream through the content-only adapter.
    let mixed_input = vec![
        Ok(StreamChunk::content("Content chunk")),
        Ok(StreamChunk::tool_calls(vec![])),
        Ok(StreamChunk::content(" more content")),
        Ok(StreamChunk::final_chunk(None, Some("stop".to_string()))),
    ];
    let mixed_stream = Box::pin(stream::iter(mixed_input));
    let mut content_stream = stream_utils::content_only(mixed_stream);

    println!("š Content-only stream:");
    while let Some(item) = content_stream.next().await {
        let chunk = match item {
            Ok(chunk) => chunk,
            Err(err) => {
                println!(" Error: {}", err);
                continue;
            }
        };
        if let Some(text) = chunk.content {
            println!(" Content: '{}'", text);
        }
    }

    Ok(())
}
/// Converts a few mock chunks to Server-Sent Events text and prints them.
///
/// Three chunks (two content chunks and a final chunk) are each passed to
/// `sse::chunk_to_sse`; the resulting event is printed under a 1-based
/// "Event N:" heading.
///
/// # Errors
///
/// Returns the first error produced by `sse::chunk_to_sse`; later chunks are
/// not converted.
async fn demonstrate_sse_conversion() -> SageResult<()> {
    use sage_core::llm::streaming::sse;

    let chunks = vec![
        StreamChunk::content("Hello "),
        StreamChunk::content("world!"),
        StreamChunk::final_chunk(None, Some("stop".to_string())),
    ];

    println!("š” Converting chunks to SSE format:");
    // The vector is not used after this loop, so each chunk is moved into the
    // conversion call instead of being cloned from a borrowed element.
    for (index, chunk) in chunks.into_iter().enumerate() {
        let sse_event = sse::chunk_to_sse(chunk)?;
        println!("Event {}:", index + 1);
        println!("{}", sse_event);
    }

    Ok(())
}
/// Placeholder example: currently performs no work and returns `Ok(())`.
///
/// NOTE(review): the name suggests this is meant to show streaming against a
/// real client — confirm the intent before filling it in.
// `dead_code` is allowed because nothing in this file calls the function.
#[allow(dead_code)]
async fn example_real_usage() -> SageResult<()> {
Ok(())
}
/// Placeholder example: currently performs no work and returns `Ok(())`.
///
/// NOTE(review): the name suggests this is meant to show streaming combined
/// with caching — confirm the intent before filling it in.
// `dead_code` is allowed because nothing in this file calls the function.
#[allow(dead_code)]
async fn example_streaming_with_cache() -> SageResult<()> {
Ok(())
}