04_reasoning_streaming/
04_reasoning_streaming.rs
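
Streams a reasoning-enabled response and coalesces stream events on a 50 ms
window before rendering, so downstream handling happens once per batch instead
of once per event; the program reports the resulting reduction at the end.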

use anyhow::Result;
use futures::StreamExt;
use praxis_llm::{
    EventBatcher, Message, OpenAIClient, ReasoningClient, ReasoningConfig, ResponseRequest,
    StreamEvent,
};
use std::io::Write;
use tokio::time::Instant;

#[tokio::main]
async fn main() -> Result<()> {
    let api_key = std::env::var("OPENAI_API_KEY")?;
    let client = OpenAIClient::new(api_key)?;

    let request = ResponseRequest::new(
        "gpt-5",
        vec![Message::human(
            "Solve this problem: if a train travels 120 km in 2 hours, what is its \
             average speed? Then pose a dynamic programming problem that can be \
             solved in O(n) time and space.",
        )],
    )
    .with_reasoning(ReasoningConfig::medium());
    println!("Streaming response with reasoning (batched for network efficiency):\n");

    let mut stream = client.reason_stream(request).await?;
    let mut batcher = EventBatcher::new(50); // 50 ms batching window
    let mut reasoning_displayed = false;
    let start_time = Instant::now();
    let mut total_events = 0;
    let mut total_batches = 0;
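
    // Two sources race in the select! below: events arriving on the stream
    // and the batcher's flush timer. Events are buffered as they arrive and
    // drained once per window, so downstream work happens per batch rather
    // than per token.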
    loop {
        tokio::select! {
            // Receive events from the stream as they arrive
            event_result = stream.next() => {
                match event_result {
                    Some(Ok(event)) => {
                        total_events += 1;
                        batcher.push(event);
                    }
                    Some(Err(e)) => {
                        eprintln!("Stream error: {}", e);
                        break;
                    }
                    None => {
                        // Stream ended: flush whatever remains in the current
                        // window so trailing tokens are not lost
                        if !batcher.is_empty() {
                            let batch = batcher.take();
                            total_batches += 1;
                            display_batch(&batch, &mut reasoning_displayed)?;
                        }
                        break;
                    }
                }
            }

            // Flush the accumulated batch when the window timer expires
            _ = batcher.ticker().tick() => {
                if !batcher.is_empty() {
                    let batch = batcher.take();
                    total_batches += 1;
                    display_batch(&batch, &mut reasoning_displayed)?;
                }
            }
        }
    }

    let elapsed = start_time.elapsed();
    println!("\n\nDone.");
    println!(
        "Stats: {} events coalesced into {} batches ({}% fewer handoffs than per-event forwarding)",
        total_events,
        total_batches,
        if total_events > 0 {
            100 - (total_batches * 100 / total_events)
        } else {
            0
        }
    );
    println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());

    Ok(())
}

/// Render one batch of events, printing a `[REASONING]` header before the
/// first reasoning token and switching to a `[RESPONSE]` header once message
/// content begins.
fn display_batch(batch: &[StreamEvent], reasoning_displayed: &mut bool) -> Result<()> {
    for event in batch {
        match event {
            StreamEvent::Reasoning { content } => {
                if !*reasoning_displayed {
                    println!("[REASONING]");
                    *reasoning_displayed = true;
                }
                print!("{}", content);
                std::io::stdout().flush()?;
            }
            StreamEvent::Message { content } => {
                if *reasoning_displayed {
                    println!("\n\n[RESPONSE]");
                    *reasoning_displayed = false;
                }
                print!("{}", content);
                std::io::stdout().flush()?;
            }
            StreamEvent::Done { .. } => {
                // Nothing to render here: end of stream is detected in the
                // main loop when the stream itself returns None
            }
            _ => {}
        }
    }
    Ok(())
}
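
// ---------------------------------------------------------------------------
// For orientation only: this example doesn't show EventBatcher's internals,
// but a minimal batcher with the same surface (push / is_empty / take /
// ticker) could look like the sketch below. `MiniBatcher` and its
// tokio::time::Interval-based ticker are assumptions inferred from how the
// example uses the type, not praxis_llm's actual implementation.
// ---------------------------------------------------------------------------

#[allow(dead_code)]
struct MiniBatcher<T> {
    buf: Vec<T>,
    ticker: tokio::time::Interval,
}

#[allow(dead_code)]
impl<T> MiniBatcher<T> {
    fn new(window_ms: u64) -> Self {
        Self {
            buf: Vec::new(),
            // tokio intervals complete their first tick immediately; the
            // is_empty() check in the main loop makes that first tick a no-op
            ticker: tokio::time::interval(std::time::Duration::from_millis(window_ms)),
        }
    }

    /// Buffer an event until the next flush.
    fn push(&mut self, item: T) {
        self.buf.push(item);
    }

    fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    /// Drain and return everything buffered in the current window.
    fn take(&mut self) -> Vec<T> {
        std::mem::take(&mut self.buf)
    }

    /// The interval polled in `tokio::select!` to flush on a timer.
    fn ticker(&mut self) -> &mut tokio::time::Interval {
        &mut self.ticker
    }
}

// Batching pays off most when each flushed batch becomes a single downstream
// write instead of one write per event. Hypothetical sketch (assumes
// StreamEvent is Send + 'static): swap the display_batch call in the select!
// loop for a channel send to a forwarding task.

#[allow(dead_code)]
fn spawn_forwarder() -> tokio::sync::mpsc::Sender<Vec<StreamEvent>> {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<StreamEvent>>(16);
    tokio::spawn(async move {
        while let Some(batch) = rx.recv().await {
            // e.g. serialize the whole batch into one SSE or websocket frame
            println!("forwarding {} events in one frame", batch.len());
        }
    });
    tx
}
// In the select! loop, `tx.send(batch).await` would replace display_batch.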