responses_streaming/responses_streaming.rs

#![allow(clippy::uninlined_format_args)]
//! Streaming responses example for the openai-ergonomic crate.
//!
//! This example demonstrates streaming patterns for the Responses API, including:
//! - SSE (Server-Sent Events) streaming responses
//! - Chunk processing patterns
//! - Error handling for streaming
//! - Buffer management best practices
//! - Real-time response processing
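//!
//! For reference, a sketch of the SSE wire format parsed below (hand-written
//! sample events in the chat-style delta shape this example simulates; real
//! payloads carry more fields):
//!
//! ```text
//! data: {"choices":[{"delta":{"content":"Hello"}}]}
//!
//! data: [DONE]
//! ```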

use futures::StreamExt;
use openai_ergonomic::{Client, Error, Result};
use serde_json::Value;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;

/// Represents a chunk of streaming data from `OpenAI`
#[derive(Debug, Clone)]
pub struct StreamChunk {
    /// The raw event data
    pub data: String,
    /// Parsed content delta, if available
    pub content_delta: Option<String>,
    /// Whether this is the final chunk
    pub is_done: bool,
    /// Any tool call data in this chunk
    pub tool_call_delta: Option<Value>,
}

impl StreamChunk {
    /// Parse a raw SSE data line into a `StreamChunk`
    pub fn parse(line: &str) -> Result<Option<Self>> {
        // Handle SSE format: "data: {json}"
        if !line.starts_with("data: ") {
            return Ok(None);
        }

        let data = line.strip_prefix("data: ").unwrap_or("");

        // Check for [DONE] marker
        if data.trim() == "[DONE]" {
            return Ok(Some(Self {
                data: data.to_string(),
                content_delta: None,
                is_done: true,
                tool_call_delta: None,
            }));
        }

        // Parse JSON data
        let json: Value = serde_json::from_str(data).map_err(|e| Error::StreamParsing {
            message: format!("Failed to parse chunk JSON: {e}"),
            chunk: data.to_string(),
        })?;

        // Extract content delta
        let content_delta = json["choices"][0]["delta"]["content"]
            .as_str()
            .map(ToString::to_string);

        // Extract tool call delta
        let tool_call_delta = json["choices"][0]["delta"]["tool_calls"]
            .as_array()
            .and_then(|arr| arr.first())
            .cloned();

        Ok(Some(Self {
            data: data.to_string(),
            content_delta,
            is_done: false,
            tool_call_delta,
        }))
    }

    /// Get the content from this chunk, if any
    pub fn content(&self) -> Option<&str> {
        self.content_delta.as_deref()
    }

    /// Check if this chunk has tool call data
    pub const fn has_tool_call(&self) -> bool {
        self.tool_call_delta.is_some()
    }
}
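
// A quick parsing sketch using only the types above (the payload is a
// hand-written sample in the shape `parse` expects, not a captured API
// response):
//
// let line = r#"data: {"choices":[{"delta":{"content":"Hi"}}]}"#;
// let chunk = StreamChunk::parse(line)?.expect("well-formed data line");
// assert_eq!(chunk.content(), Some("Hi"));
// assert!(!chunk.is_done);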

/// Stream manager for handling SSE responses
pub struct ResponseStream {
    lines_stream: LinesStream<BufReader<Box<dyn tokio::io::AsyncRead + Send + Unpin>>>,
    finished: bool,
}

impl ResponseStream {
    /// Create a new response stream from a reader
    pub fn new(reader: Box<dyn tokio::io::AsyncRead + Send + Unpin>) -> Self {
        let buf_reader = BufReader::new(reader);
        let lines_stream = LinesStream::new(buf_reader.lines());

        Self {
            lines_stream,
            finished: false,
        }
    }

    /// Get the next chunk from the stream
    pub async fn next_chunk(&mut self) -> Result<Option<StreamChunk>> {
        if self.finished {
            return Ok(None);
        }

        while let Some(line_result) = self.lines_stream.next().await {
            let line = line_result.map_err(|e| Error::StreamConnection {
                message: format!("Stream read error: {e}"),
            })?;

            // Skip empty lines
            if line.trim().is_empty() {
                continue;
            }

            // Parse the chunk
            if let Some(chunk) = StreamChunk::parse(&line)? {
                if chunk.is_done {
                    self.finished = true;
                }
                return Ok(Some(chunk));
            }
        }

        // Stream ended without [DONE] marker
        self.finished = true;
        Ok(None)
    }

    /// Collect all content from the remaining stream
    pub async fn collect_remaining(&mut self) -> Result<String> {
        let mut content = String::new();

        while let Some(chunk) = self.next_chunk().await? {
            if let Some(delta) = chunk.content() {
                content.push_str(delta);
            }
        }

        Ok(content)
    }

    /// Check if the stream has finished
    pub const fn is_finished(&self) -> bool {
        self.finished
    }
}
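
// A minimal consumption sketch (assumes some `reader: Box<dyn AsyncRead +
// Send + Unpin>`, e.g. an adapter over an HTTP response body):
//
// let mut stream = ResponseStream::new(reader);
// while let Some(chunk) = stream.next_chunk().await? {
//     if let Some(delta) = chunk.content() {
//         print!("{delta}");
//     }
// }
// (or call `stream.collect_remaining().await?` to gather everything at once)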

/// Buffer manager for efficient streaming content handling
pub struct StreamBuffer {
    content: String,
    capacity: usize,
    high_water_mark: usize,
}

impl StreamBuffer {
    /// Create a new buffer with the given capacity
    pub fn new(capacity: usize) -> Self {
        Self {
            content: String::with_capacity(capacity),
            capacity,
            high_water_mark: capacity * 3 / 4, // 75% of capacity
        }
    }

    /// Add content to the buffer
    pub fn append(&mut self, content: &str) -> Result<()> {
        // Check if adding this content would exceed capacity
        if self.content.len() + content.len() > self.capacity {
            return Err(Error::StreamBuffer {
                message: format!(
                    "Buffer capacity exceeded: {} + {} > {}",
                    self.content.len(),
                    content.len(),
                    self.capacity
                ),
            });
        }

        self.content.push_str(content);
        Ok(())
    }

    /// Get the current content
    pub fn content(&self) -> &str {
        &self.content
    }

    /// Clear the buffer
    pub fn clear(&mut self) {
        self.content.clear();
    }

    /// Check if buffer is near capacity
    pub fn is_high_water(&self) -> bool {
        self.content.len() >= self.high_water_mark
    }

    /// Get buffer utilization as a percentage
    pub fn utilization(&self) -> f64 {
        #[allow(clippy::cast_precision_loss)]
        {
            (self.content.len() as f64 / self.capacity as f64) * 100.0
        }
    }

    /// Compact the buffer by dropping older content, keeping only the last
    /// `keep_last_chars` characters for context
    pub fn compact(&mut self, keep_last_chars: usize) {
        let char_count = self.content.chars().count();
        if char_count > keep_last_chars {
            // Cut on a char boundary so multi-byte UTF-8 content cannot
            // cause a slicing panic.
            let start_pos = self
                .content
                .char_indices()
                .nth(char_count - keep_last_chars)
                .map_or(0, |(i, _)| i);
            self.content = self.content[start_pos..].to_string();
        }
    }
}

/// Demonstrates basic streaming response handling
async fn example_basic_streaming() -> Result<()> {
    println!("=== Basic Streaming Example ===");

    // Note: This is a conceptual example since actual streaming
    // requires integration with openai-client-base streaming API
    println!("Creating client and streaming request...");

    let client = Client::from_env()?.build();

    // Build a streaming request
    let _streaming_request = client
        .responses()
        .user("Tell me a short story about a robot learning to paint")
        .stream(true)
        .temperature(0.7)
        .max_completion_tokens(500);

    println!("Streaming request configured:");
    println!("- Model: Default (gpt-4)");
    println!("- Stream: true");
    println!("- Temperature: 0.7");
    println!("- Max tokens: 500");

    // Simulate streaming chunks for demonstration
    let sample_chunks = vec![
        "Once", " upon", " a", " time,", " there", " was", " a", " little", " robot", " named",
        " Pixel", "...",
    ];

    println!("\nSimulated streaming output:");
    print!("> ");
    for chunk in sample_chunks {
        print!("{chunk}");
        std::io::Write::flush(&mut std::io::stdout()).unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    println!("\n");

    Ok(())
}

/// Demonstrates advanced streaming with buffer management
async fn example_buffered_streaming() -> Result<()> {
    println!("=== Buffered Streaming Example ===");

    let mut buffer = StreamBuffer::new(1024); // 1KB buffer

    // Simulate incoming chunks
    let chunks = [
        "The robot's optical sensors",
        " detected the vibrant colors",
        " of the sunset painting",
        " hanging in the gallery.",
        " For the first time,",
        " Pixel felt something",
        " that could only be",
        " described as wonder.",
    ];

    println!("Processing chunks with buffer management:");

    for (i, chunk) in chunks.iter().enumerate() {
        // Add chunk to buffer
        buffer.append(chunk)?;

        println!(
            "Chunk {}: '{}' (Buffer: {:.1}% full)",
            i + 1,
            chunk,
            buffer.utilization()
        );

        // Check if buffer is getting full
        if buffer.is_high_water() {
            println!("   Buffer high water mark reached, consider processing");

            // In a real application, you might:
            // 1. Process the current content
            // 2. Send to downstream consumers
            // 3. Compact the buffer
            buffer.compact(100); // Keep last 100 chars for context
            println!("   Buffer compacted to {:.1}%", buffer.utilization());
        }

        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    println!(
        "\nFinal content length: {} characters",
        buffer.content().len()
    );
    println!(
        "Final content: \"{}...\"",
        // Preview the first 50 characters char-safely; byte slicing could
        // panic on a multi-byte UTF-8 boundary.
        buffer.content().chars().take(50).collect::<String>()
    );

    Ok(())
}

/// Demonstrates error handling patterns for streaming
fn example_streaming_error_handling() {
    println!("=== Streaming Error Handling Example ===");

    // Simulate various error conditions that can occur during streaming
    println!("Demonstrating common streaming error scenarios:");

    // 1. Connection errors
    println!("\n1. Connection Error Simulation:");
    let connection_result: Result<()> = Err(Error::StreamConnection {
        message: "Connection lost to streaming endpoint".to_string(),
    });

    match connection_result {
        Err(Error::StreamConnection { message }) => {
            println!("   Connection error handled: {message}");
            println!("   Would implement retry logic here");
        }
        _ => unreachable!(),
    }
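
    // A hedged retry sketch with exponential backoff. `connect_stream` is a
    // hypothetical helper (not part of this crate) that opens the SSE stream:
    //
    // let mut delay = Duration::from_millis(250);
    // for attempt in 1..=3 {
    //     match connect_stream().await {
    //         Ok(stream) => break,
    //         Err(e) if attempt < 3 => {
    //             eprintln!("Attempt {attempt} failed: {e}; retrying in {delay:?}");
    //             tokio::time::sleep(delay).await;
    //             delay *= 2;
    //         }
    //         Err(e) => return Err(e),
    //     }
    // }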

    // 2. Parsing errors
    println!("\n2. Parse Error Simulation:");
    let malformed_chunk = "data: {invalid json}";
    match StreamChunk::parse(malformed_chunk) {
        Err(Error::StreamParsing { message, chunk }) => {
            println!("   Parse error handled: {message}");
            println!("   Problematic chunk: {chunk}");
            println!("   Would skip chunk and continue");
        }
        _ => println!("   Chunk parsed successfully"),
    }

    // 3. Buffer overflow
    println!("\n3. Buffer Overflow Simulation:");
    let mut small_buffer = StreamBuffer::new(10); // Very small buffer
    let large_chunk = "This chunk is definitely too large for our tiny buffer";

    match small_buffer.append(large_chunk) {
        Err(Error::StreamBuffer { message }) => {
            println!("   Buffer error handled: {message}");
            println!("   Would implement buffer resizing or chunking");
        }
        Ok(()) => println!("   Content added to buffer"),
        Err(e) => println!("   Unexpected error: {e}"),
    }
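
    // A hedged chunking sketch: feed oversized content through the buffer a
    // piece at a time, draining between pieces (assumes no single piece
    // exceeds the buffer's capacity):
    //
    // for piece in large_chunk.split_inclusive(' ') {
    //     if small_buffer.content().len() + piece.len() > 10 {
    //         small_buffer.clear(); // or hand the content downstream first
    //     }
    //     small_buffer.append(piece)?;
    // }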

    // 4. Timeout handling
    println!("\n4. Timeout Handling:");
    println!("   Would implement timeout for stream chunks");
    println!("   Would retry or fail gracefully on timeout");
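
    // A hedged timeout sketch using `tokio::time::timeout` around a chunk
    // read (assumes a `ResponseStream` named `stream`):
    //
    // match tokio::time::timeout(Duration::from_secs(30), stream.next_chunk()).await {
    //     Ok(chunk_result) => { /* handle Result<Option<StreamChunk>> */ }
    //     Err(_elapsed) => { /* chunk timed out: retry or abort the stream */ }
    // }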
}

/// Demonstrates tool calling in streaming responses
async fn example_streaming_tool_calls() -> Result<()> {
    println!("=== Streaming Tool Calls Example ===");

    let client = Client::from_env()?.build();

    // Create a tool for getting weather information
    let weather_tool = openai_ergonomic::responses::tool_function(
        "get_weather",
        "Get current weather for a location",
        serde_json::json!({
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City name"
                }
            },
            "required": ["location"]
        }),
    );

    // Build streaming request with tools
    let _tool_request = client
        .responses()
        .user("What's the weather like in San Francisco?")
        .tool(weather_tool)
        .stream(true);

    println!("Streaming tool call request configured:");
    println!("- Tool: get_weather function");
    println!("- Streaming: enabled");

    // Simulate streaming tool call chunks
    println!("\nSimulated streaming tool call:");

    let tool_chunks = [
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_123","type":"function","function":{"name":"get_weather"}}]}}]}"#,
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{"}}]}}]}"#,
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\"location\""}}]}}]}"#,
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":":"}}]}}]}"#,
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\"San Francisco\""}}]}}]}"#,
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"}"}}]}}]}"#,
    ];

    let mut tool_call_buffer = String::new();

    for (i, chunk_data) in tool_chunks.iter().enumerate() {
        let chunk_line = format!("data: {chunk_data}");

        if let Some(chunk) = StreamChunk::parse(&chunk_line)? {
            if chunk.has_tool_call() {
                println!("Chunk {}: Tool call data received", i + 1);

                // In a real implementation, you'd accumulate tool call arguments
                if let Some(tool_data) = &chunk.tool_call_delta {
                    if let Some(args) = tool_data["function"]["arguments"].as_str() {
                        tool_call_buffer.push_str(args);
                        println!("  Arguments so far: {tool_call_buffer}");
                    }
                }
            }
        }

        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    println!("\nComplete tool call arguments: {tool_call_buffer}");
    println!("Would now execute get_weather(location='San Francisco')");

    Ok(())
}

/// Demonstrates chunk processing patterns and metrics
#[allow(clippy::cast_precision_loss)]
async fn example_chunk_processing_patterns() -> Result<()> {
    println!("=== Chunk Processing Patterns ===");

    #[allow(clippy::items_after_statements)]
    #[derive(Debug, Default)]
    struct StreamMetrics {
        total_chunks: usize,
        content_chunks: usize,
        tool_chunks: usize,
        total_bytes: usize,
        processing_time: Duration,
    }

    let mut metrics = StreamMetrics::default();
    let start_time = std::time::Instant::now();

    // Simulate various types of chunks
    let sample_chunks = [
        "data: {\"choices\":[{\"delta\":{\"content\":\"Hello\"}}]}",
        "data: {\"choices\":[{\"delta\":{\"content\":\" world!\"}}]}",
        "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"function\":{\"name\":\"test\"}}]}}]}",
        "data: {\"choices\":[{\"delta\":{\"content\":\" How are you?\"}}]}",
        "data: [DONE]",
    ];

    println!("Processing {} chunks with metrics:", sample_chunks.len());

    for (i, chunk_line) in sample_chunks.iter().enumerate() {
        if let Some(chunk) = StreamChunk::parse(chunk_line)? {
            metrics.total_chunks += 1;
            metrics.total_bytes += chunk.data.len();

            if chunk.content().is_some() {
                metrics.content_chunks += 1;
                println!(
                    "Chunk {}: Content chunk - '{}'",
                    i + 1,
                    chunk.content().unwrap_or("")
                );
            } else if chunk.has_tool_call() {
                metrics.tool_chunks += 1;
                println!("Chunk {}: Tool call chunk", i + 1);
            } else if chunk.is_done {
                println!("Chunk {}: Stream completion marker", i + 1);
            }

            // Simulate processing time
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    metrics.processing_time = start_time.elapsed();

    println!("\nStream Processing Metrics:");
    println!("   Total chunks: {}", metrics.total_chunks);
    println!("   Content chunks: {}", metrics.content_chunks);
    println!("   Tool call chunks: {}", metrics.tool_chunks);
    println!("   Total bytes: {}", metrics.total_bytes);
    println!("   Processing time: {:?}", metrics.processing_time);
    println!(
        "   Avg bytes/chunk: {:.1}",
        metrics.total_bytes as f64 / metrics.total_chunks as f64
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize tracing for better debugging
    tracing_subscriber::fmt::init();

    println!("OpenAI Ergonomic - Streaming Responses Examples");
    println!("================================================\n");

    // Note: These examples demonstrate streaming patterns and error handling
    // The actual streaming implementation will be completed when the
    // openai-client-base streaming API is fully integrated

    // Run all examples
    if let Err(e) = example_basic_streaming().await {
        eprintln!("Basic streaming example failed: {e}");
    }

    println!();

    if let Err(e) = example_buffered_streaming().await {
        eprintln!("Buffered streaming example failed: {e}");
    }

    println!();

    example_streaming_error_handling();

    println!();

    if let Err(e) = example_streaming_tool_calls().await {
        eprintln!("Tool calls example failed: {e}");
    }

    println!();

    if let Err(e) = example_chunk_processing_patterns().await {
        eprintln!("Chunk processing example failed: {e}");
    }

    println!("\nAll streaming examples completed!");
    println!("\nKey Takeaways:");
    println!("   • SSE streaming requires careful chunk parsing");
    println!("   • Buffer management prevents memory issues");
    println!("   • Error handling is crucial for robust streaming");
    println!("   • Tool calls can be streamed incrementally");
    println!("   • Metrics help optimize streaming performance");

    println!("\nNext Steps:");
    println!("   • Integrate with openai-client-base streaming API");
    println!("   • Add real streaming request execution");
    println!("   • Implement automatic retry logic");
    println!("   • Add streaming response caching");

    Ok(())
}