Skip to main content

a3s_code_core/llm/
openai.rs

//! OpenAI-compatible LLM client
2
3use super::http::{default_http_client, normalize_base_url, HttpClient};
4use super::types::*;
5use super::LlmClient;
6use crate::llm::types::{ToolResultContent, ToolResultContentField};
7use crate::retry::{AttemptOutcome, RetryConfig};
8use anyhow::{Context, Result};
9use async_trait::async_trait;
10use futures::StreamExt;
11use serde::Deserialize;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
17/// OpenAI client
18pub struct OpenAiClient {
19    pub(crate) provider_name: String,
20    pub(crate) api_key: SecretString,
21    pub(crate) model: String,
22    pub(crate) base_url: String,
23    pub(crate) chat_completions_path: String,
24    pub(crate) headers: HashMap<String, String>,
25    pub(crate) temperature: Option<f32>,
26    pub(crate) max_tokens: Option<usize>,
27    pub(crate) http: Arc<dyn HttpClient>,
28    pub(crate) retry_config: RetryConfig,
29}
30
31impl OpenAiClient {
32    pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
33        if arguments.trim().is_empty() {
34            return serde_json::Value::Object(Default::default());
35        }
36
37        serde_json::from_str(arguments).unwrap_or_else(|e| {
38            tracing::warn!(
39                "Failed to parse tool arguments JSON for tool '{}': {}",
40                tool_name,
41                e
42            );
43            serde_json::json!({
44                "__parse_error": format!(
45                    "Malformed tool arguments: {}. Raw input: {}",
46                    e, arguments
47                )
48            })
49        })
50    }
51
52    fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
53        if incoming.is_empty() {
54            return None;
55        }
56        if text_content.is_empty() {
57            text_content.push_str(incoming);
58            return Some(incoming.to_string());
59        }
60        if incoming == text_content.as_str() || text_content.ends_with(incoming) {
61            return None;
62        }
63        if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
64            if suffix.is_empty() {
65                return None;
66            }
67            text_content.push_str(suffix);
68            return Some(suffix.to_string());
69        }
70        text_content.push_str(incoming);
71        Some(incoming.to_string())
72    }
73
74    pub fn new(api_key: String, model: String) -> Self {
75        Self {
76            provider_name: "openai".to_string(),
77            api_key: SecretString::new(api_key),
78            model,
79            base_url: "https://api.openai.com".to_string(),
80            chat_completions_path: "/v1/chat/completions".to_string(),
81            headers: HashMap::new(),
82            temperature: None,
83            max_tokens: None,
84            http: default_http_client(),
85            retry_config: RetryConfig::default(),
86        }
87    }
88
89    pub fn with_base_url(mut self, base_url: String) -> Self {
90        self.base_url = normalize_base_url(&base_url);
91        self
92    }
93
94    pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
95        self.provider_name = provider_name.into();
96        self
97    }
98
99    pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
100        let path = path.into();
101        self.chat_completions_path = if path.starts_with('/') {
102            path
103        } else {
104            format!("/{}", path)
105        };
106        self
107    }
108
109    pub fn with_temperature(mut self, temperature: f32) -> Self {
110        self.temperature = Some(temperature);
111        self
112    }
113
114    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
115        self.headers = headers;
116        self
117    }
118
119    pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
120        self.max_tokens = Some(max_tokens);
121        self
122    }
123
124    pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
125        self.retry_config = retry_config;
126        self
127    }
128
129    pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
130        self.http = http;
131        self
132    }
133
134    pub(crate) fn request_headers(&self) -> Vec<(String, String)> {
135        let mut headers = Vec::with_capacity(self.headers.len() + 1);
136        let has_authorization = self
137            .headers
138            .keys()
139            .any(|key| key.eq_ignore_ascii_case("authorization"));
140        if !has_authorization {
141            headers.push((
142                "Authorization".to_string(),
143                format!("Bearer {}", self.api_key.expose()),
144            ));
145        }
146        headers.extend(
147            self.headers
148                .iter()
149                .map(|(key, value)| (key.clone(), value.clone())),
150        );
151        headers
152    }
153
    /// Convert internal `Message`s into the OpenAI chat-completions wire format.
    ///
    /// Shapes produced per message:
    /// - a single `Text` block → `content` is a bare JSON string;
    /// - a single `ToolResult` block → an early-returned `"role": "tool"`
    ///   message with `tool_call_id` and a flattened text `content`;
    /// - multiple blocks → `content` is an array of typed parts
    ///   (`text` / `image_url` / `function`);
    /// - assistant messages additionally always carry `reasoning_content`
    ///   (required by kimi-k2.5 in thinking mode — see comment below) and, when
    ///   tool calls are present, a `tool_calls` array.
    pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
        messages
            .iter()
            .map(|msg| {
                let content: serde_json::Value = if msg.content.len() == 1 {
                    match &msg.content[0] {
                        // Single text block: send as a plain string, not an array.
                        ContentBlock::Text { text } => serde_json::json!(text),
                        ContentBlock::ToolResult {
                            tool_use_id,
                            content,
                            ..
                        } => {
                            // Flatten the tool result to text; non-text blocks
                            // inside a Blocks payload are dropped here.
                            let content_str = match content {
                                ToolResultContentField::Text(s) => s.clone(),
                                ToolResultContentField::Blocks(blocks) => blocks
                                    .iter()
                                    .filter_map(|b| {
                                        if let ToolResultContent::Text { text } = b {
                                            Some(text.clone())
                                        } else {
                                            None
                                        }
                                    })
                                    .collect::<Vec<_>>()
                                    .join("\n"),
                            };
                            // Tool results become their own "tool" role message;
                            // skip the role/assistant handling below entirely.
                            return serde_json::json!({
                                "role": "tool",
                                "tool_call_id": tool_use_id,
                                "content": content_str,
                            });
                        }
                        // Other single blocks (e.g. a lone ToolUse) have no
                        // plain-content form; assistant tool calls are emitted
                        // via the tool_calls branch below instead.
                        _ => serde_json::json!(""),
                    }
                } else {
                    // Multi-block content: emit an array of typed parts.
                    serde_json::json!(msg
                        .content
                        .iter()
                        .map(|block| {
                            match block {
                                ContentBlock::Text { text } => serde_json::json!({
                                    "type": "text",
                                    "text": text,
                                }),
                                // Images are inlined as base64 data URLs.
                                ContentBlock::Image { source } => serde_json::json!({
                                    "type": "image_url",
                                    "image_url": {
                                        "url": format!(
                                            "data:{};base64,{}",
                                            source.media_type, source.data
                                        ),
                                    }
                                }),
                                ContentBlock::ToolUse { id, name, input } => serde_json::json!({
                                    "type": "function",
                                    "id": id,
                                    "function": {
                                        "name": name,
                                        "arguments": input.to_string(),
                                    }
                                }),
                                _ => serde_json::json!({}),
                            }
                        })
                        .collect::<Vec<_>>())
                };

                // Handle assistant messages — kimi-k2.5 requires reasoning_content
                // on all assistant messages when thinking mode is enabled
                if msg.role == "assistant" {
                    // Missing reasoning is sent as an empty string, not omitted.
                    let rc = msg.reasoning_content.as_deref().unwrap_or("");
                    let tool_calls: Vec<_> = msg.tool_calls();
                    if !tool_calls.is_empty() {
                        // With tool calls, content is the message's plain text
                        // (msg.text()), not the block array built above.
                        return serde_json::json!({
                            "role": "assistant",
                            "content": msg.text(),
                            "reasoning_content": rc,
                            "tool_calls": tool_calls.iter().map(|tc| {
                                serde_json::json!({
                                    "id": tc.id,
                                    "type": "function",
                                    "function": {
                                        "name": tc.name,
                                        "arguments": tc.args.to_string(),
                                    }
                                })
                            }).collect::<Vec<_>>(),
                        });
                    }
                    return serde_json::json!({
                        "role": "assistant",
                        "content": content,
                        "reasoning_content": rc,
                    });
                }

                // Non-assistant, non-tool-result messages pass through as-is.
                serde_json::json!({
                    "role": msg.role,
                    "content": content,
                })
            })
            .collect()
    }
257
258    pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
259        tools
260            .iter()
261            .map(|t| {
262                serde_json::json!({
263                    "type": "function",
264                    "function": {
265                        "name": t.name,
266                        "description": t.description,
267                        "parameters": t.parameters,
268                    }
269                })
270            })
271            .collect()
272    }
273}
274
275#[async_trait]
276impl LlmClient for OpenAiClient {
277    async fn complete(
278        &self,
279        messages: &[Message],
280        system: Option<&str>,
281        tools: &[ToolDefinition],
282    ) -> Result<LlmResponse> {
283        {
284            let request_started_at = Instant::now();
285            let mut openai_messages = Vec::new();
286
287            if let Some(sys) = system {
288                openai_messages.push(serde_json::json!({
289                    "role": "system",
290                    "content": sys,
291                }));
292            }
293
294            openai_messages.extend(self.convert_messages(messages));
295
296            let mut request = serde_json::json!({
297                "model": self.model,
298                "messages": openai_messages,
299            });
300
301            if let Some(temp) = self.temperature {
302                request["temperature"] = serde_json::json!(temp);
303            }
304            if let Some(max) = self.max_tokens {
305                request["max_tokens"] = serde_json::json!(max);
306            }
307
308            if !tools.is_empty() {
309                request["tools"] = serde_json::json!(self.convert_tools(tools));
310            }
311
312            let url = format!("{}{}", self.base_url, self.chat_completions_path);
313            let request_headers = self.request_headers();
314
315            let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
316                let http = &self.http;
317                let url = &url;
318                let request_headers = request_headers.clone();
319                let request = &request;
320                async move {
321                    let headers = request_headers
322                        .iter()
323                        .map(|(key, value)| (key.as_str(), value.as_str()))
324                        .collect::<Vec<_>>();
325                    match http.post(url, headers, request).await {
326                        Ok(resp) => {
327                            let status = reqwest::StatusCode::from_u16(resp.status)
328                                .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
329                            if status.is_success() {
330                                AttemptOutcome::Success(resp.body)
331                            } else if self.retry_config.is_retryable_status(status) {
332                                AttemptOutcome::Retryable {
333                                    status,
334                                    body: resp.body,
335                                    retry_after: None,
336                                }
337                            } else {
338                                AttemptOutcome::Fatal(anyhow::anyhow!(
339                                    "OpenAI API error at {} ({}): {}",
340                                    url,
341                                    status,
342                                    resp.body
343                                ))
344                            }
345                        }
346                        Err(e) => {
347                            eprintln!("[DEBUG] HTTP error: {:?}", e);
348                            AttemptOutcome::Fatal(e)
349                        }
350                    }
351                }
352            })
353            .await?;
354
355            let parsed: OpenAiResponse =
356                serde_json::from_str(&response).context("Failed to parse OpenAI response")?;
357
358            let choice = parsed.choices.into_iter().next().context("No choices")?;
359
360            let mut content = vec![];
361
362            let reasoning_content = choice.message.reasoning_content;
363
364            let text_content = choice.message.content;
365
366            if let Some(text) = text_content {
367                if !text.is_empty() {
368                    content.push(ContentBlock::Text { text });
369                }
370            }
371
372            if let Some(tool_calls) = choice.message.tool_calls {
373                for tc in tool_calls {
374                    content.push(ContentBlock::ToolUse {
375                        id: tc.id,
376                        name: tc.function.name.clone(),
377                        input: Self::parse_tool_arguments(
378                            &tc.function.name,
379                            &tc.function.arguments,
380                        ),
381                    });
382                }
383            }
384
385            let llm_response = LlmResponse {
386                message: Message {
387                    role: "assistant".to_string(),
388                    content,
389                    reasoning_content,
390                },
391                usage: TokenUsage {
392                    prompt_tokens: parsed.usage.prompt_tokens,
393                    completion_tokens: parsed.usage.completion_tokens,
394                    total_tokens: parsed.usage.total_tokens,
395                    cache_read_tokens: parsed
396                        .usage
397                        .prompt_tokens_details
398                        .as_ref()
399                        .and_then(|d| d.cached_tokens),
400                    cache_write_tokens: None,
401                },
402                stop_reason: choice.finish_reason,
403                meta: Some(LlmResponseMeta {
404                    provider: Some(self.provider_name.clone()),
405                    request_model: Some(self.model.clone()),
406                    request_url: Some(url.clone()),
407                    response_id: parsed.id,
408                    response_model: parsed.model,
409                    response_object: parsed.object,
410                    first_token_ms: None,
411                    duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
412                }),
413            };
414
415            crate::telemetry::record_llm_usage(
416                llm_response.usage.prompt_tokens,
417                llm_response.usage.completion_tokens,
418                llm_response.usage.total_tokens,
419                llm_response.stop_reason.as_deref(),
420            );
421
422            Ok(llm_response)
423        }
424    }
425
426    async fn complete_streaming(
427        &self,
428        messages: &[Message],
429        system: Option<&str>,
430        tools: &[ToolDefinition],
431    ) -> Result<mpsc::Receiver<StreamEvent>> {
432        {
433            let request_started_at = Instant::now();
434            let mut openai_messages = Vec::new();
435
436            if let Some(sys) = system {
437                openai_messages.push(serde_json::json!({
438                    "role": "system",
439                    "content": sys,
440                }));
441            }
442
443            openai_messages.extend(self.convert_messages(messages));
444
445            let mut request = serde_json::json!({
446                "model": self.model,
447                "messages": openai_messages,
448                "stream": true,
449                "stream_options": { "include_usage": true },
450            });
451
452            if let Some(temp) = self.temperature {
453                request["temperature"] = serde_json::json!(temp);
454            }
455            if let Some(max) = self.max_tokens {
456                request["max_tokens"] = serde_json::json!(max);
457            }
458
459            if !tools.is_empty() {
460                request["tools"] = serde_json::json!(self.convert_tools(tools));
461            }
462
463            let url = format!("{}{}", self.base_url, self.chat_completions_path);
464            let request_headers = self.request_headers();
465
466            let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
467                let http = &self.http;
468                let url = &url;
469                let request_headers = request_headers.clone();
470                let request = &request;
471                async move {
472                    let headers = request_headers
473                        .iter()
474                        .map(|(key, value)| (key.as_str(), value.as_str()))
475                        .collect::<Vec<_>>();
476                    match http.post_streaming(url, headers, request).await {
477                        Ok(resp) => {
478                            let status = reqwest::StatusCode::from_u16(resp.status)
479                                .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
480                            if status.is_success() {
481                                AttemptOutcome::Success(resp)
482                            } else {
483                                let retry_after = resp
484                                    .retry_after
485                                    .as_deref()
486                                    .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
487                                if self.retry_config.is_retryable_status(status) {
488                                    AttemptOutcome::Retryable {
489                                        status,
490                                        body: resp.error_body,
491                                        retry_after,
492                                    }
493                                } else {
494                                    AttemptOutcome::Fatal(anyhow::anyhow!(
495                                        "OpenAI API error at {} ({}): {}",
496                                        url,
497                                        status,
498                                        resp.error_body
499                                    ))
500                                }
501                            }
502                        }
503                        Err(e) => AttemptOutcome::Fatal(anyhow::anyhow!(
504                            "Failed to send streaming request: {}",
505                            e
506                        )),
507                    }
508                }
509            })
510            .await?;
511
512            let (tx, rx) = mpsc::channel(100);
513
514            let mut stream = streaming_resp.byte_stream;
515            let provider_name = self.provider_name.clone();
516            let request_model = self.model.clone();
517            let request_url = url.clone();
518            tokio::spawn(async move {
519                let mut buffer = String::new();
520                let mut content_blocks: Vec<ContentBlock> = Vec::new();
521                let mut text_content = String::new();
522                let mut reasoning_content_accum = String::new();
523                let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
524                    std::collections::BTreeMap::new();
525                let mut usage = TokenUsage::default();
526                let mut finish_reason = None;
527                let mut response_id = None;
528                let mut response_model = None;
529                let mut response_object = None;
530                let mut first_token_ms = None;
531                let mut saw_done = false;
532                let mut parsed_any_event = false;
533
534                while let Some(chunk_result) = stream.next().await {
535                    let chunk = match chunk_result {
536                        Ok(c) => c,
537                        Err(e) => {
538                            tracing::error!("Stream error: {}", e);
539                            break;
540                        }
541                    };
542
543                    buffer.push_str(&String::from_utf8_lossy(&chunk));
544
545                    while let Some(event_end) = buffer.find("\n\n") {
546                        let event_data: String = buffer.drain(..event_end).collect();
547                        buffer.drain(..2);
548
549                        for line in event_data.lines() {
550                            if let Some(data) = line.strip_prefix("data: ") {
551                                if data == "[DONE]" {
552                                    saw_done = true;
553                                    if !text_content.is_empty() {
554                                        content_blocks.push(ContentBlock::Text {
555                                            text: text_content.clone(),
556                                        });
557                                    }
558                                    for (_, (id, name, args)) in tool_calls.iter() {
559                                        content_blocks.push(ContentBlock::ToolUse {
560                                            id: id.clone(),
561                                            name: name.clone(),
562                                            input: Self::parse_tool_arguments(name, args),
563                                        });
564                                    }
565                                    tool_calls.clear();
566                                    crate::telemetry::record_llm_usage(
567                                        usage.prompt_tokens,
568                                        usage.completion_tokens,
569                                        usage.total_tokens,
570                                        finish_reason.as_deref(),
571                                    );
572                                    let response = LlmResponse {
573                                        message: Message {
574                                            role: "assistant".to_string(),
575                                            content: std::mem::take(&mut content_blocks),
576                                            reasoning_content: if reasoning_content_accum.is_empty()
577                                            {
578                                                None
579                                            } else {
580                                                Some(std::mem::take(&mut reasoning_content_accum))
581                                            },
582                                        },
583                                        usage: usage.clone(),
584                                        stop_reason: std::mem::take(&mut finish_reason),
585                                        meta: Some(LlmResponseMeta {
586                                            provider: Some(provider_name.clone()),
587                                            request_model: Some(request_model.clone()),
588                                            request_url: Some(request_url.clone()),
589                                            response_id: response_id.clone(),
590                                            response_model: response_model.clone(),
591                                            response_object: response_object.clone(),
592                                            first_token_ms,
593                                            duration_ms: Some(
594                                                request_started_at.elapsed().as_millis() as u64,
595                                            ),
596                                        }),
597                                    };
598                                    let _ = tx.send(StreamEvent::Done(response)).await;
599                                    continue;
600                                }
601
602                                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
603                                    parsed_any_event = true;
604                                    if response_id.is_none() {
605                                        response_id = event.id.clone();
606                                    }
607                                    if response_model.is_none() {
608                                        response_model = event.model.clone();
609                                    }
610                                    if response_object.is_none() {
611                                        response_object = event.object.clone();
612                                    }
613                                    if let Some(u) = event.usage {
614                                        usage.prompt_tokens = u.prompt_tokens;
615                                        usage.completion_tokens = u.completion_tokens;
616                                        usage.total_tokens = u.total_tokens;
617                                        usage.cache_read_tokens = u
618                                            .prompt_tokens_details
619                                            .as_ref()
620                                            .and_then(|d| d.cached_tokens);
621                                    }
622
623                                    if let Some(choice) = event.choices.into_iter().next() {
624                                        if let Some(reason) = choice.finish_reason {
625                                            finish_reason = Some(reason);
626                                        }
627
628                                        let delta_content = choice
629                                            .delta
630                                            .as_ref()
631                                            .and_then(|delta| delta.content.clone());
632
633                                        if let Some(message) = choice.message {
634                                            if let Some(reasoning) = message.reasoning_content {
635                                                reasoning_content_accum.push_str(&reasoning);
636                                            }
637                                            if delta_content.is_none() {
638                                                if let Some(content) = message
639                                                    .content
640                                                    .filter(|value| !value.is_empty())
641                                                {
642                                                    if first_token_ms.is_none() {
643                                                        first_token_ms = Some(
644                                                            request_started_at.elapsed().as_millis()
645                                                                as u64,
646                                                        );
647                                                    }
648                                                    if let Some(delta) = Self::merge_stream_text(
649                                                        &mut text_content,
650                                                        &content,
651                                                    ) {
652                                                        let _ = tx
653                                                            .send(StreamEvent::TextDelta(delta))
654                                                            .await;
655                                                    }
656                                                }
657                                            }
658                                            if let Some(tcs) = message.tool_calls {
659                                                for (index, tc) in tcs.into_iter().enumerate() {
660                                                    tool_calls.insert(
661                                                        index,
662                                                        (
663                                                            tc.id,
664                                                            tc.function.name,
665                                                            tc.function.arguments,
666                                                        ),
667                                                    );
668                                                }
669                                            }
670                                        }
671
672                                        if let Some(delta) = choice.delta {
673                                            if let Some(ref rc) = delta.reasoning_content {
674                                                reasoning_content_accum.push_str(rc);
675                                            }
676
677                                            if let Some(content) = delta.content {
678                                                if first_token_ms.is_none() {
679                                                    first_token_ms = Some(
680                                                        request_started_at.elapsed().as_millis()
681                                                            as u64,
682                                                    );
683                                                }
684                                                if let Some(delta) = Self::merge_stream_text(
685                                                    &mut text_content,
686                                                    &content,
687                                                ) {
688                                                    let _ = tx
689                                                        .send(StreamEvent::TextDelta(delta))
690                                                        .await;
691                                                }
692                                            }
693
694                                            if let Some(tcs) = delta.tool_calls {
695                                                for tc in tcs {
696                                                    let entry = tool_calls
697                                                        .entry(tc.index)
698                                                        .or_insert_with(|| {
699                                                            (
700                                                                String::new(),
701                                                                String::new(),
702                                                                String::new(),
703                                                            )
704                                                        });
705
706                                                    if let Some(id) = tc.id {
707                                                        entry.0 = id;
708                                                    }
709                                                    if let Some(func) = tc.function {
710                                                        if let Some(name) = func.name {
711                                                            if first_token_ms.is_none() {
712                                                                first_token_ms = Some(
713                                                                    request_started_at
714                                                                        .elapsed()
715                                                                        .as_millis()
716                                                                        as u64,
717                                                                );
718                                                            }
719                                                            entry.1 = name.clone();
720                                                            let _ = tx
721                                                                .send(StreamEvent::ToolUseStart {
722                                                                    id: entry.0.clone(),
723                                                                    name,
724                                                                })
725                                                                .await;
726                                                        }
727                                                        if let Some(args) = func.arguments {
728                                                            entry.2.push_str(&args);
729                                                            let _ = tx
730                                                                .send(
731                                                                    StreamEvent::ToolUseInputDelta(
732                                                                        args,
733                                                                    ),
734                                                                )
735                                                                .await;
736                                                        }
737                                                    }
738                                                }
739                                            }
740                                        }
741                                    }
742                                }
743                            }
744                        }
745                    }
746                }
747
748                if saw_done {
749                    return;
750                }
751
752                let trailing = buffer.trim();
753                if !trailing.is_empty() {
754                    if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
755                        parsed_any_event = true;
756                        if response_id.is_none() {
757                            response_id = event.id.clone();
758                        }
759                        if response_model.is_none() {
760                            response_model = event.model.clone();
761                        }
762                        if response_object.is_none() {
763                            response_object = event.object.clone();
764                        }
765                        if let Some(u) = event.usage {
766                            usage.prompt_tokens = u.prompt_tokens;
767                            usage.completion_tokens = u.completion_tokens;
768                            usage.total_tokens = u.total_tokens;
769                            usage.cache_read_tokens = u
770                                .prompt_tokens_details
771                                .as_ref()
772                                .and_then(|d| d.cached_tokens);
773                        }
774                        if let Some(choice) = event.choices.into_iter().next() {
775                            if let Some(reason) = choice.finish_reason {
776                                finish_reason = Some(reason);
777                            }
778                            let delta_content = choice
779                                .delta
780                                .as_ref()
781                                .and_then(|delta| delta.content.clone());
782                            if let Some(message) = choice.message {
783                                if let Some(reasoning) = message.reasoning_content {
784                                    reasoning_content_accum.push_str(&reasoning);
785                                }
786                                if delta_content.is_none() {
787                                    if let Some(content) =
788                                        message.content.filter(|value| !value.is_empty())
789                                    {
790                                        if first_token_ms.is_none() {
791                                            first_token_ms = Some(
792                                                request_started_at.elapsed().as_millis() as u64,
793                                            );
794                                        }
795                                        if let Some(delta) =
796                                            Self::merge_stream_text(&mut text_content, &content)
797                                        {
798                                            let _ = tx.send(StreamEvent::TextDelta(delta)).await;
799                                        }
800                                    }
801                                }
802                                if let Some(tcs) = message.tool_calls {
803                                    for (index, tc) in tcs.into_iter().enumerate() {
804                                        tool_calls.insert(
805                                            index,
806                                            (tc.id, tc.function.name, tc.function.arguments),
807                                        );
808                                    }
809                                }
810                            }
811                            if let Some(delta) = choice.delta {
812                                if let Some(ref rc) = delta.reasoning_content {
813                                    reasoning_content_accum.push_str(rc);
814                                }
815                                if let Some(content) = delta.content {
816                                    if first_token_ms.is_none() {
817                                        first_token_ms =
818                                            Some(request_started_at.elapsed().as_millis() as u64);
819                                    }
820                                    if let Some(delta) =
821                                        Self::merge_stream_text(&mut text_content, &content)
822                                    {
823                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
824                                    }
825                                }
826                            }
827                        }
828                    } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
829                        parsed_any_event = true;
830                        response_id = response.id.clone();
831                        response_model = response.model.clone();
832                        response_object = response.object.clone();
833                        usage.prompt_tokens = response.usage.prompt_tokens;
834                        usage.completion_tokens = response.usage.completion_tokens;
835                        usage.total_tokens = response.usage.total_tokens;
836                        usage.cache_read_tokens = response
837                            .usage
838                            .prompt_tokens_details
839                            .as_ref()
840                            .and_then(|d| d.cached_tokens);
841
842                        if let Some(choice) = response.choices.into_iter().next() {
843                            finish_reason = choice.finish_reason;
844                            if let Some(text) =
845                                choice.message.content.filter(|text| !text.is_empty())
846                            {
847                                if first_token_ms.is_none() {
848                                    first_token_ms =
849                                        Some(request_started_at.elapsed().as_millis() as u64);
850                                }
851                                let _ = Self::merge_stream_text(&mut text_content, &text);
852                            }
853                            if let Some(reasoning) = choice.message.reasoning_content {
854                                reasoning_content_accum.push_str(&reasoning);
855                            }
856                            if let Some(final_tool_calls) = choice.message.tool_calls {
857                                for tc in final_tool_calls {
858                                    tool_calls.insert(
859                                        tool_calls.len(),
860                                        (tc.id, tc.function.name, tc.function.arguments),
861                                    );
862                                }
863                            }
864                        }
865                    }
866                }
867
868                if parsed_any_event
869                    || !text_content.is_empty()
870                    || !tool_calls.is_empty()
871                    || !content_blocks.is_empty()
872                {
873                    tracing::warn!(
874                        provider = %provider_name,
875                        model = %request_model,
876                        "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
877                    );
878                    if !text_content.is_empty() {
879                        content_blocks.push(ContentBlock::Text {
880                            text: text_content.clone(),
881                        });
882                    }
883                    for (_, (id, name, args)) in tool_calls.iter() {
884                        content_blocks.push(ContentBlock::ToolUse {
885                            id: id.clone(),
886                            name: name.clone(),
887                            input: Self::parse_tool_arguments(name, args),
888                        });
889                    }
890                    tool_calls.clear();
891                    crate::telemetry::record_llm_usage(
892                        usage.prompt_tokens,
893                        usage.completion_tokens,
894                        usage.total_tokens,
895                        finish_reason.as_deref(),
896                    );
897                    let response = LlmResponse {
898                        message: Message {
899                            role: "assistant".to_string(),
900                            content: std::mem::take(&mut content_blocks),
901                            reasoning_content: if reasoning_content_accum.is_empty() {
902                                None
903                            } else {
904                                Some(std::mem::take(&mut reasoning_content_accum))
905                            },
906                        },
907                        usage: usage.clone(),
908                        stop_reason: std::mem::take(&mut finish_reason),
909                        meta: Some(LlmResponseMeta {
910                            provider: Some(provider_name.clone()),
911                            request_model: Some(request_model.clone()),
912                            request_url: Some(request_url.clone()),
913                            response_id: response_id.clone(),
914                            response_model: response_model.clone(),
915                            response_object: response_object.clone(),
916                            first_token_ms,
917                            duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
918                        }),
919                    };
920                    let _ = tx.send(StreamEvent::Done(response)).await;
921                } else {
922                    tracing::warn!(
923                        provider = %provider_name,
924                        model = %request_model,
925                        trailing = %trailing.chars().take(400).collect::<String>(),
926                        "OpenAI-compatible stream ended without any parseable events"
927                    );
928                }
929            });
930
931            Ok(rx)
932        }
933    }
934}
935
936// OpenAI API response types (private)
937#[derive(Debug, Deserialize)]
938pub(crate) struct OpenAiResponse {
939    #[serde(default)]
940    pub(crate) id: Option<String>,
941    #[serde(default)]
942    pub(crate) object: Option<String>,
943    #[serde(default)]
944    pub(crate) model: Option<String>,
945    pub(crate) choices: Vec<OpenAiChoice>,
946    pub(crate) usage: OpenAiUsage,
947}
948
/// A single completion choice in a non-streaming response.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiChoice {
    /// The assistant message produced for this choice.
    pub(crate) message: OpenAiMessage,
    /// Why generation stopped (e.g. `"stop"`, `"tool_calls"`, `"length"`).
    pub(crate) finish_reason: Option<String>,
}
954
/// Assistant message payload. Also reused for providers that embed a full
/// `message` object inside streaming chunks (see `OpenAiStreamChoice`).
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiMessage {
    /// Reasoning/"thinking" text — a provider extension, not part of the base
    /// OpenAI schema.
    pub(crate) reasoning_content: Option<String>,
    /// Plain text content; may be `None` or empty (e.g. when the model only
    /// requests tool calls).
    pub(crate) content: Option<String>,
    /// Tool invocations requested by the model, if any.
    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
}
961
/// A fully-formed tool call inside a complete (non-delta) message.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCall {
    /// Provider-assigned call id.
    pub(crate) id: String,
    /// Function name plus raw JSON arguments.
    pub(crate) function: OpenAiFunction,
}
967
/// Function name and arguments of a tool call.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunction {
    /// Name of the tool/function to invoke.
    pub(crate) name: String,
    /// Arguments as a raw JSON string; decoded later via `parse_tool_arguments`,
    /// which tolerates empty or malformed input.
    pub(crate) arguments: String,
}
973
/// Token accounting block. Every counter defaults to 0 so partially-populated
/// usage objects from lenient providers still deserialize.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiUsage {
    /// Tokens consumed by the prompt/input.
    #[serde(default)]
    pub(crate) prompt_tokens: usize,
    /// Tokens produced by the completion/output.
    #[serde(default)]
    pub(crate) completion_tokens: usize,
    /// Total of prompt + completion tokens as reported by the server.
    #[serde(default)]
    pub(crate) total_tokens: usize,
    /// OpenAI returns cached token count in `prompt_tokens_details.cached_tokens`
    #[serde(default)]
    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
}
986
/// Breakdown of prompt tokens; only the cache-hit count is read here.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiPromptTokensDetails {
    /// Number of prompt tokens served from the provider's prompt cache.
    #[serde(default)]
    pub(crate) cached_tokens: Option<usize>,
}
992
993// OpenAI streaming types
994#[derive(Debug, Deserialize)]
995pub(crate) struct OpenAiStreamChunk {
996    #[serde(default)]
997    pub(crate) id: Option<String>,
998    #[serde(default)]
999    pub(crate) object: Option<String>,
1000    #[serde(default)]
1001    pub(crate) model: Option<String>,
1002    pub(crate) choices: Vec<OpenAiStreamChoice>,
1003    pub(crate) usage: Option<OpenAiUsage>,
1004}
1005
/// One choice inside a streaming chunk. Standard servers populate `delta`;
/// some compatible servers send a complete `message` instead, so both are
/// modeled and the stream loop handles each.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChoice {
    /// Full message form (non-standard for streams, but seen in the wild).
    pub(crate) message: Option<OpenAiMessage>,
    /// Incremental delta form (the standard streaming shape).
    pub(crate) delta: Option<OpenAiDelta>,
    /// Set on the final chunk for this choice.
    pub(crate) finish_reason: Option<String>,
}
1012
/// Incremental content fragment within a streaming choice.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiDelta {
    /// Fragment of reasoning/"thinking" text (provider extension).
    pub(crate) reasoning_content: Option<String>,
    /// Fragment of assistant text.
    pub(crate) content: Option<String>,
    /// Partial tool-call updates, keyed by `index` for reassembly.
    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}
1019
/// Partial update for one tool call during streaming. Arguments accrete across
/// fragments that share the same `index`; id and name typically arrive on the
/// first fragment for that index.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCallDelta {
    /// Position of the tool call being assembled; used as the accumulator key.
    pub(crate) index: usize,
    /// Call id; optional per fragment — the accumulator overwrites when present.
    pub(crate) id: Option<String>,
    /// Name and/or argument fragment carried by this delta.
    pub(crate) function: Option<OpenAiFunctionDelta>,
}
1026
1027#[derive(Debug, Deserialize)]
1028pub(crate) struct OpenAiFunctionDelta {
1029    pub(crate) name: Option<String>,
1030    pub(crate) arguments: Option<String>,
1031}