Skip to main content

a3s_code_core/llm/
openai.rs

1//! OpenAI-compatible LLM client
2
3use super::http::{default_http_client, normalize_base_url, HttpClient};
4use super::types::*;
5use super::LlmClient;
6use crate::llm::types::{ToolResultContent, ToolResultContentField};
7use crate::retry::{AttemptOutcome, RetryConfig};
8use anyhow::{Context, Result};
9use async_trait::async_trait;
10use futures::StreamExt;
11use serde::Deserialize;
12use std::sync::Arc;
13use std::time::Instant;
14use tokio::sync::mpsc;
15
/// OpenAI client
///
/// Speaks the OpenAI-compatible chat-completions protocol; base URL, path,
/// and provider label are configurable via the `with_*` builders on the
/// `impl` block, so the same client works against compatible providers.
pub struct OpenAiClient {
    // Provider label reported in response metadata (defaults to "openai").
    pub(crate) provider_name: String,
    // API key wrapped in SecretString so it is not leaked via Debug/logging.
    pub(crate) api_key: SecretString,
    // Model identifier sent as the "model" field of every request.
    pub(crate) model: String,
    // Scheme+host portion of the endpoint, e.g. "https://api.openai.com".
    pub(crate) base_url: String,
    // Path appended to base_url; always stored with a leading '/'.
    pub(crate) chat_completions_path: String,
    // Optional sampling temperature; omitted from the request when None.
    pub(crate) temperature: Option<f32>,
    // Optional completion-token cap; omitted from the request when None.
    pub(crate) max_tokens: Option<usize>,
    // Pluggable HTTP transport (swappable for tests).
    pub(crate) http: Arc<dyn HttpClient>,
    // Retry policy applied to both plain and streaming requests.
    pub(crate) retry_config: RetryConfig,
}
28
29impl OpenAiClient {
30    pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
31        if arguments.trim().is_empty() {
32            return serde_json::Value::Object(Default::default());
33        }
34
35        serde_json::from_str(arguments).unwrap_or_else(|e| {
36            tracing::warn!(
37                "Failed to parse tool arguments JSON for tool '{}': {}",
38                tool_name,
39                e
40            );
41            serde_json::json!({
42                "__parse_error": format!(
43                    "Malformed tool arguments: {}. Raw input: {}",
44                    e, arguments
45                )
46            })
47        })
48    }
49
50    fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
51        if incoming.is_empty() {
52            return None;
53        }
54        if text_content.is_empty() {
55            text_content.push_str(incoming);
56            return Some(incoming.to_string());
57        }
58        if incoming == text_content.as_str() || text_content.ends_with(incoming) {
59            return None;
60        }
61        if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
62            if suffix.is_empty() {
63                return None;
64            }
65            text_content.push_str(suffix);
66            return Some(suffix.to_string());
67        }
68        text_content.push_str(incoming);
69        Some(incoming.to_string())
70    }
71
72    pub fn new(api_key: String, model: String) -> Self {
73        Self {
74            provider_name: "openai".to_string(),
75            api_key: SecretString::new(api_key),
76            model,
77            base_url: "https://api.openai.com".to_string(),
78            chat_completions_path: "/v1/chat/completions".to_string(),
79            temperature: None,
80            max_tokens: None,
81            http: default_http_client(),
82            retry_config: RetryConfig::default(),
83        }
84    }
85
86    pub fn with_base_url(mut self, base_url: String) -> Self {
87        self.base_url = normalize_base_url(&base_url);
88        self
89    }
90
91    pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
92        self.provider_name = provider_name.into();
93        self
94    }
95
96    pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
97        let path = path.into();
98        self.chat_completions_path = if path.starts_with('/') {
99            path
100        } else {
101            format!("/{}", path)
102        };
103        self
104    }
105
106    pub fn with_temperature(mut self, temperature: f32) -> Self {
107        self.temperature = Some(temperature);
108        self
109    }
110
111    pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
112        self.max_tokens = Some(max_tokens);
113        self
114    }
115
116    pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
117        self.retry_config = retry_config;
118        self
119    }
120
121    pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
122        self.http = http;
123        self
124    }
125
126    pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
127        messages
128            .iter()
129            .map(|msg| {
130                let content: serde_json::Value = if msg.content.len() == 1 {
131                    match &msg.content[0] {
132                        ContentBlock::Text { text } => serde_json::json!(text),
133                        ContentBlock::ToolResult {
134                            tool_use_id,
135                            content,
136                            ..
137                        } => {
138                            let content_str = match content {
139                                ToolResultContentField::Text(s) => s.clone(),
140                                ToolResultContentField::Blocks(blocks) => blocks
141                                    .iter()
142                                    .filter_map(|b| {
143                                        if let ToolResultContent::Text { text } = b {
144                                            Some(text.clone())
145                                        } else {
146                                            None
147                                        }
148                                    })
149                                    .collect::<Vec<_>>()
150                                    .join("\n"),
151                            };
152                            return serde_json::json!({
153                                "role": "tool",
154                                "tool_call_id": tool_use_id,
155                                "content": content_str,
156                            });
157                        }
158                        _ => serde_json::json!(""),
159                    }
160                } else {
161                    serde_json::json!(msg
162                        .content
163                        .iter()
164                        .map(|block| {
165                            match block {
166                                ContentBlock::Text { text } => serde_json::json!({
167                                    "type": "text",
168                                    "text": text,
169                                }),
170                                ContentBlock::Image { source } => serde_json::json!({
171                                    "type": "image_url",
172                                    "image_url": {
173                                        "url": format!(
174                                            "data:{};base64,{}",
175                                            source.media_type, source.data
176                                        ),
177                                    }
178                                }),
179                                ContentBlock::ToolUse { id, name, input } => serde_json::json!({
180                                    "type": "function",
181                                    "id": id,
182                                    "function": {
183                                        "name": name,
184                                        "arguments": input.to_string(),
185                                    }
186                                }),
187                                _ => serde_json::json!({}),
188                            }
189                        })
190                        .collect::<Vec<_>>())
191                };
192
193                // Handle assistant messages — kimi-k2.5 requires reasoning_content
194                // on all assistant messages when thinking mode is enabled
195                if msg.role == "assistant" {
196                    let rc = msg.reasoning_content.as_deref().unwrap_or("");
197                    let tool_calls: Vec<_> = msg.tool_calls();
198                    if !tool_calls.is_empty() {
199                        return serde_json::json!({
200                            "role": "assistant",
201                            "content": msg.text(),
202                            "reasoning_content": rc,
203                            "tool_calls": tool_calls.iter().map(|tc| {
204                                serde_json::json!({
205                                    "id": tc.id,
206                                    "type": "function",
207                                    "function": {
208                                        "name": tc.name,
209                                        "arguments": tc.args.to_string(),
210                                    }
211                                })
212                            }).collect::<Vec<_>>(),
213                        });
214                    }
215                    return serde_json::json!({
216                        "role": "assistant",
217                        "content": content,
218                        "reasoning_content": rc,
219                    });
220                }
221
222                serde_json::json!({
223                    "role": msg.role,
224                    "content": content,
225                })
226            })
227            .collect()
228    }
229
230    pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
231        tools
232            .iter()
233            .map(|t| {
234                serde_json::json!({
235                    "type": "function",
236                    "function": {
237                        "name": t.name,
238                        "description": t.description,
239                        "parameters": t.parameters,
240                    }
241                })
242            })
243            .collect()
244    }
245}
246
247#[async_trait]
248impl LlmClient for OpenAiClient {
249    async fn complete(
250        &self,
251        messages: &[Message],
252        system: Option<&str>,
253        tools: &[ToolDefinition],
254    ) -> Result<LlmResponse> {
255        {
256            let request_started_at = Instant::now();
257            let mut openai_messages = Vec::new();
258
259            if let Some(sys) = system {
260                openai_messages.push(serde_json::json!({
261                    "role": "system",
262                    "content": sys,
263                }));
264            }
265
266            openai_messages.extend(self.convert_messages(messages));
267
268            let mut request = serde_json::json!({
269                "model": self.model,
270                "messages": openai_messages,
271            });
272
273            if let Some(temp) = self.temperature {
274                request["temperature"] = serde_json::json!(temp);
275            }
276            if let Some(max) = self.max_tokens {
277                request["max_tokens"] = serde_json::json!(max);
278            }
279
280            if !tools.is_empty() {
281                request["tools"] = serde_json::json!(self.convert_tools(tools));
282            }
283
284            let url = format!("{}{}", self.base_url, self.chat_completions_path);
285            let auth_header = format!("Bearer {}", self.api_key.expose());
286            let headers = vec![("Authorization", auth_header.as_str())];
287
288            let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
289                let http = &self.http;
290                let url = &url;
291                let headers = headers.clone();
292                let request = &request;
293                async move {
294                    match http.post(url, headers, request).await {
295                        Ok(resp) => {
296                            let status = reqwest::StatusCode::from_u16(resp.status)
297                                .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
298                            if status.is_success() {
299                                AttemptOutcome::Success(resp.body)
300                            } else if self.retry_config.is_retryable_status(status) {
301                                AttemptOutcome::Retryable {
302                                    status,
303                                    body: resp.body,
304                                    retry_after: None,
305                                }
306                            } else {
307                                AttemptOutcome::Fatal(anyhow::anyhow!(
308                                    "OpenAI API error at {} ({}): {}",
309                                    url,
310                                    status,
311                                    resp.body
312                                ))
313                            }
314                        }
315                        Err(e) => {
316                            eprintln!("[DEBUG] HTTP error: {:?}", e);
317                            AttemptOutcome::Fatal(e)
318                        }
319                    }
320                }
321            })
322            .await?;
323
324            let parsed: OpenAiResponse =
325                serde_json::from_str(&response).context("Failed to parse OpenAI response")?;
326
327            let choice = parsed.choices.into_iter().next().context("No choices")?;
328
329            let mut content = vec![];
330
331            let reasoning_content = choice.message.reasoning_content;
332
333            let text_content = choice.message.content;
334
335            if let Some(text) = text_content {
336                if !text.is_empty() {
337                    content.push(ContentBlock::Text { text });
338                }
339            }
340
341            if let Some(tool_calls) = choice.message.tool_calls {
342                for tc in tool_calls {
343                    content.push(ContentBlock::ToolUse {
344                        id: tc.id,
345                        name: tc.function.name.clone(),
346                        input: Self::parse_tool_arguments(
347                            &tc.function.name,
348                            &tc.function.arguments,
349                        ),
350                    });
351                }
352            }
353
354            let llm_response = LlmResponse {
355                message: Message {
356                    role: "assistant".to_string(),
357                    content,
358                    reasoning_content,
359                },
360                usage: TokenUsage {
361                    prompt_tokens: parsed.usage.prompt_tokens,
362                    completion_tokens: parsed.usage.completion_tokens,
363                    total_tokens: parsed.usage.total_tokens,
364                    cache_read_tokens: parsed
365                        .usage
366                        .prompt_tokens_details
367                        .as_ref()
368                        .and_then(|d| d.cached_tokens),
369                    cache_write_tokens: None,
370                },
371                stop_reason: choice.finish_reason,
372                meta: Some(LlmResponseMeta {
373                    provider: Some(self.provider_name.clone()),
374                    request_model: Some(self.model.clone()),
375                    request_url: Some(url.clone()),
376                    response_id: parsed.id,
377                    response_model: parsed.model,
378                    response_object: parsed.object,
379                    first_token_ms: None,
380                    duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
381                }),
382            };
383
384            crate::telemetry::record_llm_usage(
385                llm_response.usage.prompt_tokens,
386                llm_response.usage.completion_tokens,
387                llm_response.usage.total_tokens,
388                llm_response.stop_reason.as_deref(),
389            );
390
391            Ok(llm_response)
392        }
393    }
394
395    async fn complete_streaming(
396        &self,
397        messages: &[Message],
398        system: Option<&str>,
399        tools: &[ToolDefinition],
400    ) -> Result<mpsc::Receiver<StreamEvent>> {
401        {
402            let request_started_at = Instant::now();
403            let mut openai_messages = Vec::new();
404
405            if let Some(sys) = system {
406                openai_messages.push(serde_json::json!({
407                    "role": "system",
408                    "content": sys,
409                }));
410            }
411
412            openai_messages.extend(self.convert_messages(messages));
413
414            let mut request = serde_json::json!({
415                "model": self.model,
416                "messages": openai_messages,
417                "stream": true,
418                "stream_options": { "include_usage": true },
419            });
420
421            if let Some(temp) = self.temperature {
422                request["temperature"] = serde_json::json!(temp);
423            }
424            if let Some(max) = self.max_tokens {
425                request["max_tokens"] = serde_json::json!(max);
426            }
427
428            if !tools.is_empty() {
429                request["tools"] = serde_json::json!(self.convert_tools(tools));
430            }
431
432            let url = format!("{}{}", self.base_url, self.chat_completions_path);
433            let auth_header = format!("Bearer {}", self.api_key.expose());
434            let headers = vec![("Authorization", auth_header.as_str())];
435
436            let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
437                let http = &self.http;
438                let url = &url;
439                let headers = headers.clone();
440                let request = &request;
441                async move {
442                    match http.post_streaming(url, headers, request).await {
443                        Ok(resp) => {
444                            let status = reqwest::StatusCode::from_u16(resp.status)
445                                .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
446                            if status.is_success() {
447                                AttemptOutcome::Success(resp)
448                            } else {
449                                let retry_after = resp
450                                    .retry_after
451                                    .as_deref()
452                                    .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
453                                if self.retry_config.is_retryable_status(status) {
454                                    AttemptOutcome::Retryable {
455                                        status,
456                                        body: resp.error_body,
457                                        retry_after,
458                                    }
459                                } else {
460                                    AttemptOutcome::Fatal(anyhow::anyhow!(
461                                        "OpenAI API error at {} ({}): {}",
462                                        url,
463                                        status,
464                                        resp.error_body
465                                    ))
466                                }
467                            }
468                        }
469                        Err(e) => AttemptOutcome::Fatal(anyhow::anyhow!(
470                            "Failed to send streaming request: {}",
471                            e
472                        )),
473                    }
474                }
475            })
476            .await?;
477
478            let (tx, rx) = mpsc::channel(100);
479
480            let mut stream = streaming_resp.byte_stream;
481            let provider_name = self.provider_name.clone();
482            let request_model = self.model.clone();
483            let request_url = url.clone();
484            tokio::spawn(async move {
485                let mut buffer = String::new();
486                let mut content_blocks: Vec<ContentBlock> = Vec::new();
487                let mut text_content = String::new();
488                let mut reasoning_content_accum = String::new();
489                let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
490                    std::collections::BTreeMap::new();
491                let mut usage = TokenUsage::default();
492                let mut finish_reason = None;
493                let mut response_id = None;
494                let mut response_model = None;
495                let mut response_object = None;
496                let mut first_token_ms = None;
497                let mut saw_done = false;
498                let mut parsed_any_event = false;
499
500                while let Some(chunk_result) = stream.next().await {
501                    let chunk = match chunk_result {
502                        Ok(c) => c,
503                        Err(e) => {
504                            tracing::error!("Stream error: {}", e);
505                            break;
506                        }
507                    };
508
509                    buffer.push_str(&String::from_utf8_lossy(&chunk));
510
511                    while let Some(event_end) = buffer.find("\n\n") {
512                        let event_data: String = buffer.drain(..event_end).collect();
513                        buffer.drain(..2);
514
515                        for line in event_data.lines() {
516                            if let Some(data) = line.strip_prefix("data: ") {
517                                if data == "[DONE]" {
518                                    saw_done = true;
519                                    if !text_content.is_empty() {
520                                        content_blocks.push(ContentBlock::Text {
521                                            text: text_content.clone(),
522                                        });
523                                    }
524                                    for (_, (id, name, args)) in tool_calls.iter() {
525                                        content_blocks.push(ContentBlock::ToolUse {
526                                            id: id.clone(),
527                                            name: name.clone(),
528                                            input: Self::parse_tool_arguments(name, args),
529                                        });
530                                    }
531                                    tool_calls.clear();
532                                    crate::telemetry::record_llm_usage(
533                                        usage.prompt_tokens,
534                                        usage.completion_tokens,
535                                        usage.total_tokens,
536                                        finish_reason.as_deref(),
537                                    );
538                                    let response = LlmResponse {
539                                        message: Message {
540                                            role: "assistant".to_string(),
541                                            content: std::mem::take(&mut content_blocks),
542                                            reasoning_content: if reasoning_content_accum.is_empty()
543                                            {
544                                                None
545                                            } else {
546                                                Some(std::mem::take(&mut reasoning_content_accum))
547                                            },
548                                        },
549                                        usage: usage.clone(),
550                                        stop_reason: std::mem::take(&mut finish_reason),
551                                        meta: Some(LlmResponseMeta {
552                                            provider: Some(provider_name.clone()),
553                                            request_model: Some(request_model.clone()),
554                                            request_url: Some(request_url.clone()),
555                                            response_id: response_id.clone(),
556                                            response_model: response_model.clone(),
557                                            response_object: response_object.clone(),
558                                            first_token_ms,
559                                            duration_ms: Some(
560                                                request_started_at.elapsed().as_millis() as u64,
561                                            ),
562                                        }),
563                                    };
564                                    let _ = tx.send(StreamEvent::Done(response)).await;
565                                    continue;
566                                }
567
568                                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
569                                    parsed_any_event = true;
570                                    if response_id.is_none() {
571                                        response_id = event.id.clone();
572                                    }
573                                    if response_model.is_none() {
574                                        response_model = event.model.clone();
575                                    }
576                                    if response_object.is_none() {
577                                        response_object = event.object.clone();
578                                    }
579                                    if let Some(u) = event.usage {
580                                        usage.prompt_tokens = u.prompt_tokens;
581                                        usage.completion_tokens = u.completion_tokens;
582                                        usage.total_tokens = u.total_tokens;
583                                        usage.cache_read_tokens = u
584                                            .prompt_tokens_details
585                                            .as_ref()
586                                            .and_then(|d| d.cached_tokens);
587                                    }
588
589                                    if let Some(choice) = event.choices.into_iter().next() {
590                                        if let Some(reason) = choice.finish_reason {
591                                            finish_reason = Some(reason);
592                                        }
593
594                                        let delta_content = choice
595                                            .delta
596                                            .as_ref()
597                                            .and_then(|delta| delta.content.clone());
598
599                                        if let Some(message) = choice.message {
600                                            if let Some(reasoning) = message.reasoning_content {
601                                                reasoning_content_accum.push_str(&reasoning);
602                                            }
603                                            if delta_content.is_none() {
604                                                if let Some(content) = message
605                                                    .content
606                                                    .filter(|value| !value.is_empty())
607                                                {
608                                                    if first_token_ms.is_none() {
609                                                        first_token_ms = Some(
610                                                            request_started_at.elapsed().as_millis()
611                                                                as u64,
612                                                        );
613                                                    }
614                                                    if let Some(delta) = Self::merge_stream_text(
615                                                        &mut text_content,
616                                                        &content,
617                                                    ) {
618                                                        let _ = tx
619                                                            .send(StreamEvent::TextDelta(delta))
620                                                            .await;
621                                                    }
622                                                }
623                                            }
624                                            if let Some(tcs) = message.tool_calls {
625                                                for (index, tc) in tcs.into_iter().enumerate() {
626                                                    tool_calls.insert(
627                                                        index,
628                                                        (
629                                                            tc.id,
630                                                            tc.function.name,
631                                                            tc.function.arguments,
632                                                        ),
633                                                    );
634                                                }
635                                            }
636                                        }
637
638                                        if let Some(delta) = choice.delta {
639                                            if let Some(ref rc) = delta.reasoning_content {
640                                                reasoning_content_accum.push_str(rc);
641                                            }
642
643                                            if let Some(content) = delta.content {
644                                                if first_token_ms.is_none() {
645                                                    first_token_ms = Some(
646                                                        request_started_at.elapsed().as_millis()
647                                                            as u64,
648                                                    );
649                                                }
650                                                if let Some(delta) = Self::merge_stream_text(
651                                                    &mut text_content,
652                                                    &content,
653                                                ) {
654                                                    let _ = tx
655                                                        .send(StreamEvent::TextDelta(delta))
656                                                        .await;
657                                                }
658                                            }
659
660                                            if let Some(tcs) = delta.tool_calls {
661                                                for tc in tcs {
662                                                    let entry = tool_calls
663                                                        .entry(tc.index)
664                                                        .or_insert_with(|| {
665                                                            (
666                                                                String::new(),
667                                                                String::new(),
668                                                                String::new(),
669                                                            )
670                                                        });
671
672                                                    if let Some(id) = tc.id {
673                                                        entry.0 = id;
674                                                    }
675                                                    if let Some(func) = tc.function {
676                                                        if let Some(name) = func.name {
677                                                            if first_token_ms.is_none() {
678                                                                first_token_ms = Some(
679                                                                    request_started_at
680                                                                        .elapsed()
681                                                                        .as_millis()
682                                                                        as u64,
683                                                                );
684                                                            }
685                                                            entry.1 = name.clone();
686                                                            let _ = tx
687                                                                .send(StreamEvent::ToolUseStart {
688                                                                    id: entry.0.clone(),
689                                                                    name,
690                                                                })
691                                                                .await;
692                                                        }
693                                                        if let Some(args) = func.arguments {
694                                                            entry.2.push_str(&args);
695                                                            let _ = tx
696                                                                .send(
697                                                                    StreamEvent::ToolUseInputDelta(
698                                                                        args,
699                                                                    ),
700                                                                )
701                                                                .await;
702                                                        }
703                                                    }
704                                                }
705                                            }
706                                        }
707                                    }
708                                }
709                            }
710                        }
711                    }
712                }
713
714                if saw_done {
715                    return;
716                }
717
718                let trailing = buffer.trim();
719                if !trailing.is_empty() {
720                    if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
721                        parsed_any_event = true;
722                        if response_id.is_none() {
723                            response_id = event.id.clone();
724                        }
725                        if response_model.is_none() {
726                            response_model = event.model.clone();
727                        }
728                        if response_object.is_none() {
729                            response_object = event.object.clone();
730                        }
731                        if let Some(u) = event.usage {
732                            usage.prompt_tokens = u.prompt_tokens;
733                            usage.completion_tokens = u.completion_tokens;
734                            usage.total_tokens = u.total_tokens;
735                            usage.cache_read_tokens = u
736                                .prompt_tokens_details
737                                .as_ref()
738                                .and_then(|d| d.cached_tokens);
739                        }
740                        if let Some(choice) = event.choices.into_iter().next() {
741                            if let Some(reason) = choice.finish_reason {
742                                finish_reason = Some(reason);
743                            }
744                            let delta_content = choice
745                                .delta
746                                .as_ref()
747                                .and_then(|delta| delta.content.clone());
748                            if let Some(message) = choice.message {
749                                if let Some(reasoning) = message.reasoning_content {
750                                    reasoning_content_accum.push_str(&reasoning);
751                                }
752                                if delta_content.is_none() {
753                                    if let Some(content) =
754                                        message.content.filter(|value| !value.is_empty())
755                                    {
756                                        if first_token_ms.is_none() {
757                                            first_token_ms = Some(
758                                                request_started_at.elapsed().as_millis() as u64,
759                                            );
760                                        }
761                                        if let Some(delta) =
762                                            Self::merge_stream_text(&mut text_content, &content)
763                                        {
764                                            let _ = tx.send(StreamEvent::TextDelta(delta)).await;
765                                        }
766                                    }
767                                }
768                                if let Some(tcs) = message.tool_calls {
769                                    for (index, tc) in tcs.into_iter().enumerate() {
770                                        tool_calls.insert(
771                                            index,
772                                            (tc.id, tc.function.name, tc.function.arguments),
773                                        );
774                                    }
775                                }
776                            }
777                            if let Some(delta) = choice.delta {
778                                if let Some(ref rc) = delta.reasoning_content {
779                                    reasoning_content_accum.push_str(rc);
780                                }
781                                if let Some(content) = delta.content {
782                                    if first_token_ms.is_none() {
783                                        first_token_ms =
784                                            Some(request_started_at.elapsed().as_millis() as u64);
785                                    }
786                                    if let Some(delta) =
787                                        Self::merge_stream_text(&mut text_content, &content)
788                                    {
789                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
790                                    }
791                                }
792                            }
793                        }
794                    } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
795                        parsed_any_event = true;
796                        response_id = response.id.clone();
797                        response_model = response.model.clone();
798                        response_object = response.object.clone();
799                        usage.prompt_tokens = response.usage.prompt_tokens;
800                        usage.completion_tokens = response.usage.completion_tokens;
801                        usage.total_tokens = response.usage.total_tokens;
802                        usage.cache_read_tokens = response
803                            .usage
804                            .prompt_tokens_details
805                            .as_ref()
806                            .and_then(|d| d.cached_tokens);
807
808                        if let Some(choice) = response.choices.into_iter().next() {
809                            finish_reason = choice.finish_reason;
810                            if let Some(text) =
811                                choice.message.content.filter(|text| !text.is_empty())
812                            {
813                                if first_token_ms.is_none() {
814                                    first_token_ms =
815                                        Some(request_started_at.elapsed().as_millis() as u64);
816                                }
817                                let _ = Self::merge_stream_text(&mut text_content, &text);
818                            }
819                            if let Some(reasoning) = choice.message.reasoning_content {
820                                reasoning_content_accum.push_str(&reasoning);
821                            }
822                            if let Some(final_tool_calls) = choice.message.tool_calls {
823                                for tc in final_tool_calls {
824                                    tool_calls.insert(
825                                        tool_calls.len(),
826                                        (tc.id, tc.function.name, tc.function.arguments),
827                                    );
828                                }
829                            }
830                        }
831                    }
832                }
833
834                if parsed_any_event
835                    || !text_content.is_empty()
836                    || !tool_calls.is_empty()
837                    || !content_blocks.is_empty()
838                {
839                    tracing::warn!(
840                        provider = %provider_name,
841                        model = %request_model,
842                        "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
843                    );
844                    if !text_content.is_empty() {
845                        content_blocks.push(ContentBlock::Text {
846                            text: text_content.clone(),
847                        });
848                    }
849                    for (_, (id, name, args)) in tool_calls.iter() {
850                        content_blocks.push(ContentBlock::ToolUse {
851                            id: id.clone(),
852                            name: name.clone(),
853                            input: Self::parse_tool_arguments(name, args),
854                        });
855                    }
856                    tool_calls.clear();
857                    crate::telemetry::record_llm_usage(
858                        usage.prompt_tokens,
859                        usage.completion_tokens,
860                        usage.total_tokens,
861                        finish_reason.as_deref(),
862                    );
863                    let response = LlmResponse {
864                        message: Message {
865                            role: "assistant".to_string(),
866                            content: std::mem::take(&mut content_blocks),
867                            reasoning_content: if reasoning_content_accum.is_empty() {
868                                None
869                            } else {
870                                Some(std::mem::take(&mut reasoning_content_accum))
871                            },
872                        },
873                        usage: usage.clone(),
874                        stop_reason: std::mem::take(&mut finish_reason),
875                        meta: Some(LlmResponseMeta {
876                            provider: Some(provider_name.clone()),
877                            request_model: Some(request_model.clone()),
878                            request_url: Some(request_url.clone()),
879                            response_id: response_id.clone(),
880                            response_model: response_model.clone(),
881                            response_object: response_object.clone(),
882                            first_token_ms,
883                            duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
884                        }),
885                    };
886                    let _ = tx.send(StreamEvent::Done(response)).await;
887                } else {
888                    tracing::warn!(
889                        provider = %provider_name,
890                        model = %request_model,
891                        trailing = %trailing.chars().take(400).collect::<String>(),
892                        "OpenAI-compatible stream ended without any parseable events"
893                    );
894                }
895            });
896
897            Ok(rx)
898        }
899    }
900}
901
902// OpenAI API response types (private)
/// Non-streaming chat-completions response body (OpenAI wire format).
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiResponse {
    // Response identifier (e.g. `chatcmpl-…`); some compatible servers omit it.
    #[serde(default)]
    pub(crate) id: Option<String>,
    // Object type, normally `chat.completion`.
    #[serde(default)]
    pub(crate) object: Option<String>,
    // Model name echoed back by the server.
    #[serde(default)]
    pub(crate) model: Option<String>,
    // Completion choices; this client only reads the first one.
    pub(crate) choices: Vec<OpenAiChoice>,
    // Token accounting; required — a body without `usage` fails to parse here.
    pub(crate) usage: OpenAiUsage,
}
914
/// One completion choice from a non-streaming response.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiChoice {
    // The assistant message produced for this choice.
    pub(crate) message: OpenAiMessage,
    // Why generation stopped (`stop`, `length`, `tool_calls`, …), if reported.
    pub(crate) finish_reason: Option<String>,
}
920
/// Assistant message payload, used by full responses and by stream chunks
/// from servers that send a complete `message` instead of deltas.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiMessage {
    // Chain-of-thought text; non-standard extension used by some
    // OpenAI-compatible providers. `None` when absent (serde Option default).
    pub(crate) reasoning_content: Option<String>,
    // Plain text content; `None` when the model emitted only tool calls.
    pub(crate) content: Option<String>,
    // Tool invocations requested by the model, if any.
    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
}
927
/// A complete (non-delta) tool call as returned in a message.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCall {
    // Server-assigned call id.
    pub(crate) id: String,
    // Function name plus raw argument payload.
    pub(crate) function: OpenAiFunction,
}
933
/// Function name and raw argument string for a completed tool call.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunction {
    pub(crate) name: String,
    // Arguments arrive as a raw JSON string; parsed (with error tolerance)
    // by `OpenAiClient::parse_tool_arguments`.
    pub(crate) arguments: String,
}
939
/// Token usage block; every counter defaults to 0 (or `None`) when the
/// server omits it, so partial usage payloads still parse.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiUsage {
    #[serde(default)]
    pub(crate) prompt_tokens: usize,
    #[serde(default)]
    pub(crate) completion_tokens: usize,
    #[serde(default)]
    pub(crate) total_tokens: usize,
    /// OpenAI returns cached token count in `prompt_tokens_details.cached_tokens`
    #[serde(default)]
    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
}
952
/// Nested detail object carrying the prompt-cache hit count.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiPromptTokensDetails {
    // Tokens served from the provider's prompt cache, if reported.
    #[serde(default)]
    pub(crate) cached_tokens: Option<usize>,
}
958
959// OpenAI streaming types
960#[derive(Debug, Deserialize)]
961pub(crate) struct OpenAiStreamChunk {
962    #[serde(default)]
963    pub(crate) id: Option<String>,
964    #[serde(default)]
965    pub(crate) object: Option<String>,
966    #[serde(default)]
967    pub(crate) model: Option<String>,
968    pub(crate) choices: Vec<OpenAiStreamChoice>,
969    pub(crate) usage: Option<OpenAiUsage>,
970}
971
/// One choice inside a stream chunk.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChoice {
    // Some non-conforming servers send a complete `message` instead of (or
    // alongside) incremental deltas; the stream loop handles both shapes.
    pub(crate) message: Option<OpenAiMessage>,
    // Incremental update for this choice (standard streaming shape).
    pub(crate) delta: Option<OpenAiDelta>,
    // Set on the chunk where generation for this choice ends.
    pub(crate) finish_reason: Option<String>,
}
978
/// Incremental message fragment within a stream choice.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiDelta {
    // Incremental reasoning text; non-standard extension used by some
    // OpenAI-compatible providers.
    pub(crate) reasoning_content: Option<String>,
    // Incremental visible text.
    pub(crate) content: Option<String>,
    // Incremental tool-call fragments, correlated by their `index` field.
    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}
985
/// Fragment of a tool call streamed across multiple chunks.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCallDelta {
    // Required: correlates fragments of the same call across chunks; used as
    // the accumulation key in the stream loop.
    pub(crate) index: usize,
    // Call id; the accumulator overwrites its stored id whenever one appears,
    // so it is typically sent only on the first fragment.
    pub(crate) id: Option<String>,
    pub(crate) function: Option<OpenAiFunctionDelta>,
}
992
/// Streamed fragment of a function call's name/arguments.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunctionDelta {
    // Function name; presence triggers the `ToolUseStart` event upstream.
    pub(crate) name: Option<String>,
    // Partial JSON argument text; concatenated across fragments before parsing.
    pub(crate) arguments: Option<String>,
}