// a3s_code_core/llm/openai.rs
1//! OpenAI-compatible LLM client
2
3use super::http::{default_http_client, normalize_base_url, HttpClient};
4use super::types::*;
5use super::LlmClient;
6use crate::llm::types::{ToolResultContent, ToolResultContentField};
7use crate::retry::{AttemptOutcome, RetryConfig};
8use anyhow::{Context, Result};
9use async_trait::async_trait;
10use futures::StreamExt;
11use serde::Deserialize;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
/// OpenAI client
///
/// Client for the OpenAI `/chat/completions` protocol; also usable against
/// OpenAI-compatible providers by overriding the base URL, request path,
/// and headers via the `with_*` builder methods.
pub struct OpenAiClient {
    /// Provider label recorded in response metadata (defaults to "openai").
    pub(crate) provider_name: String,
    /// API key, sent as `Authorization: Bearer …` unless `headers` already
    /// contains an Authorization entry (checked case-insensitively).
    pub(crate) api_key: SecretString,
    /// Model identifier sent in the request body.
    pub(crate) model: String,
    /// Base URL, e.g. `https://api.openai.com` (normalized on set).
    pub(crate) base_url: String,
    /// Request path appended to `base_url`; always starts with `/`.
    pub(crate) chat_completions_path: String,
    /// Extra request headers appended after the default Authorization header.
    pub(crate) headers: HashMap<String, String>,
    /// Sampling temperature; omitted from the request when `None`.
    pub(crate) temperature: Option<f32>,
    /// `max_tokens` limit; omitted from the request when `None`.
    pub(crate) max_tokens: Option<usize>,
    /// HTTP transport (abstracted behind a trait, e.g. for tests).
    pub(crate) http: Arc<dyn HttpClient>,
    /// Retry policy applied to API calls.
    pub(crate) retry_config: RetryConfig,
}
30
impl OpenAiClient {
    /// Parse a tool call's raw `arguments` JSON into a `serde_json::Value`.
    ///
    /// Empty or whitespace-only input becomes an empty JSON object.
    /// Malformed JSON does not abort the completion: a warning is logged and
    /// an object with a single `__parse_error` key (carrying the parse error
    /// and the raw input) is returned so the failure stays visible downstream.
    pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
        if arguments.trim().is_empty() {
            return serde_json::Value::Object(Default::default());
        }

        serde_json::from_str(arguments).unwrap_or_else(|e| {
            tracing::warn!(
                "Failed to parse tool arguments JSON for tool '{}': {}",
                tool_name,
                e
            );
            serde_json::json!({
                "__parse_error": format!(
                    "Malformed tool arguments: {}. Raw input: {}",
                    e, arguments
                )
            })
        })
    }

    /// Fold a streamed text fragment into `text_content`, returning only the
    /// genuinely new delta (or `None` when the fragment adds nothing).
    ///
    /// Tolerates providers that stream text in different shapes:
    /// plain incremental deltas, cumulative snapshots that repeat the whole
    /// accumulated prefix, and duplicate re-sends of already-seen text.
    fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
        if incoming.is_empty() {
            return None;
        }
        // First fragment: take it verbatim.
        if text_content.is_empty() {
            text_content.push_str(incoming);
            return Some(incoming.to_string());
        }
        // Exact duplicate of the accumulated text, or of its tail: skip.
        if incoming == text_content.as_str() || text_content.ends_with(incoming) {
            return None;
        }
        // Cumulative snapshot: `incoming` starts with everything seen so far
        // — append and emit only the new suffix.
        if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
            if suffix.is_empty() {
                return None;
            }
            text_content.push_str(suffix);
            return Some(suffix.to_string());
        }
        // Ordinary incremental delta.
        text_content.push_str(incoming);
        Some(incoming.to_string())
    }

    /// Create a client for the official OpenAI endpoint with the default
    /// HTTP transport and retry policy; customize via the `with_*` builders.
    pub fn new(api_key: String, model: String) -> Self {
        Self {
            provider_name: "openai".to_string(),
            api_key: SecretString::new(api_key),
            model,
            base_url: "https://api.openai.com".to_string(),
            chat_completions_path: "/v1/chat/completions".to_string(),
            headers: HashMap::new(),
            temperature: None,
            max_tokens: None,
            http: default_http_client(),
            retry_config: RetryConfig::default(),
        }
    }

    /// Point the client at an OpenAI-compatible provider (URL is normalized).
    pub fn with_base_url(mut self, base_url: String) -> Self {
        self.base_url = normalize_base_url(&base_url);
        self
    }

    /// Set the provider label recorded in response metadata.
    pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
        self.provider_name = provider_name.into();
        self
    }

    /// Override the chat-completions request path; a leading `/` is added
    /// when missing so `base_url + path` always joins cleanly.
    pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
        let path = path.into();
        self.chat_completions_path = if path.starts_with('/') {
            path
        } else {
            format!("/{}", path)
        };
        self
    }

    /// Set the sampling temperature sent with every request.
    pub fn with_temperature(mut self, temperature: f32) -> Self {
        self.temperature = Some(temperature);
        self
    }

    /// Replace (not merge) the extra request headers.
    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
        self.headers = headers;
        self
    }

    /// Set the `max_tokens` limit sent with every request.
    pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
        self.max_tokens = Some(max_tokens);
        self
    }

    /// Replace the retry policy.
    pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
        self.retry_config = retry_config;
        self
    }

    /// Inject a custom HTTP transport (primarily for tests).
    pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
        self.http = http;
        self
    }

    /// Build the outgoing header list: a `Bearer` Authorization header
    /// derived from the API key, followed by the caller's extra headers.
    /// When the caller already supplied an Authorization header (matched
    /// case-insensitively) the default Bearer header is omitted entirely.
    pub(crate) fn request_headers(&self) -> Vec<(String, String)> {
        let mut headers = Vec::with_capacity(self.headers.len() + 1);
        let has_authorization = self
            .headers
            .keys()
            .any(|key| key.eq_ignore_ascii_case("authorization"));
        if !has_authorization {
            headers.push((
                "Authorization".to_string(),
                format!("Bearer {}", self.api_key.expose()),
            ));
        }
        headers.extend(
            self.headers
                .iter()
                .map(|(key, value)| (key.clone(), value.clone())),
        );
        headers
    }

    /// Convert internal `Message`s into OpenAI chat-completion JSON messages.
    ///
    /// Shapes produced:
    /// - a single `ToolResult` block becomes an entire `role: "tool"` message
    ///   (block content flattened to newline-joined text);
    /// - a single `Text` block becomes a plain string `content`;
    /// - multi-block content becomes an array of typed parts (text,
    ///   image_url, function);
    /// - assistant messages always carry `reasoning_content` (empty string
    ///   when absent) because kimi-k2.5 requires it in thinking mode; tool
    ///   calls, when present, are emitted under `tool_calls`.
    pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
        messages
            .iter()
            .map(|msg| {
                let content: serde_json::Value = if msg.content.len() == 1 {
                    match &msg.content[0] {
                        ContentBlock::Text { text } => serde_json::json!(text),
                        ContentBlock::ToolResult {
                            tool_use_id,
                            content,
                            ..
                        } => {
                            // Flatten the tool result to plain text;
                            // non-text blocks are silently dropped.
                            let content_str = match content {
                                ToolResultContentField::Text(s) => s.clone(),
                                ToolResultContentField::Blocks(blocks) => blocks
                                    .iter()
                                    .filter_map(|b| {
                                        if let ToolResultContent::Text { text } = b {
                                            Some(text.clone())
                                        } else {
                                            None
                                        }
                                    })
                                    .collect::<Vec<_>>()
                                    .join("\n"),
                            };
                            // Tool results map to a dedicated "tool" role
                            // message — return early, bypassing the generic
                            // role/content envelope built below.
                            return serde_json::json!({
                                "role": "tool",
                                "tool_call_id": tool_use_id,
                                "content": content_str,
                            });
                        }
                        // Other single blocks (e.g. a lone ToolUse) yield
                        // empty string content; assistant tool calls are
                        // emitted via `msg.tool_calls()` further down.
                        _ => serde_json::json!(""),
                    }
                } else {
                    serde_json::json!(msg
                        .content
                        .iter()
                        .map(|block| {
                            match block {
                                ContentBlock::Text { text } => serde_json::json!({
                                    "type": "text",
                                    "text": text,
                                }),
                                // Images are inlined as base64 data URLs.
                                ContentBlock::Image { source } => serde_json::json!({
                                    "type": "image_url",
                                    "image_url": {
                                        "url": format!(
                                            "data:{};base64,{}",
                                            source.media_type, source.data
                                        ),
                                    }
                                }),
                                // NOTE(review): a ToolUse inside a content
                                // array is emitted as a "function" part —
                                // confirm target providers accept this shape.
                                ContentBlock::ToolUse { id, name, input } => serde_json::json!({
                                    "type": "function",
                                    "id": id,
                                    "function": {
                                        "name": name,
                                        "arguments": input.to_string(),
                                    }
                                }),
                                _ => serde_json::json!({}),
                            }
                        })
                        .collect::<Vec<_>>())
                };

                // Handle assistant messages — kimi-k2.5 requires reasoning_content
                // on all assistant messages when thinking mode is enabled
                if msg.role == "assistant" {
                    let rc = msg.reasoning_content.as_deref().unwrap_or("");
                    let tool_calls: Vec<_> = msg.tool_calls();
                    if !tool_calls.is_empty() {
                        // With tool calls present, content collapses to the
                        // message's plain text (`msg.text()`).
                        return serde_json::json!({
                            "role": "assistant",
                            "content": msg.text(),
                            "reasoning_content": rc,
                            "tool_calls": tool_calls.iter().map(|tc| {
                                serde_json::json!({
                                    "id": tc.id,
                                    "type": "function",
                                    "function": {
                                        "name": tc.name,
                                        "arguments": tc.args.to_string(),
                                    }
                                })
                            }).collect::<Vec<_>>(),
                        });
                    }
                    return serde_json::json!({
                        "role": "assistant",
                        "content": content,
                        "reasoning_content": rc,
                    });
                }

                serde_json::json!({
                    "role": msg.role,
                    "content": content,
                })
            })
            .collect()
    }

    /// Convert tool definitions into the OpenAI `tools` array format:
    /// `type: "function"` wrappers around name/description/parameters.
    pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
        tools
            .iter()
            .map(|t| {
                serde_json::json!({
                    "type": "function",
                    "function": {
                        "name": t.name,
                        "description": t.description,
                        "parameters": t.parameters,
                    }
                })
            })
            .collect()
    }
}
274
275#[async_trait]
276impl LlmClient for OpenAiClient {
277    async fn complete(
278        &self,
279        messages: &[Message],
280        system: Option<&str>,
281        tools: &[ToolDefinition],
282    ) -> Result<LlmResponse> {
283        {
284            let request_started_at = Instant::now();
285            let mut openai_messages = Vec::new();
286
287            if let Some(sys) = system {
288                openai_messages.push(serde_json::json!({
289                    "role": "system",
290                    "content": sys,
291                }));
292            }
293
294            openai_messages.extend(self.convert_messages(messages));
295
296            let mut request = serde_json::json!({
297                "model": self.model,
298                "messages": openai_messages,
299            });
300
301            if let Some(temp) = self.temperature {
302                request["temperature"] = serde_json::json!(temp);
303            }
304            if let Some(max) = self.max_tokens {
305                request["max_tokens"] = serde_json::json!(max);
306            }
307
308            if !tools.is_empty() {
309                request["tools"] = serde_json::json!(self.convert_tools(tools));
310            }
311
312            let url = format!("{}{}", self.base_url, self.chat_completions_path);
313            let request_headers = self.request_headers();
314
315            let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
316                let http = &self.http;
317                let url = &url;
318                let request_headers = request_headers.clone();
319                let request = &request;
320                async move {
321                    let headers = request_headers
322                        .iter()
323                        .map(|(key, value)| (key.as_str(), value.as_str()))
324                        .collect::<Vec<_>>();
325                    // Non-streaming: use a non-cancelled token for now
326                    let cancel_token = tokio_util::sync::CancellationToken::new();
327                    match http.post(url, headers, request, cancel_token).await {
328                        Ok(resp) => {
329                            let status = reqwest::StatusCode::from_u16(resp.status)
330                                .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
331                            if status.is_success() {
332                                AttemptOutcome::Success(resp.body)
333                            } else if self.retry_config.is_retryable_status(status) {
334                                AttemptOutcome::Retryable {
335                                    status,
336                                    body: resp.body,
337                                    retry_after: None,
338                                }
339                            } else {
340                                AttemptOutcome::Fatal(anyhow::anyhow!(
341                                    "OpenAI API error at {} ({}): {}",
342                                    url,
343                                    status,
344                                    resp.body
345                                ))
346                            }
347                        }
348                        Err(e) => {
349                            eprintln!("[DEBUG] HTTP error: {:?}", e);
350                            AttemptOutcome::Fatal(e)
351                        }
352                    }
353                }
354            })
355            .await?;
356
357            let parsed: OpenAiResponse =
358                serde_json::from_str(&response).context("Failed to parse OpenAI response")?;
359
360            let choice = parsed.choices.into_iter().next().context("No choices")?;
361
362            let mut content = vec![];
363
364            let reasoning_content = choice.message.reasoning_content;
365
366            let text_content = choice.message.content;
367
368            if let Some(text) = text_content {
369                if !text.is_empty() {
370                    content.push(ContentBlock::Text { text });
371                }
372            }
373
374            if let Some(tool_calls) = choice.message.tool_calls {
375                for tc in tool_calls {
376                    content.push(ContentBlock::ToolUse {
377                        id: tc.id,
378                        name: tc.function.name.clone(),
379                        input: Self::parse_tool_arguments(
380                            &tc.function.name,
381                            &tc.function.arguments,
382                        ),
383                    });
384                }
385            }
386
387            let llm_response = LlmResponse {
388                message: Message {
389                    role: "assistant".to_string(),
390                    content,
391                    reasoning_content,
392                },
393                usage: TokenUsage {
394                    prompt_tokens: parsed.usage.prompt_tokens,
395                    completion_tokens: parsed.usage.completion_tokens,
396                    total_tokens: {
397                        let t = parsed.usage.total_tokens;
398                        // MiniMax: fall back to total_characters when total_tokens is 0.
399                        if t == 0 {
400                            parsed.usage.total_characters.unwrap_or(0)
401                        } else {
402                            t
403                        }
404                    },
405                    cache_read_tokens: parsed
406                        .usage
407                        .prompt_tokens_details
408                        .as_ref()
409                        .and_then(|d| d.cached_tokens),
410                    cache_write_tokens: None,
411                },
412                stop_reason: choice.finish_reason,
413                meta: Some(LlmResponseMeta {
414                    provider: Some(self.provider_name.clone()),
415                    request_model: Some(self.model.clone()),
416                    request_url: Some(url.clone()),
417                    response_id: parsed.id,
418                    response_model: parsed.model,
419                    response_object: parsed.object,
420                    first_token_ms: None,
421                    duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
422                }),
423            };
424
425            crate::telemetry::record_llm_usage(
426                llm_response.usage.prompt_tokens,
427                llm_response.usage.completion_tokens,
428                llm_response.usage.total_tokens,
429                llm_response.stop_reason.as_deref(),
430            );
431
432            Ok(llm_response)
433        }
434    }
435
436    async fn complete_streaming(
437        &self,
438        messages: &[Message],
439        system: Option<&str>,
440        tools: &[ToolDefinition],
441        cancel_token: tokio_util::sync::CancellationToken,
442    ) -> Result<mpsc::Receiver<StreamEvent>> {
443        {
444            let request_started_at = Instant::now();
445            let mut openai_messages = Vec::new();
446
447            if let Some(sys) = system {
448                openai_messages.push(serde_json::json!({
449                    "role": "system",
450                    "content": sys,
451                }));
452            }
453
454            openai_messages.extend(self.convert_messages(messages));
455
456            let mut request = serde_json::json!({
457                "model": self.model,
458                "messages": openai_messages,
459                "stream": true,
460                "stream_options": { "include_usage": true },
461            });
462
463            if let Some(temp) = self.temperature {
464                request["temperature"] = serde_json::json!(temp);
465            }
466            if let Some(max) = self.max_tokens {
467                request["max_tokens"] = serde_json::json!(max);
468            }
469
470            if !tools.is_empty() {
471                request["tools"] = serde_json::json!(self.convert_tools(tools));
472            }
473
474            let url = format!("{}{}", self.base_url, self.chat_completions_path);
475            let request_headers = self.request_headers();
476
477            let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
478                let http = &self.http;
479                let url = &url;
480                let request_headers = request_headers.clone();
481                let request = &request;
482                let cancel_token = cancel_token.clone();
483                async move {
484                    let headers = request_headers
485                        .iter()
486                        .map(|(key, value)| (key.as_str(), value.as_str()))
487                        .collect::<Vec<_>>();
488                    // Wrap in tokio::select! so cancellation aborts the HTTP request mid-flight
489                    let resp = tokio::select! {
490                        _ = cancel_token.cancelled() => {
491                            return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request cancelled"));
492                        }
493                        result = http.post_streaming(url, headers, request, cancel_token.clone()) => {
494                            match result {
495                                Ok(r) => r,
496                                Err(e) => {
497                                    return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request failed: {}", e));
498                                }
499                            }
500                        }
501                    };
502                    let status = reqwest::StatusCode::from_u16(resp.status)
503                        .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
504                    if status.is_success() {
505                        AttemptOutcome::Success(resp)
506                    } else {
507                        let retry_after = resp
508                            .retry_after
509                            .as_deref()
510                            .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
511                        if self.retry_config.is_retryable_status(status) {
512                            AttemptOutcome::Retryable {
513                                status,
514                                body: resp.error_body,
515                                retry_after,
516                            }
517                        } else {
518                            AttemptOutcome::Fatal(anyhow::anyhow!(
519                                "OpenAI API error at {} ({}): {}",
520                                url,
521                                status,
522                                resp.error_body
523                            ))
524                        }
525                    }
526                }
527            })
528            .await?;
529
530            let (tx, rx) = mpsc::channel(100);
531
532            let mut stream = streaming_resp.byte_stream;
533            let provider_name = self.provider_name.clone();
534            let request_model = self.model.clone();
535            let request_url = url.clone();
536            tokio::spawn(async move {
537                let mut buffer = String::new();
538                let mut content_blocks: Vec<ContentBlock> = Vec::new();
539                let mut text_content = String::new();
540                let mut reasoning_content_accum = String::new();
541                let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
542                    std::collections::BTreeMap::new();
543                let mut usage = TokenUsage::default();
544                let mut finish_reason = None;
545                let mut response_id = None;
546                let mut response_model = None;
547                let mut response_object = None;
548                let mut first_token_ms = None;
549                let mut saw_done = false;
550                let mut parsed_any_event = false;
551
552                while let Some(chunk_result) = stream.next().await {
553                    let chunk = match chunk_result {
554                        Ok(c) => c,
555                        Err(e) => {
556                            tracing::error!("Stream error: {}", e);
557                            break;
558                        }
559                    };
560
561                    buffer.push_str(&String::from_utf8_lossy(&chunk));
562
563                    while let Some(event_end) = buffer.find("\n\n") {
564                        let event_data: String = buffer.drain(..event_end).collect();
565                        buffer.drain(..2);
566
567                        for line in event_data.lines() {
568                            if let Some(data) = line.strip_prefix("data: ") {
569                                if data == "[DONE]" {
570                                    saw_done = true;
571                                    if !text_content.is_empty() {
572                                        content_blocks.push(ContentBlock::Text {
573                                            text: text_content.clone(),
574                                        });
575                                    }
576                                    for (_, (id, name, args)) in tool_calls.iter() {
577                                        content_blocks.push(ContentBlock::ToolUse {
578                                            id: id.clone(),
579                                            name: name.clone(),
580                                            input: Self::parse_tool_arguments(name, args),
581                                        });
582                                    }
583                                    tool_calls.clear();
584                                    crate::telemetry::record_llm_usage(
585                                        usage.prompt_tokens,
586                                        usage.completion_tokens,
587                                        usage.total_tokens,
588                                        finish_reason.as_deref(),
589                                    );
590                                    let response = LlmResponse {
591                                        message: Message {
592                                            role: "assistant".to_string(),
593                                            content: std::mem::take(&mut content_blocks),
594                                            reasoning_content: if reasoning_content_accum.is_empty()
595                                            {
596                                                None
597                                            } else {
598                                                Some(std::mem::take(&mut reasoning_content_accum))
599                                            },
600                                        },
601                                        usage: usage.clone(),
602                                        stop_reason: std::mem::take(&mut finish_reason),
603                                        meta: Some(LlmResponseMeta {
604                                            provider: Some(provider_name.clone()),
605                                            request_model: Some(request_model.clone()),
606                                            request_url: Some(request_url.clone()),
607                                            response_id: response_id.clone(),
608                                            response_model: response_model.clone(),
609                                            response_object: response_object.clone(),
610                                            first_token_ms,
611                                            duration_ms: Some(
612                                                request_started_at.elapsed().as_millis() as u64,
613                                            ),
614                                        }),
615                                    };
616                                    let _ = tx.send(StreamEvent::Done(response)).await;
617                                    continue;
618                                }
619
620                                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
621                                    parsed_any_event = true;
622                                    if response_id.is_none() {
623                                        response_id = event.id.clone();
624                                    }
625                                    if response_model.is_none() {
626                                        response_model = event.model.clone();
627                                    }
628                                    if response_object.is_none() {
629                                        response_object = event.object.clone();
630                                    }
631                                    if let Some(u) = event.usage {
632                                        usage.prompt_tokens = u.prompt_tokens;
633                                        usage.completion_tokens = u.completion_tokens;
634                                        usage.total_tokens = u.total_tokens;
635                                        // MiniMax: fall back to total_characters when total_tokens is 0.
636                                        if usage.total_tokens == 0 {
637                                            usage.total_tokens = u.total_characters.unwrap_or(0);
638                                        }
639                                        usage.cache_read_tokens = u
640                                            .prompt_tokens_details
641                                            .as_ref()
642                                            .and_then(|d| d.cached_tokens);
643                                    }
644
645                                    if let Some(choice) = event.choices.into_iter().next() {
646                                        if let Some(reason) = choice.finish_reason {
647                                            finish_reason = Some(reason);
648                                        }
649
650                                        let delta_content = choice
651                                            .delta
652                                            .as_ref()
653                                            .and_then(|delta| delta.content.clone());
654
655                                        if let Some(message) = choice.message {
656                                            if let Some(reasoning) = message.reasoning_content {
657                                                reasoning_content_accum.push_str(&reasoning);
658                                                // For models like kimi that use reasoning_content for actual text output,
659                                                // emit it as TextDelta when delta_content is empty (None or "")
660                                                let delta_content_empty = delta_content
661                                                    .as_deref()
662                                                    .is_none_or(str::is_empty);
663                                                if delta_content_empty {
664                                                    if first_token_ms.is_none() {
665                                                        first_token_ms = Some(
666                                                            request_started_at.elapsed().as_millis()
667                                                                as u64,
668                                                        );
669                                                    }
670                                                    if let Some(delta) = Self::merge_stream_text(
671                                                        &mut text_content,
672                                                        &reasoning,
673                                                    ) {
674                                                        let _ = tx
675                                                            .send(StreamEvent::ReasoningDelta(
676                                                                delta,
677                                                            ))
678                                                            .await;
679                                                    }
680                                                }
681                                            }
682                                            if delta_content.as_deref().is_none_or(str::is_empty) {
683                                                if let Some(content) = message
684                                                    .content
685                                                    .filter(|value| !value.is_empty())
686                                                {
687                                                    if first_token_ms.is_none() {
688                                                        first_token_ms = Some(
689                                                            request_started_at.elapsed().as_millis()
690                                                                as u64,
691                                                        );
692                                                    }
693                                                    if let Some(delta) = Self::merge_stream_text(
694                                                        &mut text_content,
695                                                        &content,
696                                                    ) {
697                                                        let _ = tx
698                                                            .send(StreamEvent::TextDelta(delta))
699                                                            .await;
700                                                    }
701                                                }
702                                            }
703                                            if let Some(tcs) = message.tool_calls {
704                                                for (index, tc) in tcs.into_iter().enumerate() {
705                                                    tool_calls.insert(
706                                                        index,
707                                                        (
708                                                            tc.id,
709                                                            tc.function.name,
710                                                            tc.function.arguments,
711                                                        ),
712                                                    );
713                                                }
714                                            }
715                                        }
716
717                                        if let Some(delta) = choice.delta {
718                                            if let Some(ref rc) = delta.reasoning_content {
719                                                reasoning_content_accum.push_str(rc);
720                                                // For models like kimi that use reasoning_content for actual text output,
721                                                // emit it as TextDelta when content is empty (None or "")
722                                                let content_empty = delta
723                                                    .content
724                                                    .as_deref()
725                                                    .is_none_or(|s| s.is_empty());
726                                                if content_empty {
727                                                    if first_token_ms.is_none() {
728                                                        first_token_ms = Some(
729                                                            request_started_at.elapsed().as_millis()
730                                                                as u64,
731                                                        );
732                                                    }
733                                                    if let Some(delta) = Self::merge_stream_text(
734                                                        &mut text_content,
735                                                        rc,
736                                                    ) {
737                                                        let _ = tx
738                                                            .send(StreamEvent::ReasoningDelta(
739                                                                delta,
740                                                            ))
741                                                            .await;
742                                                    }
743                                                }
744                                            }
745
746                                            if let Some(content) = delta.content {
747                                                if first_token_ms.is_none() {
748                                                    first_token_ms = Some(
749                                                        request_started_at.elapsed().as_millis()
750                                                            as u64,
751                                                    );
752                                                }
753                                                if let Some(delta) = Self::merge_stream_text(
754                                                    &mut text_content,
755                                                    &content,
756                                                ) {
757                                                    let _ = tx
758                                                        .send(StreamEvent::TextDelta(delta))
759                                                        .await;
760                                                }
761                                            }
762
763                                            if let Some(tcs) = delta.tool_calls {
764                                                for tc in tcs {
765                                                    let entry = tool_calls
766                                                        .entry(tc.index)
767                                                        .or_insert_with(|| {
768                                                            (
769                                                                String::new(),
770                                                                String::new(),
771                                                                String::new(),
772                                                            )
773                                                        });
774
775                                                    if let Some(id) = tc.id {
776                                                        entry.0 = id;
777                                                    }
778                                                    if let Some(func) = tc.function {
779                                                        if let Some(name) = func.name {
780                                                            if first_token_ms.is_none() {
781                                                                first_token_ms = Some(
782                                                                    request_started_at
783                                                                        .elapsed()
784                                                                        .as_millis()
785                                                                        as u64,
786                                                                );
787                                                            }
788                                                            entry.1 = name.clone();
789                                                            let _ = tx
790                                                                .send(StreamEvent::ToolUseStart {
791                                                                    id: entry.0.clone(),
792                                                                    name,
793                                                                })
794                                                                .await;
795                                                        }
796                                                        if let Some(args) = func.arguments {
797                                                            entry.2.push_str(&args);
798                                                            let _ = tx
799                                                                .send(
800                                                                    StreamEvent::ToolUseInputDelta(
801                                                                        args,
802                                                                    ),
803                                                                )
804                                                                .await;
805                                                        }
806                                                    }
807                                                }
808                                            }
809                                        }
810                                    }
811                                }
812                            }
813                        }
814                    }
815                }
816
817                if saw_done {
818                    return;
819                }
820
821                let trailing = buffer.trim();
822                if !trailing.is_empty() {
823                    if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
824                        parsed_any_event = true;
825                        if response_id.is_none() {
826                            response_id = event.id.clone();
827                        }
828                        if response_model.is_none() {
829                            response_model = event.model.clone();
830                        }
831                        if response_object.is_none() {
832                            response_object = event.object.clone();
833                        }
834                        if let Some(u) = event.usage {
835                            usage.prompt_tokens = u.prompt_tokens;
836                            usage.completion_tokens = u.completion_tokens;
837                            usage.total_tokens = u.total_tokens;
838                            usage.cache_read_tokens = u
839                                .prompt_tokens_details
840                                .as_ref()
841                                .and_then(|d| d.cached_tokens);
842                        }
843                        if let Some(choice) = event.choices.into_iter().next() {
844                            if let Some(reason) = choice.finish_reason {
845                                finish_reason = Some(reason);
846                            }
847                            let delta_content = choice
848                                .delta
849                                .as_ref()
850                                .and_then(|delta| delta.content.clone());
851                            if let Some(message) = choice.message {
852                                if let Some(reasoning) = message.reasoning_content {
853                                    reasoning_content_accum.push_str(&reasoning);
854                                }
855                                if delta_content.as_deref().is_none_or(str::is_empty) {
856                                    if let Some(content) =
857                                        message.content.filter(|value| !value.is_empty())
858                                    {
859                                        if first_token_ms.is_none() {
860                                            first_token_ms = Some(
861                                                request_started_at.elapsed().as_millis() as u64,
862                                            );
863                                        }
864                                        if let Some(delta) =
865                                            Self::merge_stream_text(&mut text_content, &content)
866                                        {
867                                            let _ = tx.send(StreamEvent::TextDelta(delta)).await;
868                                        }
869                                    }
870                                }
871                                if let Some(tcs) = message.tool_calls {
872                                    for (index, tc) in tcs.into_iter().enumerate() {
873                                        tool_calls.insert(
874                                            index,
875                                            (tc.id, tc.function.name, tc.function.arguments),
876                                        );
877                                    }
878                                }
879                            }
880                            if let Some(delta) = choice.delta {
881                                if let Some(ref rc) = delta.reasoning_content {
882                                    reasoning_content_accum.push_str(rc);
883                                    // For models like kimi that use reasoning_content for actual text output,
884                                    // emit it as TextDelta when content is empty (None or "")
885                                    let content_empty =
886                                        delta.content.as_deref().is_none_or(|s| s.is_empty());
887                                    if content_empty {
888                                        if first_token_ms.is_none() {
889                                            first_token_ms = Some(
890                                                request_started_at.elapsed().as_millis() as u64,
891                                            );
892                                        }
893                                        if let Some(delta) =
894                                            Self::merge_stream_text(&mut text_content, rc)
895                                        {
896                                            let _ =
897                                                tx.send(StreamEvent::ReasoningDelta(delta)).await;
898                                        }
899                                    }
900                                }
901                                if let Some(content) = delta.content {
902                                    if first_token_ms.is_none() {
903                                        first_token_ms =
904                                            Some(request_started_at.elapsed().as_millis() as u64);
905                                    }
906                                    if let Some(delta) =
907                                        Self::merge_stream_text(&mut text_content, &content)
908                                    {
909                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
910                                    }
911                                }
912                            }
913                        }
914                    } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
915                        parsed_any_event = true;
916                        response_id = response.id.clone();
917                        response_model = response.model.clone();
918                        response_object = response.object.clone();
919                        usage.prompt_tokens = response.usage.prompt_tokens;
920                        usage.completion_tokens = response.usage.completion_tokens;
921                        usage.total_tokens = response.usage.total_tokens;
922                        // MiniMax: fall back to total_characters when total_tokens is 0.
923                        if usage.total_tokens == 0 {
924                            usage.total_tokens = response.usage.total_characters.unwrap_or(0);
925                        }
926                        usage.cache_read_tokens = response
927                            .usage
928                            .prompt_tokens_details
929                            .as_ref()
930                            .and_then(|d| d.cached_tokens);
931
932                        if let Some(choice) = response.choices.into_iter().next() {
933                            finish_reason = choice.finish_reason;
934                            if let Some(text) =
935                                choice.message.content.filter(|text| !text.is_empty())
936                            {
937                                if first_token_ms.is_none() {
938                                    first_token_ms =
939                                        Some(request_started_at.elapsed().as_millis() as u64);
940                                }
941                                let _ = Self::merge_stream_text(&mut text_content, &text);
942                            }
943                            if let Some(reasoning) = choice.message.reasoning_content {
944                                reasoning_content_accum.push_str(&reasoning);
945                            }
946                            if let Some(final_tool_calls) = choice.message.tool_calls {
947                                for tc in final_tool_calls {
948                                    tool_calls.insert(
949                                        tool_calls.len(),
950                                        (tc.id, tc.function.name, tc.function.arguments),
951                                    );
952                                }
953                            }
954                        }
955                    }
956                }
957
958                if parsed_any_event
959                    || !text_content.is_empty()
960                    || !tool_calls.is_empty()
961                    || !content_blocks.is_empty()
962                {
963                    tracing::warn!(
964                        provider = %provider_name,
965                        model = %request_model,
966                        "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
967                    );
968                    if !text_content.is_empty() {
969                        content_blocks.push(ContentBlock::Text {
970                            text: text_content.clone(),
971                        });
972                    }
973                    for (_, (id, name, args)) in tool_calls.iter() {
974                        content_blocks.push(ContentBlock::ToolUse {
975                            id: id.clone(),
976                            name: name.clone(),
977                            input: Self::parse_tool_arguments(name, args),
978                        });
979                    }
980                    tool_calls.clear();
981                    crate::telemetry::record_llm_usage(
982                        usage.prompt_tokens,
983                        usage.completion_tokens,
984                        usage.total_tokens,
985                        finish_reason.as_deref(),
986                    );
987                    let response = LlmResponse {
988                        message: Message {
989                            role: "assistant".to_string(),
990                            content: std::mem::take(&mut content_blocks),
991                            reasoning_content: if reasoning_content_accum.is_empty() {
992                                None
993                            } else {
994                                Some(std::mem::take(&mut reasoning_content_accum))
995                            },
996                        },
997                        usage: usage.clone(),
998                        stop_reason: std::mem::take(&mut finish_reason),
999                        meta: Some(LlmResponseMeta {
1000                            provider: Some(provider_name.clone()),
1001                            request_model: Some(request_model.clone()),
1002                            request_url: Some(request_url.clone()),
1003                            response_id: response_id.clone(),
1004                            response_model: response_model.clone(),
1005                            response_object: response_object.clone(),
1006                            first_token_ms,
1007                            duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
1008                        }),
1009                    };
1010                    let _ = tx.send(StreamEvent::Done(response)).await;
1011                } else {
1012                    tracing::warn!(
1013                        provider = %provider_name,
1014                        model = %request_model,
1015                        trailing = %trailing.chars().take(400).collect::<String>(),
1016                        "OpenAI-compatible stream ended without any parseable events"
1017                    );
1018                }
1019            });
1020
1021            Ok(rx)
1022        }
1023    }
1024}
1025
1026// OpenAI API response types (private)
/// Non-streaming response body from an OpenAI-compatible
/// `chat/completions` endpoint. Also used as a fallback parse for
/// providers that return a complete response on a streaming request.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiResponse {
    /// Response id (e.g. `chatcmpl-...`); defaulted to `None` because some
    /// compatible providers omit it.
    #[serde(default)]
    pub(crate) id: Option<String>,
    /// Object type string (e.g. `"chat.completion"`); optional for
    /// non-conforming providers.
    #[serde(default)]
    pub(crate) object: Option<String>,
    /// Model name echoed back by the server; may differ from the requested model.
    #[serde(default)]
    pub(crate) model: Option<String>,
    /// Completion choices; only the first is consumed by this client.
    pub(crate) choices: Vec<OpenAiChoice>,
    /// Token accounting; required here — a response without `usage` fails to parse.
    pub(crate) usage: OpenAiUsage,
}
1038
/// One completion choice in a non-streaming [`OpenAiResponse`].
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiChoice {
    /// The assistant message produced for this choice.
    pub(crate) message: OpenAiMessage,
    /// Why generation stopped (e.g. `"stop"`, `"tool_calls"`); `None` if the
    /// provider omits it.
    pub(crate) finish_reason: Option<String>,
}
1044
/// An assistant message as returned by OpenAI-compatible providers.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiMessage {
    /// Extended "thinking" text emitted by reasoning models (e.g. kimi,
    /// DeepSeek-style providers); not part of the upstream OpenAI schema.
    pub(crate) reasoning_content: Option<String>,
    /// Plain text content of the message; `None` for pure tool-call messages.
    pub(crate) content: Option<String>,
    /// Tool invocations requested by the model, if any.
    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
}
1051
/// A complete (non-streamed) tool call attached to an assistant message.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCall {
    /// Provider-assigned call id, echoed back in the tool-result message.
    pub(crate) id: String,
    /// Function name plus raw JSON arguments.
    pub(crate) function: OpenAiFunction,
}
1057
/// Function payload of a completed tool call.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunction {
    /// Name of the tool/function to invoke.
    pub(crate) name: String,
    /// Arguments as a raw JSON string; parsed leniently by
    /// `OpenAiClient::parse_tool_arguments` (malformed JSON is tolerated).
    pub(crate) arguments: String,
}
1063
/// Token accounting block returned by OpenAI-compatible endpoints.
/// Every counter is `#[serde(default)]` so providers that omit individual
/// fields still deserialize (missing counts read as 0 / `None`).
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiUsage {
    /// Tokens consumed by the prompt/input.
    #[serde(default)]
    pub(crate) prompt_tokens: usize,
    /// Tokens generated in the completion/output.
    #[serde(default)]
    pub(crate) completion_tokens: usize,
    /// Prompt + completion total; 0 when the provider omits it (the caller
    /// then falls back to `total_characters` for MiniMax).
    #[serde(default)]
    pub(crate) total_tokens: usize,
    /// MiniMax uses `total_characters` instead of token counts.
    #[serde(default)]
    pub(crate) total_characters: Option<usize>,
    /// OpenAI returns cached token count in `prompt_tokens_details.cached_tokens`
    #[serde(default)]
    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
}
1079
/// Breakdown of prompt tokens; currently only the prompt-cache hit count
/// is consumed (mapped to `usage.cache_read_tokens` by the stream loop).
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiPromptTokensDetails {
    /// Number of prompt tokens served from the provider's prompt cache.
    #[serde(default)]
    pub(crate) cached_tokens: Option<usize>,
}
1085
1086// OpenAI streaming types
1087#[derive(Debug, Deserialize)]
1088pub(crate) struct OpenAiStreamChunk {
1089    #[serde(default)]
1090    pub(crate) id: Option<String>,
1091    #[serde(default)]
1092    pub(crate) object: Option<String>,
1093    #[serde(default)]
1094    pub(crate) model: Option<String>,
1095    pub(crate) choices: Vec<OpenAiStreamChoice>,
1096    pub(crate) usage: Option<OpenAiUsage>,
1097}
1098
/// One choice within a streaming chunk.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChoice {
    /// Full message object; non-standard for streams, but some compatible
    /// providers send it instead of (or alongside) `delta` — the stream loop
    /// handles both.
    pub(crate) message: Option<OpenAiMessage>,
    /// Incremental delta payload (standard OpenAI streaming shape).
    pub(crate) delta: Option<OpenAiDelta>,
    /// Why generation stopped; usually only set on the final content chunk.
    pub(crate) finish_reason: Option<String>,
}
1105
/// Incremental message fragment carried by a streaming choice.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiDelta {
    /// Incremental reasoning text (models like kimi emit real output here
    /// while `content` stays empty; the stream loop compensates).
    pub(crate) reasoning_content: Option<String>,
    /// Incremental plain-text content.
    pub(crate) content: Option<String>,
    /// Incremental tool-call fragments, keyed by each fragment's `index`.
    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}
1112
/// Fragment of a tool call spread across multiple stream chunks.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCallDelta {
    /// Position of this tool call in the final call list; used as the
    /// accumulation key while fragments arrive.
    pub(crate) index: usize,
    /// Call id; usually present only on the first fragment for an index.
    pub(crate) id: Option<String>,
    /// Partial function name/arguments for this fragment.
    pub(crate) function: Option<OpenAiFunctionDelta>,
}
1119
/// Partial function payload inside a streamed tool-call fragment.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunctionDelta {
    /// Function name; typically sent once, on the first fragment.
    pub(crate) name: Option<String>,
    /// Argument JSON fragment; concatenated across chunks before parsing.
    pub(crate) arguments: Option<String>,
}