Skip to main content

a3s_code_core/llm/
openai.rs

1//! OpenAI-compatible LLM client
2
3use super::http::{default_http_client, normalize_base_url, HttpClient};
4use super::structured;
5use super::types::*;
6use super::LlmClient;
7use crate::llm::types::{ToolResultContent, ToolResultContentField};
8use crate::retry::{AttemptOutcome, RetryConfig};
9use anyhow::{Context, Result};
10use async_trait::async_trait;
11use futures::StreamExt;
12use serde::Deserialize;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Instant;
16use tokio::sync::mpsc;
17
18/// OpenAI client
19pub struct OpenAiClient {
20    pub(crate) provider_name: String,
21    pub(crate) api_key: SecretString,
22    pub(crate) model: String,
23    pub(crate) base_url: String,
24    pub(crate) chat_completions_path: String,
25    pub(crate) headers: HashMap<String, String>,
26    pub(crate) temperature: Option<f32>,
27    pub(crate) max_tokens: Option<usize>,
28    pub(crate) http: Arc<dyn HttpClient>,
29    pub(crate) retry_config: RetryConfig,
30}
31
32impl OpenAiClient {
33    pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
34        if arguments.trim().is_empty() {
35            return serde_json::Value::Object(Default::default());
36        }
37
38        serde_json::from_str(arguments).unwrap_or_else(|e| {
39            tracing::warn!(
40                "Failed to parse tool arguments JSON for tool '{}': {}",
41                tool_name,
42                e
43            );
44            serde_json::json!({
45                "__parse_error": format!(
46                    "Malformed tool arguments: {}. Raw input: {}",
47                    e, arguments
48                )
49            })
50        })
51    }
52
53    fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
54        if incoming.is_empty() {
55            return None;
56        }
57        if text_content.is_empty() {
58            text_content.push_str(incoming);
59            return Some(incoming.to_string());
60        }
61        if incoming == text_content.as_str() || text_content.ends_with(incoming) {
62            return None;
63        }
64        // If incoming contains text_content as a prefix (incoming is the full content),
65        // replace text_content instead of appending (avoids duplicate full content)
66        if incoming.starts_with(text_content.as_str()) && incoming.len() > text_content.len() {
67            let suffix = &incoming[text_content.len()..];
68            if !suffix.is_empty() {
69                *text_content = incoming.to_string();
70                return Some(suffix.to_string());
71            }
72            return None;
73        }
74        if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
75            if suffix.is_empty() {
76                return None;
77            }
78            text_content.push_str(suffix);
79            return Some(suffix.to_string());
80        }
81        text_content.push_str(incoming);
82        Some(incoming.to_string())
83    }
84
85    pub fn new(api_key: String, model: String) -> Self {
86        Self {
87            provider_name: "openai".to_string(),
88            api_key: SecretString::new(api_key),
89            model,
90            base_url: "https://api.openai.com".to_string(),
91            chat_completions_path: "/v1/chat/completions".to_string(),
92            headers: HashMap::new(),
93            temperature: None,
94            max_tokens: None,
95            http: default_http_client(),
96            retry_config: RetryConfig::default(),
97        }
98    }
99
100    pub fn with_base_url(mut self, base_url: String) -> Self {
101        self.base_url = normalize_base_url(&base_url);
102        self
103    }
104
105    pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
106        self.provider_name = provider_name.into();
107        self
108    }
109
110    pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
111        let path = path.into();
112        self.chat_completions_path = if path.starts_with('/') {
113            path
114        } else {
115            format!("/{}", path)
116        };
117        self
118    }
119
120    pub fn with_temperature(mut self, temperature: f32) -> Self {
121        self.temperature = Some(temperature);
122        self
123    }
124
125    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
126        self.headers = headers;
127        self
128    }
129
130    pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
131        self.max_tokens = Some(max_tokens);
132        self
133    }
134
135    pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
136        self.retry_config = retry_config;
137        self
138    }
139
140    pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
141        self.http = http;
142        self
143    }
144
145    pub(crate) fn request_headers(&self) -> Vec<(String, String)> {
146        let mut headers = Vec::with_capacity(self.headers.len() + 1);
147        let has_authorization = self
148            .headers
149            .keys()
150            .any(|key| key.eq_ignore_ascii_case("authorization"));
151        if !has_authorization {
152            headers.push((
153                "Authorization".to_string(),
154                format!("Bearer {}", self.api_key.expose()),
155            ));
156        }
157        headers.extend(
158            self.headers
159                .iter()
160                .map(|(key, value)| (key.clone(), value.clone())),
161        );
162        headers
163    }
164
165    pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
166        messages
167            .iter()
168            .map(|msg| {
169                let content: serde_json::Value = if msg.content.len() == 1 {
170                    match &msg.content[0] {
171                        ContentBlock::Text { text } => serde_json::json!(text),
172                        ContentBlock::ToolResult {
173                            tool_use_id,
174                            content,
175                            ..
176                        } => {
177                            let content_str = match content {
178                                ToolResultContentField::Text(s) => s.clone(),
179                                ToolResultContentField::Blocks(blocks) => blocks
180                                    .iter()
181                                    .filter_map(|b| {
182                                        if let ToolResultContent::Text { text } = b {
183                                            Some(text.clone())
184                                        } else {
185                                            None
186                                        }
187                                    })
188                                    .collect::<Vec<_>>()
189                                    .join("\n"),
190                            };
191                            return serde_json::json!({
192                                "role": "tool",
193                                "tool_call_id": tool_use_id,
194                                "content": content_str,
195                            });
196                        }
197                        _ => serde_json::json!(""),
198                    }
199                } else {
200                    serde_json::json!(msg
201                        .content
202                        .iter()
203                        .map(|block| {
204                            match block {
205                                ContentBlock::Text { text } => serde_json::json!({
206                                    "type": "text",
207                                    "text": text,
208                                }),
209                                ContentBlock::Image { source } => serde_json::json!({
210                                    "type": "image_url",
211                                    "image_url": {
212                                        "url": format!(
213                                            "data:{};base64,{}",
214                                            source.media_type, source.data
215                                        ),
216                                    }
217                                }),
218                                ContentBlock::ToolUse { id, name, input } => serde_json::json!({
219                                    "type": "function",
220                                    "id": id,
221                                    "function": {
222                                        "name": name,
223                                        "arguments": input.to_string(),
224                                    }
225                                }),
226                                _ => serde_json::json!({}),
227                            }
228                        })
229                        .collect::<Vec<_>>())
230                };
231
232                // Handle assistant messages — kimi-k2.5 requires reasoning_content
233                // on all assistant messages when thinking mode is enabled
234                if msg.role == "assistant" {
235                    let rc = msg.reasoning_content.as_deref().unwrap_or("");
236                    let tool_calls: Vec<_> = msg.tool_calls();
237                    if !tool_calls.is_empty() {
238                        return serde_json::json!({
239                            "role": "assistant",
240                            "content": msg.text(),
241                            "reasoning_content": rc,
242                            "tool_calls": tool_calls.iter().map(|tc| {
243                                serde_json::json!({
244                                    "id": tc.id,
245                                    "type": "function",
246                                    "function": {
247                                        "name": tc.name,
248                                        "arguments": tc.args.to_string(),
249                                    }
250                                })
251                            }).collect::<Vec<_>>(),
252                        });
253                    }
254                    return serde_json::json!({
255                        "role": "assistant",
256                        "content": content,
257                        "reasoning_content": rc,
258                    });
259                }
260
261                serde_json::json!({
262                    "role": msg.role,
263                    "content": content,
264                })
265            })
266            .collect()
267    }
268
269    pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
270        tools
271            .iter()
272            .map(|t| {
273                serde_json::json!({
274                    "type": "function",
275                    "function": {
276                        "name": t.name,
277                        "description": t.description,
278                        "parameters": t.parameters,
279                    }
280                })
281            })
282            .collect()
283    }
284}
285
286impl OpenAiClient {
287    /// Apply a structured-output directive to an OpenAI-compatible request.
288    ///
289    /// OpenAI-compatible APIs support both forced function `tool_choice` and
290    /// native `response_format` (`json_object` / `json_schema` + `strict`).
291    fn apply_directive(
292        request: &mut serde_json::Value,
293        directive: &structured::StructuredDirective,
294    ) {
295        if let Some(tool) = &directive.force_tool {
296            request["tool_choice"] = serde_json::json!({
297                "type": "function",
298                "function": { "name": tool }
299            });
300        }
301        if let Some(rf) = &directive.response_format {
302            request["response_format"] = match rf {
303                structured::ResponseFormat::JsonObject => {
304                    serde_json::json!({ "type": "json_object" })
305                }
306                structured::ResponseFormat::JsonSchema { name, schema } => serde_json::json!({
307                    "type": "json_schema",
308                    "json_schema": { "name": name, "schema": schema, "strict": true }
309                }),
310            };
311        }
312    }
313
314    /// Build a chat-completions request body, optionally applying a directive.
315    fn build_chat_request(
316        &self,
317        messages: &[Message],
318        system: Option<&str>,
319        tools: &[ToolDefinition],
320        directive: Option<&structured::StructuredDirective>,
321    ) -> serde_json::Value {
322        let mut openai_messages = Vec::new();
323
324        if let Some(sys) = system {
325            openai_messages.push(serde_json::json!({
326                "role": "system",
327                "content": sys,
328            }));
329        }
330
331        openai_messages.extend(self.convert_messages(messages));
332
333        let mut request = serde_json::json!({
334            "model": self.model,
335            "messages": openai_messages,
336        });
337
338        if let Some(temp) = self.temperature {
339            request["temperature"] = serde_json::json!(temp);
340        }
341        if let Some(max) = self.max_tokens {
342            request["max_tokens"] = serde_json::json!(max);
343        }
344
345        if !tools.is_empty() {
346            request["tools"] = serde_json::json!(self.convert_tools(tools));
347        }
348
349        if let Some(directive) = directive {
350            Self::apply_directive(&mut request, directive);
351        }
352
353        request
354    }
355
356    /// Execute a fully-built (non-streaming) chat-completions request.
357    async fn send_request(&self, request: serde_json::Value) -> Result<LlmResponse> {
358        {
359            let request_started_at = Instant::now();
360            let url = format!("{}{}", self.base_url, self.chat_completions_path);
361            let request_headers = self.request_headers();
362
363            let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
364                let http = &self.http;
365                let url = &url;
366                let request_headers = request_headers.clone();
367                let request = &request;
368                async move {
369                    let headers = request_headers
370                        .iter()
371                        .map(|(key, value)| (key.as_str(), value.as_str()))
372                        .collect::<Vec<_>>();
373                    // Non-streaming: use a non-cancelled token for now
374                    let cancel_token = tokio_util::sync::CancellationToken::new();
375                    match http.post(url, headers, request, cancel_token).await {
376                        Ok(resp) => {
377                            let status = reqwest::StatusCode::from_u16(resp.status)
378                                .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
379                            if status.is_success() {
380                                AttemptOutcome::Success(resp.body)
381                            } else if self.retry_config.is_retryable_status(status) {
382                                AttemptOutcome::Retryable {
383                                    status,
384                                    body: resp.body,
385                                    retry_after: None,
386                                }
387                            } else {
388                                AttemptOutcome::Fatal(anyhow::anyhow!(
389                                    "OpenAI API error at {} ({}): {}",
390                                    url,
391                                    status,
392                                    resp.body
393                                ))
394                            }
395                        }
396                        Err(e) => {
397                            tracing::error!("HTTP error: {e:?}");
398                            AttemptOutcome::Fatal(e)
399                        }
400                    }
401                }
402            })
403            .await?;
404
405            let parsed: OpenAiResponse =
406                serde_json::from_str(&response).context("Failed to parse OpenAI response")?;
407
408            let choice = parsed.choices.into_iter().next().context("No choices")?;
409
410            let mut content = vec![];
411
412            let reasoning_content = choice.message.reasoning_content;
413
414            let text_content = choice.message.content;
415
416            if let Some(text) = text_content {
417                if !text.is_empty() {
418                    content.push(ContentBlock::Text { text });
419                }
420            }
421
422            if let Some(tool_calls) = choice.message.tool_calls {
423                for tc in tool_calls {
424                    content.push(ContentBlock::ToolUse {
425                        id: tc.id,
426                        name: tc.function.name.clone(),
427                        input: Self::parse_tool_arguments(
428                            &tc.function.name,
429                            &tc.function.arguments,
430                        ),
431                    });
432                }
433            }
434
435            let llm_response = LlmResponse {
436                message: Message {
437                    role: "assistant".to_string(),
438                    content,
439                    reasoning_content,
440                },
441                usage: TokenUsage {
442                    prompt_tokens: parsed.usage.prompt_tokens,
443                    completion_tokens: parsed.usage.completion_tokens,
444                    total_tokens: {
445                        let t = parsed.usage.total_tokens;
446                        // MiniMax: fall back to total_characters when total_tokens is 0.
447                        if t == 0 {
448                            parsed.usage.total_characters.unwrap_or(0)
449                        } else {
450                            t
451                        }
452                    },
453                    cache_read_tokens: parsed
454                        .usage
455                        .prompt_tokens_details
456                        .as_ref()
457                        .and_then(|d| d.cached_tokens),
458                    cache_write_tokens: None,
459                },
460                stop_reason: choice.finish_reason,
461                meta: Some(LlmResponseMeta {
462                    provider: Some(self.provider_name.clone()),
463                    request_model: Some(self.model.clone()),
464                    request_url: Some(url.clone()),
465                    response_id: parsed.id,
466                    response_model: parsed.model,
467                    response_object: parsed.object,
468                    first_token_ms: None,
469                    duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
470                }),
471            };
472
473            crate::telemetry::record_llm_usage(
474                llm_response.usage.prompt_tokens,
475                llm_response.usage.completion_tokens,
476                llm_response.usage.total_tokens,
477                llm_response.stop_reason.as_deref(),
478            );
479
480            Ok(llm_response)
481        }
482    }
483}
484
485#[async_trait]
486impl LlmClient for OpenAiClient {
487    async fn complete(
488        &self,
489        messages: &[Message],
490        system: Option<&str>,
491        tools: &[ToolDefinition],
492    ) -> Result<LlmResponse> {
493        self.send_request(self.build_chat_request(messages, system, tools, None))
494            .await
495    }
496
497    async fn complete_structured(
498        &self,
499        messages: &[Message],
500        system: Option<&str>,
501        tools: &[ToolDefinition],
502        directive: &structured::StructuredDirective,
503    ) -> Result<LlmResponse> {
504        self.send_request(self.build_chat_request(messages, system, tools, Some(directive)))
505            .await
506    }
507
508    fn native_structured_support(&self) -> structured::NativeStructuredSupport {
509        structured::NativeStructuredSupport::JsonSchema
510    }
511
512    async fn complete_streaming(
513        &self,
514        messages: &[Message],
515        system: Option<&str>,
516        tools: &[ToolDefinition],
517        cancel_token: tokio_util::sync::CancellationToken,
518    ) -> Result<mpsc::Receiver<StreamEvent>> {
519        self.send_streaming(
520            self.build_chat_request(messages, system, tools, None),
521            cancel_token,
522        )
523        .await
524    }
525
526    async fn complete_streaming_structured(
527        &self,
528        messages: &[Message],
529        system: Option<&str>,
530        tools: &[ToolDefinition],
531        directive: &structured::StructuredDirective,
532        cancel_token: tokio_util::sync::CancellationToken,
533    ) -> Result<mpsc::Receiver<StreamEvent>> {
534        self.send_streaming(
535            self.build_chat_request(messages, system, tools, Some(directive)),
536            cancel_token,
537        )
538        .await
539    }
540}
541
542impl OpenAiClient {
543    /// Execute a fully-built streaming chat-completions request (sets `stream`).
544    async fn send_streaming(
545        &self,
546        mut request: serde_json::Value,
547        cancel_token: tokio_util::sync::CancellationToken,
548    ) -> Result<mpsc::Receiver<StreamEvent>> {
549        {
550            request["stream"] = serde_json::json!(true);
551            request["stream_options"] = serde_json::json!({ "include_usage": true });
552            let request_started_at = Instant::now();
553            let url = format!("{}{}", self.base_url, self.chat_completions_path);
554            let request_headers = self.request_headers();
555
556            let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
557                let http = &self.http;
558                let url = &url;
559                let request_headers = request_headers.clone();
560                let request = &request;
561                let cancel_token = cancel_token.clone();
562                async move {
563                    let headers = request_headers
564                        .iter()
565                        .map(|(key, value)| (key.as_str(), value.as_str()))
566                        .collect::<Vec<_>>();
567                    // Wrap in tokio::select! so cancellation aborts the HTTP request mid-flight
568                    let resp = tokio::select! {
569                        _ = cancel_token.cancelled() => {
570                            return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request cancelled"));
571                        }
572                        result = http.post_streaming(url, headers, request, cancel_token.clone()) => {
573                            match result {
574                                Ok(r) => r,
575                                Err(e) => {
576                                    return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request failed: {}", e));
577                                }
578                            }
579                        }
580                    };
581                    let status = reqwest::StatusCode::from_u16(resp.status)
582                        .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
583                    if status.is_success() {
584                        AttemptOutcome::Success(resp)
585                    } else {
586                        let retry_after = resp
587                            .retry_after
588                            .as_deref()
589                            .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
590                        if self.retry_config.is_retryable_status(status) {
591                            AttemptOutcome::Retryable {
592                                status,
593                                body: resp.error_body,
594                                retry_after,
595                            }
596                        } else {
597                            AttemptOutcome::Fatal(anyhow::anyhow!(
598                                "OpenAI API error at {} ({}): {}",
599                                url,
600                                status,
601                                resp.error_body
602                            ))
603                        }
604                    }
605                }
606            })
607            .await?;
608
609            let (tx, rx) = mpsc::channel(100);
610
611            let mut stream = streaming_resp.byte_stream;
612            let provider_name = self.provider_name.clone();
613            let request_model = self.model.clone();
614            let request_url = url.clone();
615            tokio::spawn(async move {
616                let mut buffer = String::new();
617                let mut content_blocks: Vec<ContentBlock> = Vec::new();
618                let mut text_content = String::new();
619                let mut reasoning_content_accum = String::new();
620                let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
621                    std::collections::BTreeMap::new();
622                let mut usage = TokenUsage::default();
623                let mut finish_reason = None;
624                let mut response_id = None;
625                let mut response_model = None;
626                let mut response_object = None;
627                let mut first_token_ms = None;
628                let mut saw_done = false;
629                let mut parsed_any_event = false;
630
631                while let Some(chunk_result) = stream.next().await {
632                    let chunk = match chunk_result {
633                        Ok(c) => c,
634                        Err(e) => {
635                            tracing::error!("Stream error: {}", e);
636                            break;
637                        }
638                    };
639
640                    buffer.push_str(&String::from_utf8_lossy(&chunk));
641
642                    while let Some(event_end) = buffer.find("\n\n") {
643                        let event_data: String = buffer.drain(..event_end).collect();
644                        buffer.drain(..2);
645
646                        for line in event_data.lines() {
647                            if let Some(data) = line.strip_prefix("data: ") {
648                                if data == "[DONE]" {
649                                    saw_done = true;
650                                    if !text_content.is_empty() {
651                                        content_blocks.push(ContentBlock::Text {
652                                            text: text_content.clone(),
653                                        });
654                                    }
655                                    for (_, (id, name, args)) in tool_calls.iter() {
656                                        content_blocks.push(ContentBlock::ToolUse {
657                                            id: id.clone(),
658                                            name: name.clone(),
659                                            input: Self::parse_tool_arguments(name, args),
660                                        });
661                                    }
662                                    tool_calls.clear();
663                                    crate::telemetry::record_llm_usage(
664                                        usage.prompt_tokens,
665                                        usage.completion_tokens,
666                                        usage.total_tokens,
667                                        finish_reason.as_deref(),
668                                    );
669                                    let response = LlmResponse {
670                                        message: Message {
671                                            role: "assistant".to_string(),
672                                            content: std::mem::take(&mut content_blocks),
673                                            reasoning_content: if reasoning_content_accum.is_empty()
674                                            {
675                                                None
676                                            } else {
677                                                Some(std::mem::take(&mut reasoning_content_accum))
678                                            },
679                                        },
680                                        usage: usage.clone(),
681                                        stop_reason: std::mem::take(&mut finish_reason),
682                                        meta: Some(LlmResponseMeta {
683                                            provider: Some(provider_name.clone()),
684                                            request_model: Some(request_model.clone()),
685                                            request_url: Some(request_url.clone()),
686                                            response_id: response_id.clone(),
687                                            response_model: response_model.clone(),
688                                            response_object: response_object.clone(),
689                                            first_token_ms,
690                                            duration_ms: Some(
691                                                request_started_at.elapsed().as_millis() as u64,
692                                            ),
693                                        }),
694                                    };
695                                    let _ = tx.send(StreamEvent::Done(response)).await;
696                                    continue;
697                                }
698
699                                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
700                                    parsed_any_event = true;
701                                    if response_id.is_none() {
702                                        response_id = event.id.clone();
703                                    }
704                                    if response_model.is_none() {
705                                        response_model = event.model.clone();
706                                    }
707                                    if response_object.is_none() {
708                                        response_object = event.object.clone();
709                                    }
710                                    if let Some(u) = event.usage {
711                                        usage.prompt_tokens = u.prompt_tokens;
712                                        usage.completion_tokens = u.completion_tokens;
713                                        usage.total_tokens = u.total_tokens;
714                                        // MiniMax: fall back to total_characters when total_tokens is 0.
715                                        if usage.total_tokens == 0 {
716                                            usage.total_tokens = u.total_characters.unwrap_or(0);
717                                        }
718                                        usage.cache_read_tokens = u
719                                            .prompt_tokens_details
720                                            .as_ref()
721                                            .and_then(|d| d.cached_tokens);
722                                    }
723
724                                    if let Some(choice) = event.choices.into_iter().next() {
725                                        if let Some(reason) = choice.finish_reason {
726                                            finish_reason = Some(reason);
727                                        }
728
729                                        if let Some(message) = choice.message {
730                                            // If text_content already has content (from delta processing),
731                                            // skip message.content to avoid sending duplicate full content
732                                            let skip_content = !text_content.is_empty();
733                                            if let Some(reasoning) = message.reasoning_content {
734                                                reasoning_content_accum.push_str(&reasoning);
735                                                if first_token_ms.is_none() {
736                                                    first_token_ms = Some(
737                                                        request_started_at.elapsed().as_millis()
738                                                            as u64,
739                                                    );
740                                                }
741                                                if let Some(delta) = Self::merge_stream_text(
742                                                    &mut text_content,
743                                                    &reasoning,
744                                                ) {
745                                                    let _ = tx
746                                                        .send(StreamEvent::ReasoningDelta(delta))
747                                                        .await;
748                                                }
749                                            }
750                                            if !skip_content {
751                                                if let Some(content) = message
752                                                    .content
753                                                    .filter(|value| !value.is_empty())
754                                                {
755                                                    if first_token_ms.is_none() {
756                                                        first_token_ms = Some(
757                                                            request_started_at.elapsed().as_millis()
758                                                                as u64,
759                                                        );
760                                                    }
761                                                    if let Some(delta) = Self::merge_stream_text(
762                                                        &mut text_content,
763                                                        &content,
764                                                    ) {
765                                                        let _ = tx
766                                                            .send(StreamEvent::TextDelta(delta))
767                                                            .await;
768                                                    }
769                                                }
770                                            }
771                                            if let Some(tcs) = message.tool_calls {
772                                                for (index, tc) in tcs.into_iter().enumerate() {
773                                                    tool_calls.insert(
774                                                        index,
775                                                        (
776                                                            tc.id,
777                                                            tc.function.name,
778                                                            tc.function.arguments,
779                                                        ),
780                                                    );
781                                                }
782                                            }
783                                        } else if let Some(delta) = choice.delta {
784                                            if let Some(ref rc) = delta.reasoning_content {
785                                                reasoning_content_accum.push_str(rc);
786                                                if first_token_ms.is_none() {
787                                                    first_token_ms = Some(
788                                                        request_started_at.elapsed().as_millis()
789                                                            as u64,
790                                                    );
791                                                }
792                                                if let Some(delta) =
793                                                    Self::merge_stream_text(&mut text_content, rc)
794                                                {
795                                                    let _ = tx
796                                                        .send(StreamEvent::ReasoningDelta(delta))
797                                                        .await;
798                                                }
799                                            }
800
801                                            if let Some(content) = delta.content {
802                                                if first_token_ms.is_none() {
803                                                    first_token_ms = Some(
804                                                        request_started_at.elapsed().as_millis()
805                                                            as u64,
806                                                    );
807                                                }
808                                                if let Some(delta) = Self::merge_stream_text(
809                                                    &mut text_content,
810                                                    &content,
811                                                ) {
812                                                    let _ = tx
813                                                        .send(StreamEvent::TextDelta(delta))
814                                                        .await;
815                                                }
816                                            }
817
818                                            if let Some(tcs) = delta.tool_calls {
819                                                for tc in tcs {
820                                                    let entry = tool_calls
821                                                        .entry(tc.index)
822                                                        .or_insert_with(|| {
823                                                            (
824                                                                String::new(),
825                                                                String::new(),
826                                                                String::new(),
827                                                            )
828                                                        });
829
830                                                    if let Some(id) = tc.id {
831                                                        entry.0 = id;
832                                                    }
833                                                    if let Some(func) = tc.function {
834                                                        if let Some(name) = func.name {
835                                                            if first_token_ms.is_none() {
836                                                                first_token_ms = Some(
837                                                                    request_started_at
838                                                                        .elapsed()
839                                                                        .as_millis()
840                                                                        as u64,
841                                                                );
842                                                            }
843                                                            entry.1 = name.clone();
844                                                            let _ = tx
845                                                                .send(StreamEvent::ToolUseStart {
846                                                                    id: entry.0.clone(),
847                                                                    name,
848                                                                })
849                                                                .await;
850                                                        }
851                                                        if let Some(args) = func.arguments {
852                                                            entry.2.push_str(&args);
853                                                            let _ = tx
854                                                                .send(
855                                                                    StreamEvent::ToolUseInputDelta(
856                                                                        args,
857                                                                    ),
858                                                                )
859                                                                .await;
860                                                        }
861                                                    }
862                                                }
863                                            }
864                                        }
865                                    }
866                                }
867                            }
868                        }
869                    }
870                }
871
872                if saw_done {
873                    return;
874                }
875
876                let trailing = buffer.trim();
877                if !trailing.is_empty() {
878                    if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
879                        parsed_any_event = true;
880                        if response_id.is_none() {
881                            response_id = event.id.clone();
882                        }
883                        if response_model.is_none() {
884                            response_model = event.model.clone();
885                        }
886                        if response_object.is_none() {
887                            response_object = event.object.clone();
888                        }
889                        if let Some(u) = event.usage {
890                            usage.prompt_tokens = u.prompt_tokens;
891                            usage.completion_tokens = u.completion_tokens;
892                            usage.total_tokens = u.total_tokens;
893                            usage.cache_read_tokens = u
894                                .prompt_tokens_details
895                                .as_ref()
896                                .and_then(|d| d.cached_tokens);
897                        }
898                        if let Some(choice) = event.choices.into_iter().next() {
899                            if let Some(reason) = choice.finish_reason {
900                                finish_reason = Some(reason);
901                            }
902                            // If text_content already has content (from delta processing),
903                            // skip message.content to avoid sending duplicate full content
904                            let skip_content = !text_content.is_empty();
905                            if let Some(message) = choice.message {
906                                if let Some(reasoning) = message.reasoning_content {
907                                    reasoning_content_accum.push_str(&reasoning);
908                                    if first_token_ms.is_none() {
909                                        first_token_ms =
910                                            Some(request_started_at.elapsed().as_millis() as u64);
911                                    }
912                                    if let Some(delta) =
913                                        Self::merge_stream_text(&mut text_content, &reasoning)
914                                    {
915                                        let _ = tx.send(StreamEvent::ReasoningDelta(delta)).await;
916                                    }
917                                }
918                                if !skip_content {
919                                    if let Some(content) =
920                                        message.content.filter(|value| !value.is_empty())
921                                    {
922                                        if first_token_ms.is_none() {
923                                            first_token_ms = Some(
924                                                request_started_at.elapsed().as_millis() as u64,
925                                            );
926                                        }
927                                        if let Some(delta) =
928                                            Self::merge_stream_text(&mut text_content, &content)
929                                        {
930                                            let _ = tx.send(StreamEvent::TextDelta(delta)).await;
931                                        }
932                                    }
933                                }
934                                if let Some(tcs) = message.tool_calls {
935                                    for (index, tc) in tcs.into_iter().enumerate() {
936                                        tool_calls.insert(
937                                            index,
938                                            (tc.id, tc.function.name, tc.function.arguments),
939                                        );
940                                    }
941                                }
942                            } else if let Some(delta) = choice.delta {
943                                if let Some(ref rc) = delta.reasoning_content {
944                                    reasoning_content_accum.push_str(rc);
945                                    if first_token_ms.is_none() {
946                                        first_token_ms =
947                                            Some(request_started_at.elapsed().as_millis() as u64);
948                                    }
949                                    if let Some(delta) =
950                                        Self::merge_stream_text(&mut text_content, rc)
951                                    {
952                                        let _ = tx.send(StreamEvent::ReasoningDelta(delta)).await;
953                                    }
954                                }
955                                if let Some(content) = delta.content {
956                                    if first_token_ms.is_none() {
957                                        first_token_ms =
958                                            Some(request_started_at.elapsed().as_millis() as u64);
959                                    }
960                                    if let Some(delta) =
961                                        Self::merge_stream_text(&mut text_content, &content)
962                                    {
963                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
964                                    }
965                                }
966                            }
967                        }
968                    } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
969                        parsed_any_event = true;
970                        response_id = response.id.clone();
971                        response_model = response.model.clone();
972                        response_object = response.object.clone();
973                        usage.prompt_tokens = response.usage.prompt_tokens;
974                        usage.completion_tokens = response.usage.completion_tokens;
975                        usage.total_tokens = response.usage.total_tokens;
976                        // MiniMax: fall back to total_characters when total_tokens is 0.
977                        if usage.total_tokens == 0 {
978                            usage.total_tokens = response.usage.total_characters.unwrap_or(0);
979                        }
980                        usage.cache_read_tokens = response
981                            .usage
982                            .prompt_tokens_details
983                            .as_ref()
984                            .and_then(|d| d.cached_tokens);
985
986                        if let Some(choice) = response.choices.into_iter().next() {
987                            finish_reason = choice.finish_reason;
988                            if let Some(text) =
989                                choice.message.content.filter(|text| !text.is_empty())
990                            {
991                                if first_token_ms.is_none() {
992                                    first_token_ms =
993                                        Some(request_started_at.elapsed().as_millis() as u64);
994                                }
995                                let _ = Self::merge_stream_text(&mut text_content, &text);
996                            }
997                            if let Some(reasoning) = choice.message.reasoning_content {
998                                reasoning_content_accum.push_str(&reasoning);
999                            }
1000                            if let Some(final_tool_calls) = choice.message.tool_calls {
1001                                for tc in final_tool_calls {
1002                                    tool_calls.insert(
1003                                        tool_calls.len(),
1004                                        (tc.id, tc.function.name, tc.function.arguments),
1005                                    );
1006                                }
1007                            }
1008                        }
1009                    }
1010                }
1011
1012                if parsed_any_event
1013                    || !text_content.is_empty()
1014                    || !tool_calls.is_empty()
1015                    || !content_blocks.is_empty()
1016                {
1017                    tracing::warn!(
1018                        provider = %provider_name,
1019                        model = %request_model,
1020                        "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
1021                    );
1022                    if !text_content.is_empty() {
1023                        content_blocks.push(ContentBlock::Text {
1024                            text: text_content.clone(),
1025                        });
1026                    }
1027                    for (_, (id, name, args)) in tool_calls.iter() {
1028                        content_blocks.push(ContentBlock::ToolUse {
1029                            id: id.clone(),
1030                            name: name.clone(),
1031                            input: Self::parse_tool_arguments(name, args),
1032                        });
1033                    }
1034                    tool_calls.clear();
1035                    crate::telemetry::record_llm_usage(
1036                        usage.prompt_tokens,
1037                        usage.completion_tokens,
1038                        usage.total_tokens,
1039                        finish_reason.as_deref(),
1040                    );
1041                    let response = LlmResponse {
1042                        message: Message {
1043                            role: "assistant".to_string(),
1044                            content: std::mem::take(&mut content_blocks),
1045                            reasoning_content: if reasoning_content_accum.is_empty() {
1046                                None
1047                            } else {
1048                                Some(std::mem::take(&mut reasoning_content_accum))
1049                            },
1050                        },
1051                        usage: usage.clone(),
1052                        stop_reason: std::mem::take(&mut finish_reason),
1053                        meta: Some(LlmResponseMeta {
1054                            provider: Some(provider_name.clone()),
1055                            request_model: Some(request_model.clone()),
1056                            request_url: Some(request_url.clone()),
1057                            response_id: response_id.clone(),
1058                            response_model: response_model.clone(),
1059                            response_object: response_object.clone(),
1060                            first_token_ms,
1061                            duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
1062                        }),
1063                    };
1064                    let _ = tx.send(StreamEvent::Done(response)).await;
1065                } else {
1066                    tracing::warn!(
1067                        provider = %provider_name,
1068                        model = %request_model,
1069                        trailing = %trailing.chars().take(400).collect::<String>(),
1070                        "OpenAI-compatible stream ended without any parseable events"
1071                    );
1072                }
1073            });
1074
1075            Ok(rx)
1076        }
1077    }
1078}
1079
// OpenAI API response types (crate-internal, `pub(crate)`)
1081#[derive(Debug, Deserialize)]
1082pub(crate) struct OpenAiResponse {
1083    #[serde(default)]
1084    pub(crate) id: Option<String>,
1085    #[serde(default)]
1086    pub(crate) object: Option<String>,
1087    #[serde(default)]
1088    pub(crate) model: Option<String>,
1089    pub(crate) choices: Vec<OpenAiChoice>,
1090    pub(crate) usage: OpenAiUsage,
1091}
1092
1093#[derive(Debug, Deserialize)]
1094pub(crate) struct OpenAiChoice {
1095    pub(crate) message: OpenAiMessage,
1096    pub(crate) finish_reason: Option<String>,
1097}
1098
1099#[derive(Debug, Deserialize)]
1100pub(crate) struct OpenAiMessage {
1101    pub(crate) reasoning_content: Option<String>,
1102    pub(crate) content: Option<String>,
1103    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
1104}
1105
1106#[derive(Debug, Deserialize)]
1107pub(crate) struct OpenAiToolCall {
1108    pub(crate) id: String,
1109    pub(crate) function: OpenAiFunction,
1110}
1111
1112#[derive(Debug, Deserialize)]
1113pub(crate) struct OpenAiFunction {
1114    pub(crate) name: String,
1115    pub(crate) arguments: String,
1116}
1117
1118#[derive(Debug, Deserialize)]
1119pub(crate) struct OpenAiUsage {
1120    #[serde(default)]
1121    pub(crate) prompt_tokens: usize,
1122    #[serde(default)]
1123    pub(crate) completion_tokens: usize,
1124    #[serde(default)]
1125    pub(crate) total_tokens: usize,
1126    /// MiniMax uses `total_characters` instead of token counts.
1127    #[serde(default)]
1128    pub(crate) total_characters: Option<usize>,
1129    /// OpenAI returns cached token count in `prompt_tokens_details.cached_tokens`
1130    #[serde(default)]
1131    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
1132}
1133
1134#[derive(Debug, Deserialize)]
1135pub(crate) struct OpenAiPromptTokensDetails {
1136    #[serde(default)]
1137    pub(crate) cached_tokens: Option<usize>,
1138}
1139
1140// OpenAI streaming types
1141#[derive(Debug, Deserialize)]
1142pub(crate) struct OpenAiStreamChunk {
1143    #[serde(default)]
1144    pub(crate) id: Option<String>,
1145    #[serde(default)]
1146    pub(crate) object: Option<String>,
1147    #[serde(default)]
1148    pub(crate) model: Option<String>,
1149    pub(crate) choices: Vec<OpenAiStreamChoice>,
1150    pub(crate) usage: Option<OpenAiUsage>,
1151}
1152
1153#[derive(Debug, Deserialize)]
1154pub(crate) struct OpenAiStreamChoice {
1155    pub(crate) message: Option<OpenAiMessage>,
1156    pub(crate) delta: Option<OpenAiDelta>,
1157    pub(crate) finish_reason: Option<String>,
1158}
1159
1160#[derive(Debug, Deserialize)]
1161pub(crate) struct OpenAiDelta {
1162    pub(crate) reasoning_content: Option<String>,
1163    pub(crate) content: Option<String>,
1164    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
1165}
1166
1167#[derive(Debug, Deserialize)]
1168pub(crate) struct OpenAiToolCallDelta {
1169    pub(crate) index: usize,
1170    pub(crate) id: Option<String>,
1171    pub(crate) function: Option<OpenAiFunctionDelta>,
1172}
1173
1174#[derive(Debug, Deserialize)]
1175pub(crate) struct OpenAiFunctionDelta {
1176    pub(crate) name: Option<String>,
1177    pub(crate) arguments: Option<String>,
1178}
1179
1180// ============================================================================
1181// Tests
1182// ============================================================================
1183
#[cfg(test)]
mod tests {
    use super::*;
    use crate::llm::types::{Message, ToolDefinition};

    /// Builds a client from a placeholder API key and model name.
    fn make_client() -> OpenAiClient {
        let api_key = String::from("test-key");
        let model = String::from("gpt-test");
        OpenAiClient::new(api_key, model)
    }

    /// Assembles a `StructuredDirective` from its two parts.
    fn directive(
        force_tool: Option<&str>,
        response_format: Option<structured::ResponseFormat>,
    ) -> structured::StructuredDirective {
        structured::StructuredDirective {
            force_tool: force_tool.map(String::from),
            response_format,
        }
    }

    // A forced tool becomes a function-typed `tool_choice`; no response format is set.
    #[test]
    fn test_apply_directive_forced_function_tool_choice() {
        let mut body = serde_json::json!({ "model": "m" });
        let forced = directive(Some("emit_person"), None);

        OpenAiClient::apply_directive(&mut body, &forced);

        assert_eq!(body["tool_choice"]["type"], "function");
        assert_eq!(body["tool_choice"]["function"]["name"], "emit_person");
        assert!(body.get("response_format").is_none());
    }

    // A JSON-schema format is emitted as a strict `json_schema` response format.
    #[test]
    fn test_apply_directive_json_schema_strict() {
        let mut body = serde_json::json!({});
        let schema_format = structured::ResponseFormat::JsonSchema {
            name: String::from("person"),
            schema: serde_json::json!({ "type": "object" }),
        };
        let with_schema = directive(None, Some(schema_format));

        OpenAiClient::apply_directive(&mut body, &with_schema);

        assert_eq!(body["response_format"]["type"], "json_schema");
        assert_eq!(body["response_format"]["json_schema"]["name"], "person");
        assert_eq!(body["response_format"]["json_schema"]["strict"], true);
        assert!(body.get("tool_choice").is_none());
    }

    // The plain JSON-object format maps to `json_object`.
    #[test]
    fn test_apply_directive_json_object() {
        let mut body = serde_json::json!({});
        let json_object = directive(None, Some(structured::ResponseFormat::JsonObject));

        OpenAiClient::apply_directive(&mut body, &json_object);

        assert_eq!(body["response_format"]["type"], "json_object");
    }

    // System prompt, tool list and forced tool choice all end up in the request.
    #[test]
    fn test_build_chat_request_applies_directive_and_system() {
        let emit_tool = ToolDefinition {
            name: String::from("emit_x"),
            description: String::from("emit"),
            parameters: serde_json::json!({ "type": "object" }),
        };
        let forced = directive(Some("emit_x"), None);

        let body = make_client().build_chat_request(
            &[Message::user("hi")],
            Some("sys"),
            &[emit_tool],
            Some(&forced),
        );

        assert_eq!(body["messages"][0]["role"], "system");
        assert_eq!(body["tool_choice"]["function"]["name"], "emit_x");
        assert_eq!(body["tools"][0]["function"]["name"], "emit_x");
    }

    // Without a directive neither `tool_choice` nor `response_format` is set.
    #[test]
    fn test_build_chat_request_without_directive_is_plain() {
        let client = make_client();
        let body = client.build_chat_request(&[Message::user("hi")], None, &[], None);

        assert!(body.get("tool_choice").is_none());
        assert!(body.get("response_format").is_none());
    }

    // The client advertises JSON-schema as its native structured-output mode.
    #[test]
    fn test_native_structured_support_is_json_schema() {
        let support = make_client().native_structured_support();
        assert_eq!(support, structured::NativeStructuredSupport::JsonSchema);
    }
}