//! llm/providers/openai/streaming.rs — streaming-response handling for the OpenAI provider.
1use async_openai::types::chat::{
2    CompletionUsage, CreateChatCompletionStreamResponse, FinishReason as OpenAiFinishReason,
3};
4use async_stream;
5use tokio_stream::{Stream, StreamExt};
6use tracing::debug;
7
8use crate::providers::tool_call_collector::ToolCallCollector;
9use crate::{LlmError, LlmResponse, Result, StopReason, TokenUsage};
10
11impl From<CompletionUsage> for TokenUsage {
12    fn from(usage: CompletionUsage) -> Self {
13        let prompt = usage.prompt_tokens_details.unwrap_or_default();
14        let completion = usage.completion_tokens_details.unwrap_or_default();
15        TokenUsage {
16            input_tokens: usage.prompt_tokens,
17            output_tokens: usage.completion_tokens,
18            cache_read_tokens: prompt.cached_tokens,
19            input_audio_tokens: prompt.audio_tokens,
20            reasoning_tokens: completion.reasoning_tokens,
21            output_audio_tokens: completion.audio_tokens,
22            accepted_prediction_tokens: completion.accepted_prediction_tokens,
23            rejected_prediction_tokens: completion.rejected_prediction_tokens,
24            ..TokenUsage::default()
25        }
26    }
27}
28
29/// Common stream processing logic that handles tool call state tracking and event emission.
30/// Works with standard `async_openai` `CreateChatCompletionStreamResponse` types.
31pub fn process_completion_stream<E: Into<LlmError> + Send>(
32    mut stream: impl Stream<Item = std::result::Result<CreateChatCompletionStreamResponse, E>> + Send + Unpin,
33) -> impl Stream<Item = Result<LlmResponse>> + Send {
34    async_stream::stream! {
35        let message_id = uuid::Uuid::new_v4().to_string();
36        yield Ok(LlmResponse::Start { message_id });
37
38        let mut collector = ToolCallCollector::<u32>::new();
39        let mut last_stop_reason: Option<StopReason> = None;
40
41        while let Some(result) = stream.next().await {
42            match result {
43                Ok(mut response) => {
44                    // Emit usage information if available
45                    // This must be checked on every chunk since usage may come
46                    // in a separate final chunk after finish_reason
47                    if let Some(usage) = response.usage {
48                        yield Ok(LlmResponse::Usage { tokens: usage.into() });
49                    }
50
51                    if let Some(choice) = response.choices.pop() {
52                        let delta = choice.delta;
53
54                        if let Some(content) = delta.content
55                            && !content.is_empty() {
56                                // If we have pending tool calls and now we're getting content,
57                                // complete all tool calls first
58                                for tool_call in collector.complete_all() {
59                                    yield Ok(LlmResponse::ToolRequestComplete { tool_call });
60                                }
61                                yield Ok(LlmResponse::Text { chunk: content });
62                            }
63
64                        if let Some(tool_calls) = delta.tool_calls {
65                            for tc in tool_calls {
66                                let (id, name, args) = match tc.function {
67                                    Some(f) => (tc.id, f.name, f.arguments),
68                                    None => (tc.id, None, None),
69                                };
70                                for response in collector.handle_delta(tc.index, id, name, args) {
71                                    yield Ok(response);
72                                }
73                            }
74                        }
75
76                        if let Some(finish_reason) = choice.finish_reason {
77                            let finish_reason_str = format!("{finish_reason:?}");
78                            debug!("Received finish reason: {finish_reason_str}");
79                            last_stop_reason = Some(map_openai_finish_reason(finish_reason));
80
81                            for tool_call in collector.complete_all() {
82                                yield Ok(LlmResponse::ToolRequestComplete { tool_call });
83                            }
84                            // Don't break yet - continue to capture usage from subsequent chunks
85                            // OpenRouter sends usage in the last SSE message after finish_reason
86                            // See: https://openrouter.ai/docs/guides/usage-accounting
87                        }
88                    } else {
89                        // No choices in this chunk - could be:
90                        // 1. Final usage-only chunk after finish_reason (OpenRouter)
91                        // 2. Stream is done (some providers)
92                        // We already extracted usage above if present
93                        debug!("No choices in response, ending stream");
94                        for tool_call in collector.complete_all() {
95                            yield Ok(LlmResponse::ToolRequestComplete { tool_call });
96                        }
97                        break;
98                    }
99                }
100                Err(e) => {
101                    yield Err(e.into());
102                    break;
103                }
104            }
105        }
106
107        yield Ok(LlmResponse::Done {
108            stop_reason: last_stop_reason,
109        });
110    }
111}
112
113fn map_openai_finish_reason(reason: OpenAiFinishReason) -> StopReason {
114    match reason {
115        OpenAiFinishReason::Stop => StopReason::EndTurn,
116        OpenAiFinishReason::Length => StopReason::Length,
117        OpenAiFinishReason::ToolCalls => StopReason::ToolCalls,
118        OpenAiFinishReason::ContentFilter => StopReason::ContentFilter,
119        OpenAiFinishReason::FunctionCall => StopReason::FunctionCall,
120    }
121}