#![allow(clippy::uninlined_format_args)]
// Streaming response patterns: SSE chunk parsing, bounded buffering,
// error handling, and incremental tool-call assembly.
use futures::StreamExt;
use openai_ergonomic::{Client, Error, Result};
use serde_json::Value;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;

/// A parsed chunk from a streaming SSE response.
#[derive(Debug, Clone)]
pub struct StreamChunk {
    /// Raw data payload of the SSE event.
    pub data: String,
    /// Incremental content text, if present.
    pub content_delta: Option<String>,
    /// Whether this chunk is the `[DONE]` terminator.
    pub is_done: bool,
    /// Incremental tool call data, if present.
    pub tool_call_delta: Option<Value>,
}

impl StreamChunk {
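    /// Parse a single SSE line into a `StreamChunk`.
    ///
    /// Returns `Ok(None)` for lines that are not `data:` events. A minimal
    /// usage sketch (the JSON shape follows the Chat Completions streaming
    /// format):
    ///
    /// ```ignore
    /// let line = r#"data: {"choices":[{"delta":{"content":"Hi"}}]}"#;
    /// let chunk = StreamChunk::parse(line)?.expect("data line");
    /// assert_eq!(chunk.content(), Some("Hi"));
    /// ```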
    pub fn parse(line: &str) -> Result<Option<Self>> {
        // SSE events we care about are prefixed with "data: "; everything
        // else (comments, event names, keep-alives) is skipped.
        if !line.starts_with("data: ") {
            return Ok(None);
        }

        let data = line.strip_prefix("data: ").unwrap_or("");

        // "[DONE]" is the stream-termination sentinel.
        if data.trim() == "[DONE]" {
            return Ok(Some(Self {
                data: data.to_string(),
                content_delta: None,
                is_done: true,
                tool_call_delta: None,
            }));
        }

        let json: Value = serde_json::from_str(data).map_err(|e| Error::StreamParsing {
            message: format!("Failed to parse chunk JSON: {e}"),
            chunk: data.to_string(),
        })?;

        // Extract the incremental content, if any.
        let content_delta = json["choices"][0]["delta"]["content"]
            .as_str()
            .map(ToString::to_string);

        // Extract the first tool call delta, if any.
        let tool_call_delta = json["choices"][0]["delta"]["tool_calls"]
            .as_array()
            .and_then(|arr| arr.first())
            .cloned();

        Ok(Some(Self {
            data: data.to_string(),
            content_delta,
            is_done: false,
            tool_call_delta,
        }))
    }

    /// Returns the content delta, if any.
    pub fn content(&self) -> Option<&str> {
        self.content_delta.as_deref()
    }

    /// Returns `true` if this chunk carries tool call data.
    pub const fn has_tool_call(&self) -> bool {
        self.tool_call_delta.is_some()
    }
}

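/// Wraps an async byte reader and yields parsed [`StreamChunk`]s line by line.
///
/// A minimal consumption loop, assuming `body` is any `AsyncRead + Send +
/// Unpin` source such as an HTTP response body:
///
/// ```ignore
/// let mut stream = ResponseStream::new(Box::new(body));
/// while let Some(chunk) = stream.next_chunk().await? {
///     if let Some(delta) = chunk.content() {
///         print!("{delta}");
///     }
/// }
/// ```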
pub struct ResponseStream {
    lines_stream: LinesStream<BufReader<Box<dyn tokio::io::AsyncRead + Send + Unpin>>>,
    finished: bool,
}

impl ResponseStream {
    /// Create a stream over any async reader, such as an HTTP response body.
    pub fn new(reader: Box<dyn tokio::io::AsyncRead + Send + Unpin>) -> Self {
        let buf_reader = BufReader::new(reader);
        let lines_stream = LinesStream::new(buf_reader.lines());

        Self {
            lines_stream,
            finished: false,
        }
    }

    /// Read and parse the next chunk, skipping blank keep-alive lines.
    pub async fn next_chunk(&mut self) -> Result<Option<StreamChunk>> {
        if self.finished {
            return Ok(None);
        }

        while let Some(line_result) = self.lines_stream.next().await {
            let line = line_result.map_err(|e| Error::StreamConnection {
                message: format!("Stream read error: {e}"),
            })?;

            // SSE separates events with blank lines; skip them.
            if line.trim().is_empty() {
                continue;
            }

            if let Some(chunk) = StreamChunk::parse(&line)? {
                if chunk.is_done {
                    self.finished = true;
                }
                return Ok(Some(chunk));
            }
        }

        // The underlying reader is exhausted.
        self.finished = true;
        Ok(None)
    }

    /// Drain the stream, concatenating all remaining content deltas.
    pub async fn collect_remaining(&mut self) -> Result<String> {
        let mut content = String::new();

        while let Some(chunk) = self.next_chunk().await? {
            if let Some(delta) = chunk.content() {
                content.push_str(delta);
            }
        }

        Ok(content)
    }

    /// Whether the stream has terminated.
    pub const fn is_finished(&self) -> bool {
        self.finished
    }
}

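/// A bounded accumulator for streamed content with a high-water mark at 75%
/// of capacity.
///
/// A minimal sketch of the intended backpressure pattern:
///
/// ```ignore
/// let mut buf = StreamBuffer::new(4096);
/// buf.append("some delta")?;
/// if buf.is_high_water() {
///     // Flush or persist buf.content() somewhere, then reclaim space.
///     buf.compact(256);
/// }
/// ```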
pub struct StreamBuffer {
    /// Accumulated content.
    content: String,
    capacity: usize,
    high_water_mark: usize,
}

impl StreamBuffer {
    /// Create a buffer with a fixed capacity and a 75% high-water mark.
    pub fn new(capacity: usize) -> Self {
        Self {
            content: String::with_capacity(capacity),
            capacity,
            high_water_mark: capacity * 3 / 4, // 75% of capacity
        }
    }

    /// Append content, failing if it would exceed the buffer's capacity.
    pub fn append(&mut self, content: &str) -> Result<()> {
        if self.content.len() + content.len() > self.capacity {
            return Err(Error::StreamBuffer {
                message: format!(
                    "Buffer capacity exceeded: {} + {} > {}",
                    self.content.len(),
                    content.len(),
                    self.capacity
                ),
            });
        }

        self.content.push_str(content);
        Ok(())
    }

    /// Returns the accumulated content.
    pub fn content(&self) -> &str {
        &self.content
    }

    /// Clear the accumulated content.
    pub fn clear(&mut self) {
        self.content.clear();
    }

    /// Returns `true` once the buffer reaches its high-water mark.
    pub fn is_high_water(&self) -> bool {
        self.content.len() >= self.high_water_mark
    }

    /// Returns buffer utilization as a percentage of capacity.
    pub fn utilization(&self) -> f64 {
        #[allow(clippy::cast_precision_loss)]
        {
            (self.content.len() as f64 / self.capacity as f64) * 100.0
        }
    }

    /// Keep only the last `keep_last_chars` bytes of content.
    /// Note: slices by byte offset, so the cut must land on a char boundary.
    pub fn compact(&mut self, keep_last_chars: usize) {
        if self.content.len() > keep_last_chars {
            let start_pos = self.content.len() - keep_last_chars;
            self.content = self.content[start_pos..].to_string();
        }
    }
}

/// Demonstrates configuring a streaming request (output is simulated).
async fn example_basic_streaming() -> Result<()> {
    println!("=== Basic Streaming Example ===");

    println!("Creating client and streaming request...");

    let client = Client::from_env()?.build();

    let _streaming_request = client
        .responses()
        .user("Tell me a short story about a robot learning to paint")
        .stream(true)
        .temperature(0.7)
        .max_completion_tokens(500);

    println!("Streaming request configured:");
    println!("- Model: Default (gpt-4)");
    println!("- Stream: true");
    println!("- Temperature: 0.7");
    println!("- Max tokens: 500");

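    // Once real streaming execution is wired up, consumption could look like
    // the sketch below. Note: `send_stream` is hypothetical, not a current
    // API of this crate (the binding would also lose its leading underscore).
    //
    // let mut stream = streaming_request.send_stream().await?;
    // while let Some(chunk) = stream.next_chunk().await? {
    //     if let Some(delta) = chunk.content() {
    //         print!("{delta}");
    //     }
    // }
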
    // Simulate the token-by-token output a real stream would produce.
    let sample_chunks = vec![
        "Once", " upon", " a", " time,", " there", " was", " a", " little", " robot", " named",
        " Pixel", "...",
    ];

    println!("\nSimulated streaming output:");
    print!("> ");
    for chunk in sample_chunks {
        print!("{chunk}");
        std::io::Write::flush(&mut std::io::stdout()).expect("failed to flush stdout");
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    println!("\n");

    Ok(())
}

/// Demonstrates buffer management while consuming a stream.
async fn example_buffered_streaming() -> Result<()> {
    println!("=== Buffered Streaming Example ===");

    let mut buffer = StreamBuffer::new(1024);

    // Simulated chunks arriving from a stream.
    let chunks = [
        "The robot's optical sensors",
        " detected the vibrant colors",
        " of the sunset painting",
        " hanging in the gallery.",
        " For the first time,",
        " Pixel felt something",
        " that could only be",
        " described as wonder.",
    ];

    println!("Processing chunks with buffer management:");

    for (i, chunk) in chunks.iter().enumerate() {
        buffer.append(chunk)?;

        println!(
            "Chunk {}: '{}' (Buffer: {:.1}% full)",
            i + 1,
            chunk,
            buffer.utilization()
        );

        if buffer.is_high_water() {
            println!("  ⚠️ Buffer high water mark reached, consider processing");

            // In a real application, flush or persist the content here
            // before reclaiming space.
            buffer.compact(100);
            println!("  📉 Buffer compacted to {:.1}%", buffer.utilization());
        }

        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    println!(
        "\nFinal content length: {} characters",
        buffer.content().len()
    );
    println!(
        "Final content: \"{}...\"",
        &buffer.content()[..buffer.content().len().min(50)]
    );

    Ok(())
}

/// Demonstrates handling of common streaming error scenarios.
fn example_streaming_error_handling() {
    println!("=== Streaming Error Handling Example ===");

    println!("Demonstrating common streaming error scenarios:");

    println!("\n1. Connection Error Simulation:");
    let connection_result: Result<()> = Err(Error::StreamConnection {
        message: "Connection lost to streaming endpoint".to_string(),
    });

    match connection_result {
        Err(Error::StreamConnection { message }) => {
            println!("  ❌ Connection error handled: {message}");
            println!("  🔄 Would implement retry logic here");
        }
        _ => unreachable!(),
    }

    println!("\n2. Parse Error Simulation:");
    let malformed_chunk = "data: {invalid json}";
    match StreamChunk::parse(malformed_chunk) {
        Err(Error::StreamParsing { message, chunk }) => {
            println!("  ❌ Parse error handled: {message}");
            println!("  🔍 Problematic chunk: {chunk}");
            println!("  🔄 Would skip chunk and continue");
        }
        _ => println!("  ✅ Chunk parsed successfully"),
    }

    println!("\n3. Buffer Overflow Simulation:");
    let mut small_buffer = StreamBuffer::new(10);
    let large_chunk = "This chunk is definitely too large for our tiny buffer";

    match small_buffer.append(large_chunk) {
        Err(Error::StreamBuffer { message }) => {
            println!("  ❌ Buffer error handled: {message}");
            println!("  🔄 Would implement buffer resizing or chunking");
        }
        Ok(()) => println!("  ✅ Content added to buffer"),
        Err(e) => println!("  ❌ Unexpected error: {e}"),
    }

    println!("\n4. Timeout Handling:");
    println!("  ⏱️ Would implement timeout for stream chunks");
    println!("  🔄 Would retry or fail gracefully on timeout");
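
    // A sketch of per-chunk timeouts with tokio::time::timeout, assuming
    // `stream` is a ResponseStream (shown as a comment because this example
    // function is synchronous):
    //
    // match tokio::time::timeout(Duration::from_secs(30), stream.next_chunk()).await {
    //     Ok(chunk_result) => { /* handle Result<Option<StreamChunk>> */ }
    //     Err(_elapsed) => { /* timed out: retry or fail gracefully */ }
    // }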
}

/// Demonstrates assembling a tool call from streamed argument fragments.
async fn example_streaming_tool_calls() -> Result<()> {
    println!("=== Streaming Tool Calls Example ===");

    let client = Client::from_env()?.build();

    let weather_tool = openai_ergonomic::responses::tool_function(
        "get_weather",
        "Get current weather for a location",
        serde_json::json!({
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City name"
                }
            },
            "required": ["location"]
        }),
    );

    let _tool_request = client
        .responses()
        .user("What's the weather like in San Francisco?")
        .tool(weather_tool)
        .stream(true);

    println!("Streaming tool call request configured:");
    println!("- Tool: get_weather function");
    println!("- Streaming: enabled");

    println!("\nSimulated streaming tool call:");

    // Tool call arguments arrive as JSON fragments spread across chunks.
    let tool_chunks = [
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_123","type":"function","function":{"name":"get_weather"}}]}}]}"#,
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{"}}]}}]}"#,
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\"location\""}}]}}]}"#,
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":":"}}]}}]}"#,
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\"San Francisco\""}}]}}]}"#,
        r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"}"}}]}}]}"#,
    ];

    let mut tool_call_buffer = String::new();

    for (i, chunk_data) in tool_chunks.iter().enumerate() {
        let chunk_line = format!("data: {chunk_data}");

        if let Some(chunk) = StreamChunk::parse(&chunk_line)? {
            if chunk.has_tool_call() {
                println!("Chunk {}: Tool call data received", i + 1);

                // Accumulate argument fragments until the JSON is complete.
                if let Some(tool_data) = &chunk.tool_call_delta {
                    if let Some(args) = tool_data["function"]["arguments"].as_str() {
                        tool_call_buffer.push_str(args);
                        println!("  Arguments so far: {tool_call_buffer}");
                    }
                }
            }
        }

        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    println!("\n✅ Complete tool call arguments: {tool_call_buffer}");
    println!("🔧 Would now execute get_weather(location='San Francisco')");
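
    // As a sanity check, confirm the accumulated fragments form valid JSON
    // (serde_json is already in scope; `get_weather` itself stays
    // hypothetical here).
    let parsed: Value = serde_json::from_str(&tool_call_buffer).map_err(|e| {
        Error::StreamParsing {
            message: format!("Failed to parse tool call arguments: {e}"),
            chunk: tool_call_buffer.clone(),
        }
    })?;
    println!("Parsed location argument: {}", parsed["location"]);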

    Ok(())
}

/// Demonstrates chunk processing with simple stream metrics.
#[allow(clippy::cast_precision_loss)]
async fn example_chunk_processing_patterns() -> Result<()> {
    println!("=== Chunk Processing Patterns ===");

    #[allow(clippy::items_after_statements)]
    #[derive(Debug, Default)]
    struct StreamMetrics {
        total_chunks: usize,
        content_chunks: usize,
        tool_chunks: usize,
        total_bytes: usize,
        processing_time: Duration,
    }

    let mut metrics = StreamMetrics::default();
    let start_time = std::time::Instant::now();

    let sample_chunks = [
        "data: {\"choices\":[{\"delta\":{\"content\":\"Hello\"}}]}",
        "data: {\"choices\":[{\"delta\":{\"content\":\" world!\"}}]}",
        "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"function\":{\"name\":\"test\"}}]}}]}",
        "data: {\"choices\":[{\"delta\":{\"content\":\" How are you?\"}}]}",
        "data: [DONE]",
    ];

    println!("Processing {} chunks with metrics:", sample_chunks.len());

    for (i, chunk_line) in sample_chunks.iter().enumerate() {
        if let Some(chunk) = StreamChunk::parse(chunk_line)? {
            metrics.total_chunks += 1;
            metrics.total_bytes += chunk.data.len();

            if chunk.content().is_some() {
                metrics.content_chunks += 1;
                println!(
                    "Chunk {}: Content chunk - '{}'",
                    i + 1,
                    chunk.content().unwrap_or("")
                );
            } else if chunk.has_tool_call() {
                metrics.tool_chunks += 1;
                println!("Chunk {}: Tool call chunk", i + 1);
            } else if chunk.is_done {
                println!("Chunk {}: Stream completion marker", i + 1);
            }

            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    metrics.processing_time = start_time.elapsed();

    println!("\n📊 Stream Processing Metrics:");
    println!("  Total chunks: {}", metrics.total_chunks);
    println!("  Content chunks: {}", metrics.content_chunks);
    println!("  Tool call chunks: {}", metrics.tool_chunks);
    println!("  Total bytes: {}", metrics.total_bytes);
    println!("  Processing time: {:?}", metrics.processing_time);
    println!(
        "  Avg bytes/chunk: {:.1}",
        metrics.total_bytes as f64 / metrics.total_chunks as f64
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();

    println!("🤖 OpenAI Ergonomic - Streaming Responses Examples");
    println!("================================================\n");

    if let Err(e) = example_basic_streaming().await {
        eprintln!("Basic streaming example failed: {e}");
    }

    println!();

    if let Err(e) = example_buffered_streaming().await {
        eprintln!("Buffered streaming example failed: {e}");
    }

    println!();

    example_streaming_error_handling();

    println!();

    if let Err(e) = example_streaming_tool_calls().await {
        eprintln!("Tool calls example failed: {e}");
    }

    println!();

    if let Err(e) = example_chunk_processing_patterns().await {
        eprintln!("Chunk processing example failed: {e}");
    }

    println!("\n✅ All streaming examples completed!");
    println!("\n💡 Key Takeaways:");
    println!("  • SSE streaming requires careful chunk parsing");
    println!("  • Buffer management prevents memory issues");
    println!("  • Error handling is crucial for robust streaming");
    println!("  • Tool calls can be streamed incrementally");
    println!("  • Metrics help optimize streaming performance");

    println!("\n🚀 Next Steps:");
    println!("  • Integrate with openai-client-base streaming API");
    println!("  • Add real streaming request execution");
    println!("  • Implement automatic retry logic");
    println!("  • Add streaming response caching");

    Ok(())
}