// llm/providers/openai/streaming.rs

use async_openai::types::chat::{
    CompletionUsage, CreateChatCompletionStreamResponse, FinishReason as OpenAiFinishReason,
};
use async_stream;
use tokio_stream::{Stream, StreamExt};
use tracing::debug;

use crate::providers::tool_call_collector::ToolCallCollector;
use crate::{LlmError, LlmResponse, Result, StopReason, TokenUsage};
10
11impl From<CompletionUsage> for TokenUsage {
12 fn from(usage: CompletionUsage) -> Self {
13 let prompt = usage.prompt_tokens_details.unwrap_or_default();
14 let completion = usage.completion_tokens_details.unwrap_or_default();
15 TokenUsage {
16 input_tokens: usage.prompt_tokens,
17 output_tokens: usage.completion_tokens,
18 cache_read_tokens: prompt.cached_tokens,
19 input_audio_tokens: prompt.audio_tokens,
20 reasoning_tokens: completion.reasoning_tokens,
21 output_audio_tokens: completion.audio_tokens,
22 accepted_prediction_tokens: completion.accepted_prediction_tokens,
23 rejected_prediction_tokens: completion.rejected_prediction_tokens,
24 ..TokenUsage::default()
25 }
26 }
27}
28
/// Adapt an OpenAI chat-completion chunk stream into this crate's
/// [`LlmResponse`] event stream.
///
/// Emits, in order: one `Start` (with a fresh UUID message id), then
/// interleaved `Usage` / `Text` / tool-call events as chunks arrive, and
/// finally one `Done` carrying the last finish reason observed (or `None`
/// if the upstream never reported one). On the first upstream error the
/// error is yielded and the stream ends without a `Done`.
pub fn process_completion_stream<E: Into<LlmError> + Send>(
    mut stream: impl Stream<Item = std::result::Result<CreateChatCompletionStreamResponse, E>> + Send + Unpin,
) -> impl Stream<Item = Result<LlmResponse>> + Send {
    async_stream::stream! {
        // Synthesize a message id up front; OpenAI chunk ids are not reused here.
        let message_id = uuid::Uuid::new_v4().to_string();
        yield Ok(LlmResponse::Start { message_id });

        // Accumulates partial tool-call deltas, keyed by the chunk's u32 index.
        let mut collector = ToolCallCollector::<u32>::new();
        // Most recent finish reason seen; reported in the final Done event.
        let mut last_stop_reason: Option<StopReason> = None;

        while let Some(result) = stream.next().await {
            match result {
                Ok(mut response) => {
                    // Usage may ride along on any chunk (typically the last,
                    // often with an empty `choices` array); surface it as soon
                    // as it appears.
                    if let Some(usage) = response.usage {
                        yield Ok(LlmResponse::Usage { tokens: usage.into() });
                    }

                    // NOTE(review): `pop()` takes the LAST choice — fine for
                    // streaming (n=1), but would drop earlier choices if n>1.
                    if let Some(choice) = response.choices.pop() {
                        let delta = choice.delta;

                        if let Some(content) = delta.content
                            && !content.is_empty() {
                            // Text after tool-call deltas implies those calls
                            // are finished; flush them before emitting text.
                            for tool_call in collector.complete_all() {
                                yield Ok(LlmResponse::ToolRequestComplete { tool_call });
                            }
                            yield Ok(LlmResponse::Text { chunk: content });
                        }

                        if let Some(tool_calls) = delta.tool_calls {
                            for tc in tool_calls {
                                // A delta may carry only an id (new call) or only
                                // argument fragments (continuation).
                                let (id, name, args) = match tc.function {
                                    Some(f) => (tc.id, f.name, f.arguments),
                                    None => (tc.id, None, None),
                                };
                                // The collector decides which events (if any)
                                // this fragment produces.
                                for response in collector.handle_delta(tc.index, id, name, args) {
                                    yield Ok(response);
                                }
                            }
                        }

                        if let Some(finish_reason) = choice.finish_reason {
                            let finish_reason_str = format!("{finish_reason:?}");
                            debug!("Received finish reason: {finish_reason_str}");
                            last_stop_reason = Some(map_openai_finish_reason(finish_reason));

                            // Finish reason ends the message: flush any
                            // still-open tool calls.
                            for tool_call in collector.complete_all() {
                                yield Ok(LlmResponse::ToolRequestComplete { tool_call });
                            }
                        }
                    } else {
                        // Empty `choices` (e.g. a trailing usage-only chunk)
                        // marks end of stream: flush and stop reading.
                        debug!("No choices in response, ending stream");
                        for tool_call in collector.complete_all() {
                            yield Ok(LlmResponse::ToolRequestComplete { tool_call });
                        }
                        break;
                    }
                }
                Err(e) => {
                    // Propagate the provider error and stop; no Done is emitted
                    // on this path.
                    yield Err(e.into());
                    break;
                }
            }
        }

        yield Ok(LlmResponse::Done {
            stop_reason: last_stop_reason,
        });
    }
}
112
113fn map_openai_finish_reason(reason: OpenAiFinishReason) -> StopReason {
114 match reason {
115 OpenAiFinishReason::Stop => StopReason::EndTurn,
116 OpenAiFinishReason::Length => StopReason::Length,
117 OpenAiFinishReason::ToolCalls => StopReason::ToolCalls,
118 OpenAiFinishReason::ContentFilter => StopReason::ContentFilter,
119 OpenAiFinishReason::FunctionCall => StopReason::FunctionCall,
120 }
121}