Skip to main content

a3s_code_core/llm/
openai.rs

1//! OpenAI-compatible LLM client
2
3use super::http::{default_http_client, normalize_base_url, HttpClient};
4use super::types::*;
5use super::LlmClient;
6use crate::llm::types::{ToolResultContent, ToolResultContentField};
7use crate::retry::{AttemptOutcome, RetryConfig};
8use anyhow::{Context, Result};
9use async_trait::async_trait;
10use futures::StreamExt;
11use serde::Deserialize;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
/// OpenAI client
///
/// OpenAI-compatible chat-completions client. Construct with
/// [`OpenAiClient::new`] and customize via the `with_*` builder methods.
pub struct OpenAiClient {
    /// Provider label recorded in response metadata (defaults to "openai").
    pub(crate) provider_name: String,
    /// API key; sent as `Authorization: Bearer <key>` unless the caller
    /// supplies their own `Authorization` entry in `headers`.
    pub(crate) api_key: SecretString,
    /// Model identifier sent in the request body.
    pub(crate) model: String,
    /// Base URL without trailing slash (normalized in `with_base_url`).
    pub(crate) base_url: String,
    /// Request path appended to `base_url`; always begins with '/'.
    pub(crate) chat_completions_path: String,
    /// Extra headers merged into every request (may override Authorization).
    pub(crate) headers: HashMap<String, String>,
    /// Optional sampling temperature; omitted from the request when `None`.
    pub(crate) temperature: Option<f32>,
    /// Optional `max_tokens` cap; omitted from the request when `None`.
    pub(crate) max_tokens: Option<usize>,
    /// HTTP transport abstraction (injectable for testing).
    pub(crate) http: Arc<dyn HttpClient>,
    /// Retry policy applied to each request.
    pub(crate) retry_config: RetryConfig,
}
30
31impl OpenAiClient {
32    pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
33        if arguments.trim().is_empty() {
34            return serde_json::Value::Object(Default::default());
35        }
36
37        serde_json::from_str(arguments).unwrap_or_else(|e| {
38            tracing::warn!(
39                "Failed to parse tool arguments JSON for tool '{}': {}",
40                tool_name,
41                e
42            );
43            serde_json::json!({
44                "__parse_error": format!(
45                    "Malformed tool arguments: {}. Raw input: {}",
46                    e, arguments
47                )
48            })
49        })
50    }
51
52    fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
53        if incoming.is_empty() {
54            return None;
55        }
56        if text_content.is_empty() {
57            text_content.push_str(incoming);
58            return Some(incoming.to_string());
59        }
60        if incoming == text_content.as_str() || text_content.ends_with(incoming) {
61            return None;
62        }
63        // If incoming contains text_content as a prefix (incoming is the full content),
64        // replace text_content instead of appending (avoids duplicate full content)
65        if incoming.starts_with(text_content.as_str()) && incoming.len() > text_content.len() {
66            let suffix = &incoming[text_content.len()..];
67            if !suffix.is_empty() {
68                *text_content = incoming.to_string();
69                return Some(suffix.to_string());
70            }
71            return None;
72        }
73        if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
74            if suffix.is_empty() {
75                return None;
76            }
77            text_content.push_str(suffix);
78            return Some(suffix.to_string());
79        }
80        text_content.push_str(incoming);
81        Some(incoming.to_string())
82    }
83
84    pub fn new(api_key: String, model: String) -> Self {
85        Self {
86            provider_name: "openai".to_string(),
87            api_key: SecretString::new(api_key),
88            model,
89            base_url: "https://api.openai.com".to_string(),
90            chat_completions_path: "/v1/chat/completions".to_string(),
91            headers: HashMap::new(),
92            temperature: None,
93            max_tokens: None,
94            http: default_http_client(),
95            retry_config: RetryConfig::default(),
96        }
97    }
98
99    pub fn with_base_url(mut self, base_url: String) -> Self {
100        self.base_url = normalize_base_url(&base_url);
101        self
102    }
103
104    pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
105        self.provider_name = provider_name.into();
106        self
107    }
108
109    pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
110        let path = path.into();
111        self.chat_completions_path = if path.starts_with('/') {
112            path
113        } else {
114            format!("/{}", path)
115        };
116        self
117    }
118
119    pub fn with_temperature(mut self, temperature: f32) -> Self {
120        self.temperature = Some(temperature);
121        self
122    }
123
124    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
125        self.headers = headers;
126        self
127    }
128
129    pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
130        self.max_tokens = Some(max_tokens);
131        self
132    }
133
134    pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
135        self.retry_config = retry_config;
136        self
137    }
138
139    pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
140        self.http = http;
141        self
142    }
143
144    pub(crate) fn request_headers(&self) -> Vec<(String, String)> {
145        let mut headers = Vec::with_capacity(self.headers.len() + 1);
146        let has_authorization = self
147            .headers
148            .keys()
149            .any(|key| key.eq_ignore_ascii_case("authorization"));
150        if !has_authorization {
151            headers.push((
152                "Authorization".to_string(),
153                format!("Bearer {}", self.api_key.expose()),
154            ));
155        }
156        headers.extend(
157            self.headers
158                .iter()
159                .map(|(key, value)| (key.clone(), value.clone())),
160        );
161        headers
162    }
163
164    pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
165        messages
166            .iter()
167            .map(|msg| {
168                let content: serde_json::Value = if msg.content.len() == 1 {
169                    match &msg.content[0] {
170                        ContentBlock::Text { text } => serde_json::json!(text),
171                        ContentBlock::ToolResult {
172                            tool_use_id,
173                            content,
174                            ..
175                        } => {
176                            let content_str = match content {
177                                ToolResultContentField::Text(s) => s.clone(),
178                                ToolResultContentField::Blocks(blocks) => blocks
179                                    .iter()
180                                    .filter_map(|b| {
181                                        if let ToolResultContent::Text { text } = b {
182                                            Some(text.clone())
183                                        } else {
184                                            None
185                                        }
186                                    })
187                                    .collect::<Vec<_>>()
188                                    .join("\n"),
189                            };
190                            return serde_json::json!({
191                                "role": "tool",
192                                "tool_call_id": tool_use_id,
193                                "content": content_str,
194                            });
195                        }
196                        _ => serde_json::json!(""),
197                    }
198                } else {
199                    serde_json::json!(msg
200                        .content
201                        .iter()
202                        .map(|block| {
203                            match block {
204                                ContentBlock::Text { text } => serde_json::json!({
205                                    "type": "text",
206                                    "text": text,
207                                }),
208                                ContentBlock::Image { source } => serde_json::json!({
209                                    "type": "image_url",
210                                    "image_url": {
211                                        "url": format!(
212                                            "data:{};base64,{}",
213                                            source.media_type, source.data
214                                        ),
215                                    }
216                                }),
217                                ContentBlock::ToolUse { id, name, input } => serde_json::json!({
218                                    "type": "function",
219                                    "id": id,
220                                    "function": {
221                                        "name": name,
222                                        "arguments": input.to_string(),
223                                    }
224                                }),
225                                _ => serde_json::json!({}),
226                            }
227                        })
228                        .collect::<Vec<_>>())
229                };
230
231                // Handle assistant messages — kimi-k2.5 requires reasoning_content
232                // on all assistant messages when thinking mode is enabled
233                if msg.role == "assistant" {
234                    let rc = msg.reasoning_content.as_deref().unwrap_or("");
235                    let tool_calls: Vec<_> = msg.tool_calls();
236                    if !tool_calls.is_empty() {
237                        return serde_json::json!({
238                            "role": "assistant",
239                            "content": msg.text(),
240                            "reasoning_content": rc,
241                            "tool_calls": tool_calls.iter().map(|tc| {
242                                serde_json::json!({
243                                    "id": tc.id,
244                                    "type": "function",
245                                    "function": {
246                                        "name": tc.name,
247                                        "arguments": tc.args.to_string(),
248                                    }
249                                })
250                            }).collect::<Vec<_>>(),
251                        });
252                    }
253                    return serde_json::json!({
254                        "role": "assistant",
255                        "content": content,
256                        "reasoning_content": rc,
257                    });
258                }
259
260                serde_json::json!({
261                    "role": msg.role,
262                    "content": content,
263                })
264            })
265            .collect()
266    }
267
268    pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
269        tools
270            .iter()
271            .map(|t| {
272                serde_json::json!({
273                    "type": "function",
274                    "function": {
275                        "name": t.name,
276                        "description": t.description,
277                        "parameters": t.parameters,
278                    }
279                })
280            })
281            .collect()
282    }
283}
284
285#[async_trait]
286impl LlmClient for OpenAiClient {
287    async fn complete(
288        &self,
289        messages: &[Message],
290        system: Option<&str>,
291        tools: &[ToolDefinition],
292    ) -> Result<LlmResponse> {
293        {
294            let request_started_at = Instant::now();
295            let mut openai_messages = Vec::new();
296
297            if let Some(sys) = system {
298                openai_messages.push(serde_json::json!({
299                    "role": "system",
300                    "content": sys,
301                }));
302            }
303
304            openai_messages.extend(self.convert_messages(messages));
305
306            let mut request = serde_json::json!({
307                "model": self.model,
308                "messages": openai_messages,
309            });
310
311            if let Some(temp) = self.temperature {
312                request["temperature"] = serde_json::json!(temp);
313            }
314            if let Some(max) = self.max_tokens {
315                request["max_tokens"] = serde_json::json!(max);
316            }
317
318            if !tools.is_empty() {
319                request["tools"] = serde_json::json!(self.convert_tools(tools));
320            }
321
322            let url = format!("{}{}", self.base_url, self.chat_completions_path);
323            let request_headers = self.request_headers();
324
325            let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
326                let http = &self.http;
327                let url = &url;
328                let request_headers = request_headers.clone();
329                let request = &request;
330                async move {
331                    let headers = request_headers
332                        .iter()
333                        .map(|(key, value)| (key.as_str(), value.as_str()))
334                        .collect::<Vec<_>>();
335                    // Non-streaming: use a non-cancelled token for now
336                    let cancel_token = tokio_util::sync::CancellationToken::new();
337                    match http.post(url, headers, request, cancel_token).await {
338                        Ok(resp) => {
339                            let status = reqwest::StatusCode::from_u16(resp.status)
340                                .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
341                            if status.is_success() {
342                                AttemptOutcome::Success(resp.body)
343                            } else if self.retry_config.is_retryable_status(status) {
344                                AttemptOutcome::Retryable {
345                                    status,
346                                    body: resp.body,
347                                    retry_after: None,
348                                }
349                            } else {
350                                AttemptOutcome::Fatal(anyhow::anyhow!(
351                                    "OpenAI API error at {} ({}): {}",
352                                    url,
353                                    status,
354                                    resp.body
355                                ))
356                            }
357                        }
358                        Err(e) => {
359                            eprintln!("[DEBUG] HTTP error: {:?}", e);
360                            AttemptOutcome::Fatal(e)
361                        }
362                    }
363                }
364            })
365            .await?;
366
367            let parsed: OpenAiResponse =
368                serde_json::from_str(&response).context("Failed to parse OpenAI response")?;
369
370            let choice = parsed.choices.into_iter().next().context("No choices")?;
371
372            let mut content = vec![];
373
374            let reasoning_content = choice.message.reasoning_content;
375
376            let text_content = choice.message.content;
377
378            if let Some(text) = text_content {
379                if !text.is_empty() {
380                    content.push(ContentBlock::Text { text });
381                }
382            }
383
384            if let Some(tool_calls) = choice.message.tool_calls {
385                for tc in tool_calls {
386                    content.push(ContentBlock::ToolUse {
387                        id: tc.id,
388                        name: tc.function.name.clone(),
389                        input: Self::parse_tool_arguments(
390                            &tc.function.name,
391                            &tc.function.arguments,
392                        ),
393                    });
394                }
395            }
396
397            let llm_response = LlmResponse {
398                message: Message {
399                    role: "assistant".to_string(),
400                    content,
401                    reasoning_content,
402                },
403                usage: TokenUsage {
404                    prompt_tokens: parsed.usage.prompt_tokens,
405                    completion_tokens: parsed.usage.completion_tokens,
406                    total_tokens: {
407                        let t = parsed.usage.total_tokens;
408                        // MiniMax: fall back to total_characters when total_tokens is 0.
409                        if t == 0 {
410                            parsed.usage.total_characters.unwrap_or(0)
411                        } else {
412                            t
413                        }
414                    },
415                    cache_read_tokens: parsed
416                        .usage
417                        .prompt_tokens_details
418                        .as_ref()
419                        .and_then(|d| d.cached_tokens),
420                    cache_write_tokens: None,
421                },
422                stop_reason: choice.finish_reason,
423                meta: Some(LlmResponseMeta {
424                    provider: Some(self.provider_name.clone()),
425                    request_model: Some(self.model.clone()),
426                    request_url: Some(url.clone()),
427                    response_id: parsed.id,
428                    response_model: parsed.model,
429                    response_object: parsed.object,
430                    first_token_ms: None,
431                    duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
432                }),
433            };
434
435            crate::telemetry::record_llm_usage(
436                llm_response.usage.prompt_tokens,
437                llm_response.usage.completion_tokens,
438                llm_response.usage.total_tokens,
439                llm_response.stop_reason.as_deref(),
440            );
441
442            Ok(llm_response)
443        }
444    }
445
446    async fn complete_streaming(
447        &self,
448        messages: &[Message],
449        system: Option<&str>,
450        tools: &[ToolDefinition],
451        cancel_token: tokio_util::sync::CancellationToken,
452    ) -> Result<mpsc::Receiver<StreamEvent>> {
453        {
454            let request_started_at = Instant::now();
455            let mut openai_messages = Vec::new();
456
457            if let Some(sys) = system {
458                openai_messages.push(serde_json::json!({
459                    "role": "system",
460                    "content": sys,
461                }));
462            }
463
464            openai_messages.extend(self.convert_messages(messages));
465
466            let mut request = serde_json::json!({
467                "model": self.model,
468                "messages": openai_messages,
469                "stream": true,
470                "stream_options": { "include_usage": true },
471            });
472
473            if let Some(temp) = self.temperature {
474                request["temperature"] = serde_json::json!(temp);
475            }
476            if let Some(max) = self.max_tokens {
477                request["max_tokens"] = serde_json::json!(max);
478            }
479
480            if !tools.is_empty() {
481                request["tools"] = serde_json::json!(self.convert_tools(tools));
482            }
483
484            let url = format!("{}{}", self.base_url, self.chat_completions_path);
485            let request_headers = self.request_headers();
486
487            let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
488                let http = &self.http;
489                let url = &url;
490                let request_headers = request_headers.clone();
491                let request = &request;
492                let cancel_token = cancel_token.clone();
493                async move {
494                    let headers = request_headers
495                        .iter()
496                        .map(|(key, value)| (key.as_str(), value.as_str()))
497                        .collect::<Vec<_>>();
498                    // Wrap in tokio::select! so cancellation aborts the HTTP request mid-flight
499                    let resp = tokio::select! {
500                        _ = cancel_token.cancelled() => {
501                            return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request cancelled"));
502                        }
503                        result = http.post_streaming(url, headers, request, cancel_token.clone()) => {
504                            match result {
505                                Ok(r) => r,
506                                Err(e) => {
507                                    return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request failed: {}", e));
508                                }
509                            }
510                        }
511                    };
512                    let status = reqwest::StatusCode::from_u16(resp.status)
513                        .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
514                    if status.is_success() {
515                        AttemptOutcome::Success(resp)
516                    } else {
517                        let retry_after = resp
518                            .retry_after
519                            .as_deref()
520                            .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
521                        if self.retry_config.is_retryable_status(status) {
522                            AttemptOutcome::Retryable {
523                                status,
524                                body: resp.error_body,
525                                retry_after,
526                            }
527                        } else {
528                            AttemptOutcome::Fatal(anyhow::anyhow!(
529                                "OpenAI API error at {} ({}): {}",
530                                url,
531                                status,
532                                resp.error_body
533                            ))
534                        }
535                    }
536                }
537            })
538            .await?;
539
540            let (tx, rx) = mpsc::channel(100);
541
542            let mut stream = streaming_resp.byte_stream;
543            let provider_name = self.provider_name.clone();
544            let request_model = self.model.clone();
545            let request_url = url.clone();
546            tokio::spawn(async move {
547                let mut buffer = String::new();
548                let mut content_blocks: Vec<ContentBlock> = Vec::new();
549                let mut text_content = String::new();
550                let mut reasoning_content_accum = String::new();
551                let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
552                    std::collections::BTreeMap::new();
553                let mut usage = TokenUsage::default();
554                let mut finish_reason = None;
555                let mut response_id = None;
556                let mut response_model = None;
557                let mut response_object = None;
558                let mut first_token_ms = None;
559                let mut saw_done = false;
560                let mut parsed_any_event = false;
561
562                while let Some(chunk_result) = stream.next().await {
563                    let chunk = match chunk_result {
564                        Ok(c) => c,
565                        Err(e) => {
566                            tracing::error!("Stream error: {}", e);
567                            break;
568                        }
569                    };
570
571                    buffer.push_str(&String::from_utf8_lossy(&chunk));
572
573                    while let Some(event_end) = buffer.find("\n\n") {
574                        let event_data: String = buffer.drain(..event_end).collect();
575                        buffer.drain(..2);
576
577                        for line in event_data.lines() {
578                            if let Some(data) = line.strip_prefix("data: ") {
579                                if data == "[DONE]" {
580                                    saw_done = true;
581                                    if !text_content.is_empty() {
582                                        content_blocks.push(ContentBlock::Text {
583                                            text: text_content.clone(),
584                                        });
585                                    }
586                                    for (_, (id, name, args)) in tool_calls.iter() {
587                                        content_blocks.push(ContentBlock::ToolUse {
588                                            id: id.clone(),
589                                            name: name.clone(),
590                                            input: Self::parse_tool_arguments(name, args),
591                                        });
592                                    }
593                                    tool_calls.clear();
594                                    crate::telemetry::record_llm_usage(
595                                        usage.prompt_tokens,
596                                        usage.completion_tokens,
597                                        usage.total_tokens,
598                                        finish_reason.as_deref(),
599                                    );
600                                    let response = LlmResponse {
601                                        message: Message {
602                                            role: "assistant".to_string(),
603                                            content: std::mem::take(&mut content_blocks),
604                                            reasoning_content: if reasoning_content_accum.is_empty()
605                                            {
606                                                None
607                                            } else {
608                                                Some(std::mem::take(&mut reasoning_content_accum))
609                                            },
610                                        },
611                                        usage: usage.clone(),
612                                        stop_reason: std::mem::take(&mut finish_reason),
613                                        meta: Some(LlmResponseMeta {
614                                            provider: Some(provider_name.clone()),
615                                            request_model: Some(request_model.clone()),
616                                            request_url: Some(request_url.clone()),
617                                            response_id: response_id.clone(),
618                                            response_model: response_model.clone(),
619                                            response_object: response_object.clone(),
620                                            first_token_ms,
621                                            duration_ms: Some(
622                                                request_started_at.elapsed().as_millis() as u64,
623                                            ),
624                                        }),
625                                    };
626                                    let _ = tx.send(StreamEvent::Done(response)).await;
627                                    continue;
628                                }
629
630                                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
631                                    parsed_any_event = true;
632                                    if response_id.is_none() {
633                                        response_id = event.id.clone();
634                                    }
635                                    if response_model.is_none() {
636                                        response_model = event.model.clone();
637                                    }
638                                    if response_object.is_none() {
639                                        response_object = event.object.clone();
640                                    }
641                                    if let Some(u) = event.usage {
642                                        usage.prompt_tokens = u.prompt_tokens;
643                                        usage.completion_tokens = u.completion_tokens;
644                                        usage.total_tokens = u.total_tokens;
645                                        // MiniMax: fall back to total_characters when total_tokens is 0.
646                                        if usage.total_tokens == 0 {
647                                            usage.total_tokens = u.total_characters.unwrap_or(0);
648                                        }
649                                        usage.cache_read_tokens = u
650                                            .prompt_tokens_details
651                                            .as_ref()
652                                            .and_then(|d| d.cached_tokens);
653                                    }
654
655                                    if let Some(choice) = event.choices.into_iter().next() {
656                                        if let Some(reason) = choice.finish_reason {
657                                            finish_reason = Some(reason);
658                                        }
659
660                                        if let Some(message) = choice.message {
661                                            // If text_content already has content (from delta processing),
662                                            // skip message.content to avoid sending duplicate full content
663                                            let skip_content = !text_content.is_empty();
664                                            if let Some(reasoning) = message.reasoning_content {
665                                                reasoning_content_accum.push_str(&reasoning);
666                                                if first_token_ms.is_none() {
667                                                    first_token_ms = Some(
668                                                        request_started_at.elapsed().as_millis()
669                                                            as u64,
670                                                    );
671                                                }
672                                                if let Some(delta) = Self::merge_stream_text(
673                                                    &mut text_content,
674                                                    &reasoning,
675                                                ) {
676                                                    let _ = tx
677                                                        .send(StreamEvent::ReasoningDelta(delta))
678                                                        .await;
679                                                }
680                                            }
681                                            if !skip_content {
682                                                if let Some(content) = message
683                                                    .content
684                                                    .filter(|value| !value.is_empty())
685                                                {
686                                                    if first_token_ms.is_none() {
687                                                        first_token_ms = Some(
688                                                            request_started_at.elapsed().as_millis()
689                                                                as u64,
690                                                        );
691                                                    }
692                                                    if let Some(delta) = Self::merge_stream_text(
693                                                        &mut text_content,
694                                                        &content,
695                                                    ) {
696                                                        let _ = tx
697                                                            .send(StreamEvent::TextDelta(delta))
698                                                            .await;
699                                                    }
700                                                }
701                                            }
702                                            if let Some(tcs) = message.tool_calls {
703                                                for (index, tc) in tcs.into_iter().enumerate() {
704                                                    tool_calls.insert(
705                                                        index,
706                                                        (
707                                                            tc.id,
708                                                            tc.function.name,
709                                                            tc.function.arguments,
710                                                        ),
711                                                    );
712                                                }
713                                            }
714                                        } else if let Some(delta) = choice.delta {
715                                            if let Some(ref rc) = delta.reasoning_content {
716                                                reasoning_content_accum.push_str(rc);
717                                                if first_token_ms.is_none() {
718                                                    first_token_ms = Some(
719                                                        request_started_at.elapsed().as_millis()
720                                                            as u64,
721                                                    );
722                                                }
723                                                if let Some(delta) =
724                                                    Self::merge_stream_text(&mut text_content, rc)
725                                                {
726                                                    let _ = tx
727                                                        .send(StreamEvent::ReasoningDelta(delta))
728                                                        .await;
729                                                }
730                                            }
731
732                                            if let Some(content) = delta.content {
733                                                if first_token_ms.is_none() {
734                                                    first_token_ms = Some(
735                                                        request_started_at.elapsed().as_millis()
736                                                            as u64,
737                                                    );
738                                                }
739                                                if let Some(delta) = Self::merge_stream_text(
740                                                    &mut text_content,
741                                                    &content,
742                                                ) {
743                                                    let _ = tx
744                                                        .send(StreamEvent::TextDelta(delta))
745                                                        .await;
746                                                }
747                                            }
748
749                                            if let Some(tcs) = delta.tool_calls {
750                                                for tc in tcs {
751                                                    let entry = tool_calls
752                                                        .entry(tc.index)
753                                                        .or_insert_with(|| {
754                                                            (
755                                                                String::new(),
756                                                                String::new(),
757                                                                String::new(),
758                                                            )
759                                                        });
760
761                                                    if let Some(id) = tc.id {
762                                                        entry.0 = id;
763                                                    }
764                                                    if let Some(func) = tc.function {
765                                                        if let Some(name) = func.name {
766                                                            if first_token_ms.is_none() {
767                                                                first_token_ms = Some(
768                                                                    request_started_at
769                                                                        .elapsed()
770                                                                        .as_millis()
771                                                                        as u64,
772                                                                );
773                                                            }
774                                                            entry.1 = name.clone();
775                                                            let _ = tx
776                                                                .send(StreamEvent::ToolUseStart {
777                                                                    id: entry.0.clone(),
778                                                                    name,
779                                                                })
780                                                                .await;
781                                                        }
782                                                        if let Some(args) = func.arguments {
783                                                            entry.2.push_str(&args);
784                                                            let _ = tx
785                                                                .send(
786                                                                    StreamEvent::ToolUseInputDelta(
787                                                                        args,
788                                                                    ),
789                                                                )
790                                                                .await;
791                                                        }
792                                                    }
793                                                }
794                                            }
795                                        }
796                                    }
797                                }
798                            }
799                        }
800                    }
801                }
802
803                if saw_done {
804                    return;
805                }
806
807                let trailing = buffer.trim();
808                if !trailing.is_empty() {
809                    if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
810                        parsed_any_event = true;
811                        if response_id.is_none() {
812                            response_id = event.id.clone();
813                        }
814                        if response_model.is_none() {
815                            response_model = event.model.clone();
816                        }
817                        if response_object.is_none() {
818                            response_object = event.object.clone();
819                        }
820                        if let Some(u) = event.usage {
821                            usage.prompt_tokens = u.prompt_tokens;
822                            usage.completion_tokens = u.completion_tokens;
823                            usage.total_tokens = u.total_tokens;
824                            usage.cache_read_tokens = u
825                                .prompt_tokens_details
826                                .as_ref()
827                                .and_then(|d| d.cached_tokens);
828                        }
829                        if let Some(choice) = event.choices.into_iter().next() {
830                            if let Some(reason) = choice.finish_reason {
831                                finish_reason = Some(reason);
832                            }
833                            // If text_content already has content (from delta processing),
834                            // skip message.content to avoid sending duplicate full content
835                            let skip_content = !text_content.is_empty();
836                            if let Some(message) = choice.message {
837                                if let Some(reasoning) = message.reasoning_content {
838                                    reasoning_content_accum.push_str(&reasoning);
839                                    if first_token_ms.is_none() {
840                                        first_token_ms =
841                                            Some(request_started_at.elapsed().as_millis() as u64);
842                                    }
843                                    if let Some(delta) =
844                                        Self::merge_stream_text(&mut text_content, &reasoning)
845                                    {
846                                        let _ = tx.send(StreamEvent::ReasoningDelta(delta)).await;
847                                    }
848                                }
849                                if !skip_content {
850                                    if let Some(content) =
851                                        message.content.filter(|value| !value.is_empty())
852                                    {
853                                        if first_token_ms.is_none() {
854                                            first_token_ms = Some(
855                                                request_started_at.elapsed().as_millis() as u64,
856                                            );
857                                        }
858                                        if let Some(delta) =
859                                            Self::merge_stream_text(&mut text_content, &content)
860                                        {
861                                            let _ = tx.send(StreamEvent::TextDelta(delta)).await;
862                                        }
863                                    }
864                                }
865                                if let Some(tcs) = message.tool_calls {
866                                    for (index, tc) in tcs.into_iter().enumerate() {
867                                        tool_calls.insert(
868                                            index,
869                                            (tc.id, tc.function.name, tc.function.arguments),
870                                        );
871                                    }
872                                }
873                            } else if let Some(delta) = choice.delta {
874                                if let Some(ref rc) = delta.reasoning_content {
875                                    reasoning_content_accum.push_str(rc);
876                                    if first_token_ms.is_none() {
877                                        first_token_ms =
878                                            Some(request_started_at.elapsed().as_millis() as u64);
879                                    }
880                                    if let Some(delta) =
881                                        Self::merge_stream_text(&mut text_content, rc)
882                                    {
883                                        let _ = tx.send(StreamEvent::ReasoningDelta(delta)).await;
884                                    }
885                                }
886                                if let Some(content) = delta.content {
887                                    if first_token_ms.is_none() {
888                                        first_token_ms =
889                                            Some(request_started_at.elapsed().as_millis() as u64);
890                                    }
891                                    if let Some(delta) =
892                                        Self::merge_stream_text(&mut text_content, &content)
893                                    {
894                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
895                                    }
896                                }
897                            }
898                        }
899                    } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
900                        parsed_any_event = true;
901                        response_id = response.id.clone();
902                        response_model = response.model.clone();
903                        response_object = response.object.clone();
904                        usage.prompt_tokens = response.usage.prompt_tokens;
905                        usage.completion_tokens = response.usage.completion_tokens;
906                        usage.total_tokens = response.usage.total_tokens;
907                        // MiniMax: fall back to total_characters when total_tokens is 0.
908                        if usage.total_tokens == 0 {
909                            usage.total_tokens = response.usage.total_characters.unwrap_or(0);
910                        }
911                        usage.cache_read_tokens = response
912                            .usage
913                            .prompt_tokens_details
914                            .as_ref()
915                            .and_then(|d| d.cached_tokens);
916
917                        if let Some(choice) = response.choices.into_iter().next() {
918                            finish_reason = choice.finish_reason;
919                            if let Some(text) =
920                                choice.message.content.filter(|text| !text.is_empty())
921                            {
922                                if first_token_ms.is_none() {
923                                    first_token_ms =
924                                        Some(request_started_at.elapsed().as_millis() as u64);
925                                }
926                                let _ = Self::merge_stream_text(&mut text_content, &text);
927                            }
928                            if let Some(reasoning) = choice.message.reasoning_content {
929                                reasoning_content_accum.push_str(&reasoning);
930                            }
931                            if let Some(final_tool_calls) = choice.message.tool_calls {
932                                for tc in final_tool_calls {
933                                    tool_calls.insert(
934                                        tool_calls.len(),
935                                        (tc.id, tc.function.name, tc.function.arguments),
936                                    );
937                                }
938                            }
939                        }
940                    }
941                }
942
943                if parsed_any_event
944                    || !text_content.is_empty()
945                    || !tool_calls.is_empty()
946                    || !content_blocks.is_empty()
947                {
948                    tracing::warn!(
949                        provider = %provider_name,
950                        model = %request_model,
951                        "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
952                    );
953                    if !text_content.is_empty() {
954                        content_blocks.push(ContentBlock::Text {
955                            text: text_content.clone(),
956                        });
957                    }
958                    for (_, (id, name, args)) in tool_calls.iter() {
959                        content_blocks.push(ContentBlock::ToolUse {
960                            id: id.clone(),
961                            name: name.clone(),
962                            input: Self::parse_tool_arguments(name, args),
963                        });
964                    }
965                    tool_calls.clear();
966                    crate::telemetry::record_llm_usage(
967                        usage.prompt_tokens,
968                        usage.completion_tokens,
969                        usage.total_tokens,
970                        finish_reason.as_deref(),
971                    );
972                    let response = LlmResponse {
973                        message: Message {
974                            role: "assistant".to_string(),
975                            content: std::mem::take(&mut content_blocks),
976                            reasoning_content: if reasoning_content_accum.is_empty() {
977                                None
978                            } else {
979                                Some(std::mem::take(&mut reasoning_content_accum))
980                            },
981                        },
982                        usage: usage.clone(),
983                        stop_reason: std::mem::take(&mut finish_reason),
984                        meta: Some(LlmResponseMeta {
985                            provider: Some(provider_name.clone()),
986                            request_model: Some(request_model.clone()),
987                            request_url: Some(request_url.clone()),
988                            response_id: response_id.clone(),
989                            response_model: response_model.clone(),
990                            response_object: response_object.clone(),
991                            first_token_ms,
992                            duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
993                        }),
994                    };
995                    let _ = tx.send(StreamEvent::Done(response)).await;
996                } else {
997                    tracing::warn!(
998                        provider = %provider_name,
999                        model = %request_model,
1000                        trailing = %trailing.chars().take(400).collect::<String>(),
1001                        "OpenAI-compatible stream ended without any parseable events"
1002                    );
1003                }
1004            });
1005
1006            Ok(rx)
1007        }
1008    }
1009}
1010
1011// OpenAI API response types (private)
1012#[derive(Debug, Deserialize)]
1013pub(crate) struct OpenAiResponse {
1014    #[serde(default)]
1015    pub(crate) id: Option<String>,
1016    #[serde(default)]
1017    pub(crate) object: Option<String>,
1018    #[serde(default)]
1019    pub(crate) model: Option<String>,
1020    pub(crate) choices: Vec<OpenAiChoice>,
1021    pub(crate) usage: OpenAiUsage,
1022}
1023
/// A single completion choice from a non-streaming response.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiChoice {
    /// The assistant message produced for this choice.
    pub(crate) message: OpenAiMessage,
    /// Why generation stopped (e.g. `"stop"`, `"tool_calls"`), when reported.
    pub(crate) finish_reason: Option<String>,
}
1029
/// Assistant message payload. Used both for non-streaming responses and for
/// the full-message form some OpenAI-compatible providers embed in stream
/// chunks instead of a delta.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiMessage {
    /// Model "thinking" text from the non-standard `reasoning_content`
    /// extension; absent on standard OpenAI responses.
    pub(crate) reasoning_content: Option<String>,
    /// Plain text content of the message, if any.
    pub(crate) content: Option<String>,
    /// Complete tool invocations requested by the model, if any.
    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
}
1036
/// A complete tool call as returned in a final (non-delta) message.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCall {
    /// Provider-assigned tool-call id, echoed back in tool result messages.
    pub(crate) id: String,
    /// The function to invoke plus its raw arguments.
    pub(crate) function: OpenAiFunction,
}
1042
/// Function name and arguments for a tool call. `arguments` is the raw JSON
/// string as sent by the provider; it is parsed later by
/// `OpenAiClient::parse_tool_arguments`, which tolerates malformed JSON.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunction {
    pub(crate) name: String,
    pub(crate) arguments: String,
}
1048
/// Token-usage accounting block. All counts default to 0 / `None` because
/// OpenAI-compatible providers vary in which fields they populate.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiUsage {
    #[serde(default)]
    pub(crate) prompt_tokens: usize,
    #[serde(default)]
    pub(crate) completion_tokens: usize,
    #[serde(default)]
    pub(crate) total_tokens: usize,
    /// MiniMax uses `total_characters` instead of token counts; callers fall
    /// back to this when `total_tokens` is 0.
    #[serde(default)]
    pub(crate) total_characters: Option<usize>,
    /// OpenAI returns cached token count in `prompt_tokens_details.cached_tokens`
    #[serde(default)]
    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
}
1064
/// Nested `prompt_tokens_details` object; carries the prompt-cache hit count.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiPromptTokensDetails {
    /// Number of prompt tokens served from the provider's cache, if reported.
    #[serde(default)]
    pub(crate) cached_tokens: Option<usize>,
}
1070
1071// OpenAI streaming types
1072#[derive(Debug, Deserialize)]
1073pub(crate) struct OpenAiStreamChunk {
1074    #[serde(default)]
1075    pub(crate) id: Option<String>,
1076    #[serde(default)]
1077    pub(crate) object: Option<String>,
1078    #[serde(default)]
1079    pub(crate) model: Option<String>,
1080    pub(crate) choices: Vec<OpenAiStreamChoice>,
1081    pub(crate) usage: Option<OpenAiUsage>,
1082}
1083
/// A choice inside a streaming chunk. Standard providers populate `delta`;
/// some OpenAI-compatible providers send a full `message` instead — the
/// stream handler checks `message` first, then falls back to `delta`.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChoice {
    /// Non-standard: a complete message instead of an incremental delta.
    pub(crate) message: Option<OpenAiMessage>,
    /// Standard incremental delta payload.
    pub(crate) delta: Option<OpenAiDelta>,
    /// Set on the final chunk for this choice (e.g. `"stop"`, `"tool_calls"`).
    pub(crate) finish_reason: Option<String>,
}
1090
/// Incremental message fragment within a streaming choice. Each field is a
/// partial piece to be appended to previously accumulated state.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiDelta {
    /// Fragment of reasoning text (non-standard provider extension).
    pub(crate) reasoning_content: Option<String>,
    /// Fragment of the assistant's text content.
    pub(crate) content: Option<String>,
    /// Partial tool-call updates, keyed by each entry's `index`.
    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}
1097
1098#[derive(Debug, Deserialize)]
1099pub(crate) struct OpenAiToolCallDelta {
1100    pub(crate) index: usize,
1101    pub(crate) id: Option<String>,
1102    pub(crate) function: Option<OpenAiFunctionDelta>,
1103}
1104
/// Partial function payload inside a tool-call delta. `name` usually arrives
/// once on the first fragment; `arguments` arrives as string fragments that
/// the stream handler concatenates before JSON-parsing.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunctionDelta {
    pub(crate) name: Option<String>,
    pub(crate) arguments: Option<String>,
}