// a3s_code_core/llm/openai.rs
1//! OpenAI-compatible LLM client
2
3use super::http::{default_http_client, normalize_base_url, HttpClient};
4use super::types::*;
5use super::LlmClient;
6use crate::llm::types::{ToolResultContent, ToolResultContentField};
7use crate::retry::{AttemptOutcome, RetryConfig};
8use anyhow::{Context, Result};
9use async_trait::async_trait;
10use futures::StreamExt;
11use serde::Deserialize;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
/// OpenAI client
///
/// Client for any OpenAI-compatible chat-completions API. Construct with
/// [`OpenAiClient::new`] and customize via the `with_*` builder methods.
pub struct OpenAiClient {
    // Provider label recorded in response metadata (defaults to "openai").
    pub(crate) provider_name: String,
    // API key; sent as a `Bearer` token unless the caller supplies their own
    // Authorization header (see `request_headers`).
    pub(crate) api_key: SecretString,
    // Model identifier placed in the request body.
    pub(crate) model: String,
    // Scheme + host, without the chat-completions path
    // (defaults to "https://api.openai.com").
    pub(crate) base_url: String,
    // Request path appended to `base_url` (defaults to "/v1/chat/completions").
    pub(crate) chat_completions_path: String,
    // Extra headers sent with every request.
    pub(crate) headers: HashMap<String, String>,
    // Optional sampling temperature; forwarded as `temperature` when set.
    pub(crate) temperature: Option<f32>,
    // Optional completion cap; forwarded as `max_tokens` when set.
    pub(crate) max_tokens: Option<usize>,
    // Pluggable HTTP transport implementation.
    pub(crate) http: Arc<dyn HttpClient>,
    // Retry policy applied to request attempts.
    pub(crate) retry_config: RetryConfig,
}
30
31impl OpenAiClient {
32    pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
33        if arguments.trim().is_empty() {
34            return serde_json::Value::Object(Default::default());
35        }
36
37        serde_json::from_str(arguments).unwrap_or_else(|e| {
38            tracing::warn!(
39                "Failed to parse tool arguments JSON for tool '{}': {}",
40                tool_name,
41                e
42            );
43            serde_json::json!({
44                "__parse_error": format!(
45                    "Malformed tool arguments: {}. Raw input: {}",
46                    e, arguments
47                )
48            })
49        })
50    }
51
52    fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
53        if incoming.is_empty() {
54            return None;
55        }
56        if text_content.is_empty() {
57            text_content.push_str(incoming);
58            return Some(incoming.to_string());
59        }
60        if incoming == text_content.as_str() || text_content.ends_with(incoming) {
61            return None;
62        }
63        if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
64            if suffix.is_empty() {
65                return None;
66            }
67            text_content.push_str(suffix);
68            return Some(suffix.to_string());
69        }
70        text_content.push_str(incoming);
71        Some(incoming.to_string())
72    }
73
74    pub fn new(api_key: String, model: String) -> Self {
75        Self {
76            provider_name: "openai".to_string(),
77            api_key: SecretString::new(api_key),
78            model,
79            base_url: "https://api.openai.com".to_string(),
80            chat_completions_path: "/v1/chat/completions".to_string(),
81            headers: HashMap::new(),
82            temperature: None,
83            max_tokens: None,
84            http: default_http_client(),
85            retry_config: RetryConfig::default(),
86        }
87    }
88
89    pub fn with_base_url(mut self, base_url: String) -> Self {
90        self.base_url = normalize_base_url(&base_url);
91        self
92    }
93
94    pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
95        self.provider_name = provider_name.into();
96        self
97    }
98
99    pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
100        let path = path.into();
101        self.chat_completions_path = if path.starts_with('/') {
102            path
103        } else {
104            format!("/{}", path)
105        };
106        self
107    }
108
109    pub fn with_temperature(mut self, temperature: f32) -> Self {
110        self.temperature = Some(temperature);
111        self
112    }
113
114    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
115        self.headers = headers;
116        self
117    }
118
119    pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
120        self.max_tokens = Some(max_tokens);
121        self
122    }
123
124    pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
125        self.retry_config = retry_config;
126        self
127    }
128
129    pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
130        self.http = http;
131        self
132    }
133
134    pub(crate) fn request_headers(&self) -> Vec<(String, String)> {
135        let mut headers = Vec::with_capacity(self.headers.len() + 1);
136        let has_authorization = self
137            .headers
138            .keys()
139            .any(|key| key.eq_ignore_ascii_case("authorization"));
140        if !has_authorization {
141            headers.push((
142                "Authorization".to_string(),
143                format!("Bearer {}", self.api_key.expose()),
144            ));
145        }
146        headers.extend(
147            self.headers
148                .iter()
149                .map(|(key, value)| (key.clone(), value.clone())),
150        );
151        headers
152    }
153
    /// Convert internal [`Message`]s into OpenAI chat-completion message
    /// objects.
    ///
    /// Single-block messages are flattened: a lone text block becomes a plain
    /// string `content`, and a lone tool-result block becomes a whole
    /// `role: "tool"` message carrying `tool_call_id`. Multi-block messages
    /// become an array of typed content parts. Assistant messages are
    /// special-cased so they always carry `reasoning_content`.
    ///
    /// NOTE(review): in the multi-block path, `ToolResult` blocks fall into
    /// the `_ => json!({})` arm and are effectively dropped — confirm callers
    /// never mix a tool result with other blocks in one message.
    pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
        messages
            .iter()
            .map(|msg| {
                // Single-block messages collapse to a scalar `content`;
                // anything else becomes an array of content parts.
                let content: serde_json::Value = if msg.content.len() == 1 {
                    match &msg.content[0] {
                        ContentBlock::Text { text } => serde_json::json!(text),
                        ContentBlock::ToolResult {
                            tool_use_id,
                            content,
                            ..
                        } => {
                            // Flatten the tool result to plain text; non-text
                            // result blocks are filtered out.
                            let content_str = match content {
                                ToolResultContentField::Text(s) => s.clone(),
                                ToolResultContentField::Blocks(blocks) => blocks
                                    .iter()
                                    .filter_map(|b| {
                                        if let ToolResultContent::Text { text } = b {
                                            Some(text.clone())
                                        } else {
                                            None
                                        }
                                    })
                                    .collect::<Vec<_>>()
                                    .join("\n"),
                            };
                            // Tool results map to an entire `role: "tool"`
                            // message, so return early here.
                            return serde_json::json!({
                                "role": "tool",
                                "tool_call_id": tool_use_id,
                                "content": content_str,
                            });
                        }
                        _ => serde_json::json!(""),
                    }
                } else {
                    serde_json::json!(msg
                        .content
                        .iter()
                        .map(|block| {
                            match block {
                                ContentBlock::Text { text } => serde_json::json!({
                                    "type": "text",
                                    "text": text,
                                }),
                                // Images are inlined as base64 data URLs.
                                ContentBlock::Image { source } => serde_json::json!({
                                    "type": "image_url",
                                    "image_url": {
                                        "url": format!(
                                            "data:{};base64,{}",
                                            source.media_type, source.data
                                        ),
                                    }
                                }),
                                ContentBlock::ToolUse { id, name, input } => serde_json::json!({
                                    "type": "function",
                                    "id": id,
                                    "function": {
                                        "name": name,
                                        "arguments": input.to_string(),
                                    }
                                }),
                                _ => serde_json::json!({}),
                            }
                        })
                        .collect::<Vec<_>>())
                };

                // Handle assistant messages — kimi-k2.5 requires reasoning_content
                // on all assistant messages when thinking mode is enabled
                if msg.role == "assistant" {
                    let rc = msg.reasoning_content.as_deref().unwrap_or("");
                    let tool_calls: Vec<_> = msg.tool_calls();
                    if !tool_calls.is_empty() {
                        // Assistant tool calls use the top-level `tool_calls`
                        // array with stringified JSON arguments.
                        return serde_json::json!({
                            "role": "assistant",
                            "content": msg.text(),
                            "reasoning_content": rc,
                            "tool_calls": tool_calls.iter().map(|tc| {
                                serde_json::json!({
                                    "id": tc.id,
                                    "type": "function",
                                    "function": {
                                        "name": tc.name,
                                        "arguments": tc.args.to_string(),
                                    }
                                })
                            }).collect::<Vec<_>>(),
                        });
                    }
                    return serde_json::json!({
                        "role": "assistant",
                        "content": content,
                        "reasoning_content": rc,
                    });
                }

                // All other roles pass through with the converted content.
                serde_json::json!({
                    "role": msg.role,
                    "content": content,
                })
            })
            .collect()
    }
257
258    pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
259        tools
260            .iter()
261            .map(|t| {
262                serde_json::json!({
263                    "type": "function",
264                    "function": {
265                        "name": t.name,
266                        "description": t.description,
267                        "parameters": t.parameters,
268                    }
269                })
270            })
271            .collect()
272    }
273}
274
275#[async_trait]
276impl LlmClient for OpenAiClient {
277    async fn complete(
278        &self,
279        messages: &[Message],
280        system: Option<&str>,
281        tools: &[ToolDefinition],
282    ) -> Result<LlmResponse> {
283        {
284            let request_started_at = Instant::now();
285            let mut openai_messages = Vec::new();
286
287            if let Some(sys) = system {
288                openai_messages.push(serde_json::json!({
289                    "role": "system",
290                    "content": sys,
291                }));
292            }
293
294            openai_messages.extend(self.convert_messages(messages));
295
296            let mut request = serde_json::json!({
297                "model": self.model,
298                "messages": openai_messages,
299            });
300
301            if let Some(temp) = self.temperature {
302                request["temperature"] = serde_json::json!(temp);
303            }
304            if let Some(max) = self.max_tokens {
305                request["max_tokens"] = serde_json::json!(max);
306            }
307
308            if !tools.is_empty() {
309                request["tools"] = serde_json::json!(self.convert_tools(tools));
310            }
311
312            let url = format!("{}{}", self.base_url, self.chat_completions_path);
313            let request_headers = self.request_headers();
314
315            let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
316                let http = &self.http;
317                let url = &url;
318                let request_headers = request_headers.clone();
319                let request = &request;
320                async move {
321                    let headers = request_headers
322                        .iter()
323                        .map(|(key, value)| (key.as_str(), value.as_str()))
324                        .collect::<Vec<_>>();
325                    // Non-streaming: use a non-cancelled token for now
326                    let cancel_token = tokio_util::sync::CancellationToken::new();
327                    match http.post(url, headers, request, cancel_token).await {
328                        Ok(resp) => {
329                            let status = reqwest::StatusCode::from_u16(resp.status)
330                                .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
331                            if status.is_success() {
332                                AttemptOutcome::Success(resp.body)
333                            } else if self.retry_config.is_retryable_status(status) {
334                                AttemptOutcome::Retryable {
335                                    status,
336                                    body: resp.body,
337                                    retry_after: None,
338                                }
339                            } else {
340                                AttemptOutcome::Fatal(anyhow::anyhow!(
341                                    "OpenAI API error at {} ({}): {}",
342                                    url,
343                                    status,
344                                    resp.body
345                                ))
346                            }
347                        }
348                        Err(e) => {
349                            eprintln!("[DEBUG] HTTP error: {:?}", e);
350                            AttemptOutcome::Fatal(e)
351                        }
352                    }
353                }
354            })
355            .await?;
356
357            let parsed: OpenAiResponse =
358                serde_json::from_str(&response).context("Failed to parse OpenAI response")?;
359
360            let choice = parsed.choices.into_iter().next().context("No choices")?;
361
362            let mut content = vec![];
363
364            let reasoning_content = choice.message.reasoning_content;
365
366            let text_content = choice.message.content;
367
368            if let Some(text) = text_content {
369                if !text.is_empty() {
370                    content.push(ContentBlock::Text { text });
371                }
372            }
373
374            if let Some(tool_calls) = choice.message.tool_calls {
375                for tc in tool_calls {
376                    content.push(ContentBlock::ToolUse {
377                        id: tc.id,
378                        name: tc.function.name.clone(),
379                        input: Self::parse_tool_arguments(
380                            &tc.function.name,
381                            &tc.function.arguments,
382                        ),
383                    });
384                }
385            }
386
387            let llm_response = LlmResponse {
388                message: Message {
389                    role: "assistant".to_string(),
390                    content,
391                    reasoning_content,
392                },
393                usage: TokenUsage {
394                    prompt_tokens: parsed.usage.prompt_tokens,
395                    completion_tokens: parsed.usage.completion_tokens,
396                    total_tokens: parsed.usage.total_tokens,
397                    cache_read_tokens: parsed
398                        .usage
399                        .prompt_tokens_details
400                        .as_ref()
401                        .and_then(|d| d.cached_tokens),
402                    cache_write_tokens: None,
403                },
404                stop_reason: choice.finish_reason,
405                meta: Some(LlmResponseMeta {
406                    provider: Some(self.provider_name.clone()),
407                    request_model: Some(self.model.clone()),
408                    request_url: Some(url.clone()),
409                    response_id: parsed.id,
410                    response_model: parsed.model,
411                    response_object: parsed.object,
412                    first_token_ms: None,
413                    duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
414                }),
415            };
416
417            crate::telemetry::record_llm_usage(
418                llm_response.usage.prompt_tokens,
419                llm_response.usage.completion_tokens,
420                llm_response.usage.total_tokens,
421                llm_response.stop_reason.as_deref(),
422            );
423
424            Ok(llm_response)
425        }
426    }
427
428    async fn complete_streaming(
429        &self,
430        messages: &[Message],
431        system: Option<&str>,
432        tools: &[ToolDefinition],
433        cancel_token: tokio_util::sync::CancellationToken,
434    ) -> Result<mpsc::Receiver<StreamEvent>> {
435        {
436            let request_started_at = Instant::now();
437            let mut openai_messages = Vec::new();
438
439            if let Some(sys) = system {
440                openai_messages.push(serde_json::json!({
441                    "role": "system",
442                    "content": sys,
443                }));
444            }
445
446            openai_messages.extend(self.convert_messages(messages));
447
448            let mut request = serde_json::json!({
449                "model": self.model,
450                "messages": openai_messages,
451                "stream": true,
452                "stream_options": { "include_usage": true },
453            });
454
455            if let Some(temp) = self.temperature {
456                request["temperature"] = serde_json::json!(temp);
457            }
458            if let Some(max) = self.max_tokens {
459                request["max_tokens"] = serde_json::json!(max);
460            }
461
462            if !tools.is_empty() {
463                request["tools"] = serde_json::json!(self.convert_tools(tools));
464            }
465
466            let url = format!("{}{}", self.base_url, self.chat_completions_path);
467            let request_headers = self.request_headers();
468
469            let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
470                let http = &self.http;
471                let url = &url;
472                let request_headers = request_headers.clone();
473                let request = &request;
474                let cancel_token = cancel_token.clone();
475                async move {
476                    let headers = request_headers
477                        .iter()
478                        .map(|(key, value)| (key.as_str(), value.as_str()))
479                        .collect::<Vec<_>>();
480                    // Wrap in tokio::select! so cancellation aborts the HTTP request mid-flight
481                    let resp = tokio::select! {
482                        _ = cancel_token.cancelled() => {
483                            return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request cancelled"));
484                        }
485                        result = http.post_streaming(url, headers, request, cancel_token.clone()) => {
486                            match result {
487                                Ok(r) => r,
488                                Err(e) => {
489                                    return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request failed: {}", e));
490                                }
491                            }
492                        }
493                    };
494                    let status = reqwest::StatusCode::from_u16(resp.status)
495                        .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
496                    if status.is_success() {
497                        AttemptOutcome::Success(resp)
498                    } else {
499                        let retry_after = resp
500                            .retry_after
501                            .as_deref()
502                            .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
503                        if self.retry_config.is_retryable_status(status) {
504                            AttemptOutcome::Retryable {
505                                status,
506                                body: resp.error_body,
507                                retry_after,
508                            }
509                        } else {
510                            AttemptOutcome::Fatal(anyhow::anyhow!(
511                                "OpenAI API error at {} ({}): {}",
512                                url,
513                                status,
514                                resp.error_body
515                            ))
516                        }
517                    }
518                }
519            })
520            .await?;
521
522            let (tx, rx) = mpsc::channel(100);
523
524            let mut stream = streaming_resp.byte_stream;
525            let provider_name = self.provider_name.clone();
526            let request_model = self.model.clone();
527            let request_url = url.clone();
528            tokio::spawn(async move {
529                let mut buffer = String::new();
530                let mut content_blocks: Vec<ContentBlock> = Vec::new();
531                let mut text_content = String::new();
532                let mut reasoning_content_accum = String::new();
533                let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
534                    std::collections::BTreeMap::new();
535                let mut usage = TokenUsage::default();
536                let mut finish_reason = None;
537                let mut response_id = None;
538                let mut response_model = None;
539                let mut response_object = None;
540                let mut first_token_ms = None;
541                let mut saw_done = false;
542                let mut parsed_any_event = false;
543
544                while let Some(chunk_result) = stream.next().await {
545                    let chunk = match chunk_result {
546                        Ok(c) => c,
547                        Err(e) => {
548                            tracing::error!("Stream error: {}", e);
549                            break;
550                        }
551                    };
552
553                    buffer.push_str(&String::from_utf8_lossy(&chunk));
554
555                    while let Some(event_end) = buffer.find("\n\n") {
556                        let event_data: String = buffer.drain(..event_end).collect();
557                        buffer.drain(..2);
558
559                        for line in event_data.lines() {
560                            if let Some(data) = line.strip_prefix("data: ") {
561                                if data == "[DONE]" {
562                                    saw_done = true;
563                                    if !text_content.is_empty() {
564                                        content_blocks.push(ContentBlock::Text {
565                                            text: text_content.clone(),
566                                        });
567                                    }
568                                    for (_, (id, name, args)) in tool_calls.iter() {
569                                        content_blocks.push(ContentBlock::ToolUse {
570                                            id: id.clone(),
571                                            name: name.clone(),
572                                            input: Self::parse_tool_arguments(name, args),
573                                        });
574                                    }
575                                    tool_calls.clear();
576                                    crate::telemetry::record_llm_usage(
577                                        usage.prompt_tokens,
578                                        usage.completion_tokens,
579                                        usage.total_tokens,
580                                        finish_reason.as_deref(),
581                                    );
582                                    let response = LlmResponse {
583                                        message: Message {
584                                            role: "assistant".to_string(),
585                                            content: std::mem::take(&mut content_blocks),
586                                            reasoning_content: if reasoning_content_accum.is_empty()
587                                            {
588                                                None
589                                            } else {
590                                                Some(std::mem::take(&mut reasoning_content_accum))
591                                            },
592                                        },
593                                        usage: usage.clone(),
594                                        stop_reason: std::mem::take(&mut finish_reason),
595                                        meta: Some(LlmResponseMeta {
596                                            provider: Some(provider_name.clone()),
597                                            request_model: Some(request_model.clone()),
598                                            request_url: Some(request_url.clone()),
599                                            response_id: response_id.clone(),
600                                            response_model: response_model.clone(),
601                                            response_object: response_object.clone(),
602                                            first_token_ms,
603                                            duration_ms: Some(
604                                                request_started_at.elapsed().as_millis() as u64,
605                                            ),
606                                        }),
607                                    };
608                                    let _ = tx.send(StreamEvent::Done(response)).await;
609                                    continue;
610                                }
611
612                                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
613                                    parsed_any_event = true;
614                                    if response_id.is_none() {
615                                        response_id = event.id.clone();
616                                    }
617                                    if response_model.is_none() {
618                                        response_model = event.model.clone();
619                                    }
620                                    if response_object.is_none() {
621                                        response_object = event.object.clone();
622                                    }
623                                    if let Some(u) = event.usage {
624                                        usage.prompt_tokens = u.prompt_tokens;
625                                        usage.completion_tokens = u.completion_tokens;
626                                        usage.total_tokens = u.total_tokens;
627                                        usage.cache_read_tokens = u
628                                            .prompt_tokens_details
629                                            .as_ref()
630                                            .and_then(|d| d.cached_tokens);
631                                    }
632
633                                    if let Some(choice) = event.choices.into_iter().next() {
634                                        if let Some(reason) = choice.finish_reason {
635                                            finish_reason = Some(reason);
636                                        }
637
638                                        let delta_content = choice
639                                            .delta
640                                            .as_ref()
641                                            .and_then(|delta| delta.content.clone());
642
643                                        if let Some(message) = choice.message {
644                                            if let Some(reasoning) = message.reasoning_content {
645                                                reasoning_content_accum.push_str(&reasoning);
646                                            }
647                                            if delta_content.is_none() {
648                                                if let Some(content) = message
649                                                    .content
650                                                    .filter(|value| !value.is_empty())
651                                                {
652                                                    if first_token_ms.is_none() {
653                                                        first_token_ms = Some(
654                                                            request_started_at.elapsed().as_millis()
655                                                                as u64,
656                                                        );
657                                                    }
658                                                    if let Some(delta) = Self::merge_stream_text(
659                                                        &mut text_content,
660                                                        &content,
661                                                    ) {
662                                                        let _ = tx
663                                                            .send(StreamEvent::TextDelta(delta))
664                                                            .await;
665                                                    }
666                                                }
667                                            }
668                                            if let Some(tcs) = message.tool_calls {
669                                                for (index, tc) in tcs.into_iter().enumerate() {
670                                                    tool_calls.insert(
671                                                        index,
672                                                        (
673                                                            tc.id,
674                                                            tc.function.name,
675                                                            tc.function.arguments,
676                                                        ),
677                                                    );
678                                                }
679                                            }
680                                        }
681
682                                        if let Some(delta) = choice.delta {
683                                            if let Some(ref rc) = delta.reasoning_content {
684                                                reasoning_content_accum.push_str(rc);
685                                            }
686
687                                            if let Some(content) = delta.content {
688                                                if first_token_ms.is_none() {
689                                                    first_token_ms = Some(
690                                                        request_started_at.elapsed().as_millis()
691                                                            as u64,
692                                                    );
693                                                }
694                                                if let Some(delta) = Self::merge_stream_text(
695                                                    &mut text_content,
696                                                    &content,
697                                                ) {
698                                                    let _ = tx
699                                                        .send(StreamEvent::TextDelta(delta))
700                                                        .await;
701                                                }
702                                            }
703
704                                            if let Some(tcs) = delta.tool_calls {
705                                                for tc in tcs {
706                                                    let entry = tool_calls
707                                                        .entry(tc.index)
708                                                        .or_insert_with(|| {
709                                                            (
710                                                                String::new(),
711                                                                String::new(),
712                                                                String::new(),
713                                                            )
714                                                        });
715
716                                                    if let Some(id) = tc.id {
717                                                        entry.0 = id;
718                                                    }
719                                                    if let Some(func) = tc.function {
720                                                        if let Some(name) = func.name {
721                                                            if first_token_ms.is_none() {
722                                                                first_token_ms = Some(
723                                                                    request_started_at
724                                                                        .elapsed()
725                                                                        .as_millis()
726                                                                        as u64,
727                                                                );
728                                                            }
729                                                            entry.1 = name.clone();
730                                                            let _ = tx
731                                                                .send(StreamEvent::ToolUseStart {
732                                                                    id: entry.0.clone(),
733                                                                    name,
734                                                                })
735                                                                .await;
736                                                        }
737                                                        if let Some(args) = func.arguments {
738                                                            entry.2.push_str(&args);
739                                                            let _ = tx
740                                                                .send(
741                                                                    StreamEvent::ToolUseInputDelta(
742                                                                        args,
743                                                                    ),
744                                                                )
745                                                                .await;
746                                                        }
747                                                    }
748                                                }
749                                            }
750                                        }
751                                    }
752                                }
753                            }
754                        }
755                    }
756                }
757
758                if saw_done {
759                    return;
760                }
761
762                let trailing = buffer.trim();
763                if !trailing.is_empty() {
764                    if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
765                        parsed_any_event = true;
766                        if response_id.is_none() {
767                            response_id = event.id.clone();
768                        }
769                        if response_model.is_none() {
770                            response_model = event.model.clone();
771                        }
772                        if response_object.is_none() {
773                            response_object = event.object.clone();
774                        }
775                        if let Some(u) = event.usage {
776                            usage.prompt_tokens = u.prompt_tokens;
777                            usage.completion_tokens = u.completion_tokens;
778                            usage.total_tokens = u.total_tokens;
779                            usage.cache_read_tokens = u
780                                .prompt_tokens_details
781                                .as_ref()
782                                .and_then(|d| d.cached_tokens);
783                        }
784                        if let Some(choice) = event.choices.into_iter().next() {
785                            if let Some(reason) = choice.finish_reason {
786                                finish_reason = Some(reason);
787                            }
788                            let delta_content = choice
789                                .delta
790                                .as_ref()
791                                .and_then(|delta| delta.content.clone());
792                            if let Some(message) = choice.message {
793                                if let Some(reasoning) = message.reasoning_content {
794                                    reasoning_content_accum.push_str(&reasoning);
795                                }
796                                if delta_content.is_none() {
797                                    if let Some(content) =
798                                        message.content.filter(|value| !value.is_empty())
799                                    {
800                                        if first_token_ms.is_none() {
801                                            first_token_ms = Some(
802                                                request_started_at.elapsed().as_millis() as u64,
803                                            );
804                                        }
805                                        if let Some(delta) =
806                                            Self::merge_stream_text(&mut text_content, &content)
807                                        {
808                                            let _ = tx.send(StreamEvent::TextDelta(delta)).await;
809                                        }
810                                    }
811                                }
812                                if let Some(tcs) = message.tool_calls {
813                                    for (index, tc) in tcs.into_iter().enumerate() {
814                                        tool_calls.insert(
815                                            index,
816                                            (tc.id, tc.function.name, tc.function.arguments),
817                                        );
818                                    }
819                                }
820                            }
821                            if let Some(delta) = choice.delta {
822                                if let Some(ref rc) = delta.reasoning_content {
823                                    reasoning_content_accum.push_str(rc);
824                                }
825                                if let Some(content) = delta.content {
826                                    if first_token_ms.is_none() {
827                                        first_token_ms =
828                                            Some(request_started_at.elapsed().as_millis() as u64);
829                                    }
830                                    if let Some(delta) =
831                                        Self::merge_stream_text(&mut text_content, &content)
832                                    {
833                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
834                                    }
835                                }
836                            }
837                        }
838                    } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
839                        parsed_any_event = true;
840                        response_id = response.id.clone();
841                        response_model = response.model.clone();
842                        response_object = response.object.clone();
843                        usage.prompt_tokens = response.usage.prompt_tokens;
844                        usage.completion_tokens = response.usage.completion_tokens;
845                        usage.total_tokens = response.usage.total_tokens;
846                        usage.cache_read_tokens = response
847                            .usage
848                            .prompt_tokens_details
849                            .as_ref()
850                            .and_then(|d| d.cached_tokens);
851
852                        if let Some(choice) = response.choices.into_iter().next() {
853                            finish_reason = choice.finish_reason;
854                            if let Some(text) =
855                                choice.message.content.filter(|text| !text.is_empty())
856                            {
857                                if first_token_ms.is_none() {
858                                    first_token_ms =
859                                        Some(request_started_at.elapsed().as_millis() as u64);
860                                }
861                                let _ = Self::merge_stream_text(&mut text_content, &text);
862                            }
863                            if let Some(reasoning) = choice.message.reasoning_content {
864                                reasoning_content_accum.push_str(&reasoning);
865                            }
866                            if let Some(final_tool_calls) = choice.message.tool_calls {
867                                for tc in final_tool_calls {
868                                    tool_calls.insert(
869                                        tool_calls.len(),
870                                        (tc.id, tc.function.name, tc.function.arguments),
871                                    );
872                                }
873                            }
874                        }
875                    }
876                }
877
878                if parsed_any_event
879                    || !text_content.is_empty()
880                    || !tool_calls.is_empty()
881                    || !content_blocks.is_empty()
882                {
883                    tracing::warn!(
884                        provider = %provider_name,
885                        model = %request_model,
886                        "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
887                    );
888                    if !text_content.is_empty() {
889                        content_blocks.push(ContentBlock::Text {
890                            text: text_content.clone(),
891                        });
892                    }
893                    for (_, (id, name, args)) in tool_calls.iter() {
894                        content_blocks.push(ContentBlock::ToolUse {
895                            id: id.clone(),
896                            name: name.clone(),
897                            input: Self::parse_tool_arguments(name, args),
898                        });
899                    }
900                    tool_calls.clear();
901                    crate::telemetry::record_llm_usage(
902                        usage.prompt_tokens,
903                        usage.completion_tokens,
904                        usage.total_tokens,
905                        finish_reason.as_deref(),
906                    );
907                    let response = LlmResponse {
908                        message: Message {
909                            role: "assistant".to_string(),
910                            content: std::mem::take(&mut content_blocks),
911                            reasoning_content: if reasoning_content_accum.is_empty() {
912                                None
913                            } else {
914                                Some(std::mem::take(&mut reasoning_content_accum))
915                            },
916                        },
917                        usage: usage.clone(),
918                        stop_reason: std::mem::take(&mut finish_reason),
919                        meta: Some(LlmResponseMeta {
920                            provider: Some(provider_name.clone()),
921                            request_model: Some(request_model.clone()),
922                            request_url: Some(request_url.clone()),
923                            response_id: response_id.clone(),
924                            response_model: response_model.clone(),
925                            response_object: response_object.clone(),
926                            first_token_ms,
927                            duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
928                        }),
929                    };
930                    let _ = tx.send(StreamEvent::Done(response)).await;
931                } else {
932                    tracing::warn!(
933                        provider = %provider_name,
934                        model = %request_model,
935                        trailing = %trailing.chars().take(400).collect::<String>(),
936                        "OpenAI-compatible stream ended without any parseable events"
937                    );
938                }
939            });
940
941            Ok(rx)
942        }
943    }
944}
945
946// OpenAI API response types (private)
947#[derive(Debug, Deserialize)]
948pub(crate) struct OpenAiResponse {
949    #[serde(default)]
950    pub(crate) id: Option<String>,
951    #[serde(default)]
952    pub(crate) object: Option<String>,
953    #[serde(default)]
954    pub(crate) model: Option<String>,
955    pub(crate) choices: Vec<OpenAiChoice>,
956    pub(crate) usage: OpenAiUsage,
957}
958
/// One entry of the non-streaming `choices` array.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiChoice {
    /// Complete assistant message for this choice.
    pub(crate) message: OpenAiMessage,
    /// Reason generation stopped, as reported by the provider; surfaced as
    /// the response's stop reason.
    pub(crate) finish_reason: Option<String>,
}
964
/// Assistant message payload; used for non-streaming responses and for
/// providers that send a full `message` object inside stream chunks.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiMessage {
    /// Non-standard reasoning text emitted by some OpenAI-compatible
    /// providers; accumulated separately from `content`.
    pub(crate) reasoning_content: Option<String>,
    /// Plain assistant text, if any.
    pub(crate) content: Option<String>,
    /// Fully-formed tool calls attached to the message, if any.
    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
}
971
/// A complete (non-incremental) tool call.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCall {
    /// Provider-assigned tool-call id.
    pub(crate) id: String,
    /// The function the model asked to invoke.
    pub(crate) function: OpenAiFunction,
}
977
/// Function-invocation details of a completed tool call.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunction {
    /// Tool/function name.
    pub(crate) name: String,
    /// Arguments as a raw JSON string; parsed leniently by
    /// `parse_tool_arguments`, so malformed JSON is reported rather than fatal.
    pub(crate) arguments: String,
}
983
/// Token-accounting block. Every field is defaulted so partial or missing
/// usage objects still deserialize.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiUsage {
    /// Tokens consumed by the prompt; 0 when omitted.
    #[serde(default)]
    pub(crate) prompt_tokens: usize,
    /// Tokens generated for the completion; 0 when omitted.
    #[serde(default)]
    pub(crate) completion_tokens: usize,
    /// Total token count as reported by the provider; 0 when omitted.
    #[serde(default)]
    pub(crate) total_tokens: usize,
    /// OpenAI returns cached token count in `prompt_tokens_details.cached_tokens`
    #[serde(default)]
    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
}
996
/// Breakdown of prompt tokens; only the cached count is read here and it is
/// mapped to `usage.cache_read_tokens`.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiPromptTokensDetails {
    /// Number of prompt tokens served from the provider's prompt cache.
    #[serde(default)]
    pub(crate) cached_tokens: Option<usize>,
}
1002
1003// OpenAI streaming types
1004#[derive(Debug, Deserialize)]
1005pub(crate) struct OpenAiStreamChunk {
1006    #[serde(default)]
1007    pub(crate) id: Option<String>,
1008    #[serde(default)]
1009    pub(crate) object: Option<String>,
1010    #[serde(default)]
1011    pub(crate) model: Option<String>,
1012    pub(crate) choices: Vec<OpenAiStreamChoice>,
1013    pub(crate) usage: Option<OpenAiUsage>,
1014}
1015
/// One entry of a stream chunk's `choices` array. Strict OpenAI streams only
/// populate `delta`, but some compatible servers send a full `message`
/// instead — the parser handles both shapes.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChoice {
    /// Full message object (non-standard in streams; seen on some providers).
    pub(crate) message: Option<OpenAiMessage>,
    /// Incremental delta — the usual streaming shape.
    pub(crate) delta: Option<OpenAiDelta>,
    /// Reason generation stopped; present once this choice is finished.
    pub(crate) finish_reason: Option<String>,
}
1022
/// Incremental content carried by a streaming chunk's `delta`.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiDelta {
    /// Incremental reasoning text (non-standard; some providers only).
    pub(crate) reasoning_content: Option<String>,
    /// Incremental assistant text.
    pub(crate) content: Option<String>,
    /// Incremental tool-call fragments, merged by their `index`.
    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}
1029
1030#[derive(Debug, Deserialize)]
1031pub(crate) struct OpenAiToolCallDelta {
1032    pub(crate) index: usize,
1033    pub(crate) id: Option<String>,
1034    pub(crate) function: Option<OpenAiFunctionDelta>,
1035}
1036
/// Function name/argument pieces of a streaming tool-call fragment.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunctionDelta {
    /// Function name; typically arrives once, on the first fragment.
    pub(crate) name: Option<String>,
    /// Argument JSON fragment; appended to the accumulated arguments string.
    pub(crate) arguments: Option<String>,
}