use super::http::{default_http_client, normalize_base_url, HttpClient};
use super::types::*;
use super::LlmClient;
use crate::llm::types::{ToolResultContent, ToolResultContentField};
use crate::retry::{AttemptOutcome, RetryConfig};
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::StreamExt;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;

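/// Chat Completions client for OpenAI and OpenAI-compatible providers.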
pub struct OpenAiClient {
    pub(crate) provider_name: String,
    pub(crate) api_key: SecretString,
    pub(crate) model: String,
    pub(crate) base_url: String,
    pub(crate) chat_completions_path: String,
    pub(crate) headers: HashMap<String, String>,
    pub(crate) temperature: Option<f32>,
    pub(crate) max_tokens: Option<usize>,
    pub(crate) http: Arc<dyn HttpClient>,
    pub(crate) retry_config: RetryConfig,
}

impl OpenAiClient {
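    /// Parses a tool call's JSON arguments. Empty input yields an empty
    /// object; malformed JSON is surfaced as an `__parse_error` object so the
    /// tool call still round-trips instead of aborting the turn.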
    pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
        if arguments.trim().is_empty() {
            return serde_json::Value::Object(Default::default());
        }

        serde_json::from_str(arguments).unwrap_or_else(|e| {
            tracing::warn!(
                "Failed to parse tool arguments JSON for tool '{}': {}",
                tool_name,
                e
            );
            serde_json::json!({
                "__parse_error": format!(
                    "Malformed tool arguments: {}. Raw input: {}",
                    e, arguments
                )
            })
        })
    }

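    /// Appends streamed text, deduplicating providers that resend the full
    /// accumulated text (or the previous chunk) instead of a pure delta.
    /// Returns only the genuinely new suffix, or `None` if nothing new arrived.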
    fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
        if incoming.is_empty() {
            return None;
        }
        if text_content.is_empty() {
            text_content.push_str(incoming);
            return Some(incoming.to_string());
        }
        if incoming == text_content.as_str() || text_content.ends_with(incoming) {
            return None;
        }
        if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
            if suffix.is_empty() {
                return None;
            }
            text_content.push_str(suffix);
            return Some(suffix.to_string());
        }
        text_content.push_str(incoming);
        Some(incoming.to_string())
    }

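    /// Builds a client for `https://api.openai.com` with default headers,
    /// retry policy, and HTTP transport.
    ///
    /// A minimal builder sketch (the key and model strings are placeholders):
    ///
    /// ```ignore
    /// let client = OpenAiClient::new("sk-...".to_string(), "gpt-4o-mini".to_string())
    ///     .with_temperature(0.2)
    ///     .with_max_tokens(1024);
    /// ```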
    pub fn new(api_key: String, model: String) -> Self {
        Self {
            provider_name: "openai".to_string(),
            api_key: SecretString::new(api_key),
            model,
            base_url: "https://api.openai.com".to_string(),
            chat_completions_path: "/v1/chat/completions".to_string(),
            headers: HashMap::new(),
            temperature: None,
            max_tokens: None,
            http: default_http_client(),
            retry_config: RetryConfig::default(),
        }
    }

    pub fn with_base_url(mut self, base_url: String) -> Self {
        self.base_url = normalize_base_url(&base_url);
        self
    }

    pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
        self.provider_name = provider_name.into();
        self
    }

    pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
        let path = path.into();
        self.chat_completions_path = if path.starts_with('/') {
            path
        } else {
            format!("/{}", path)
        };
        self
    }

    pub fn with_temperature(mut self, temperature: f32) -> Self {
        self.temperature = Some(temperature);
        self
    }

    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
        self.headers = headers;
        self
    }

    pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
        self.max_tokens = Some(max_tokens);
        self
    }

    pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
        self.retry_config = retry_config;
        self
    }

    pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
        self.http = http;
        self
    }

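    /// Builds the outgoing header list, injecting `Authorization: Bearer ...`
    /// unless the caller already supplied an Authorization header (compared
    /// case-insensitively).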
    pub(crate) fn request_headers(&self) -> Vec<(String, String)> {
        let mut headers = Vec::with_capacity(self.headers.len() + 1);
        let has_authorization = self
            .headers
            .keys()
            .any(|key| key.eq_ignore_ascii_case("authorization"));
        if !has_authorization {
            headers.push((
                "Authorization".to_string(),
                format!("Bearer {}", self.api_key.expose()),
            ));
        }
        headers.extend(
            self.headers
                .iter()
                .map(|(key, value)| (key.clone(), value.clone())),
        );
        headers
    }

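    /// Converts internal messages to the Chat Completions wire format:
    /// single text blocks become plain string content, tool results become
    /// `role: "tool"` messages, and multi-block content becomes a part array.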
    pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
        messages
            .iter()
            .map(|msg| {
                let content: serde_json::Value = if msg.content.len() == 1 {
                    match &msg.content[0] {
                        ContentBlock::Text { text } => serde_json::json!(text),
                        ContentBlock::ToolResult {
                            tool_use_id,
                            content,
                            ..
                        } => {
                            let content_str = match content {
                                ToolResultContentField::Text(s) => s.clone(),
                                ToolResultContentField::Blocks(blocks) => blocks
                                    .iter()
                                    .filter_map(|b| {
                                        if let ToolResultContent::Text { text } = b {
                                            Some(text.clone())
                                        } else {
                                            None
                                        }
                                    })
                                    .collect::<Vec<_>>()
                                    .join("\n"),
                            };
                            return serde_json::json!({
                                "role": "tool",
                                "tool_call_id": tool_use_id,
                                "content": content_str,
                            });
                        }
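                        // Any other lone block (e.g. a single image) currently
                        // serializes as empty string content; multi-block
                        // messages take the array path below.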
                        _ => serde_json::json!(""),
                    }
                } else {
                    serde_json::json!(msg
                        .content
                        .iter()
                        .map(|block| {
                            match block {
                                ContentBlock::Text { text } => serde_json::json!({
                                    "type": "text",
                                    "text": text,
                                }),
                                ContentBlock::Image { source } => serde_json::json!({
                                    "type": "image_url",
                                    "image_url": {
                                        "url": format!(
                                            "data:{};base64,{}",
                                            source.media_type, source.data
                                        ),
                                    }
                                }),
                                ContentBlock::ToolUse { id, name, input } => serde_json::json!({
                                    "type": "function",
                                    "id": id,
                                    "function": {
                                        "name": name,
                                        "arguments": input.to_string(),
                                    }
                                }),
                                _ => serde_json::json!({}),
                            }
                        })
                        .collect::<Vec<_>>())
                };

                if msg.role == "assistant" {
                    let rc = msg.reasoning_content.as_deref().unwrap_or("");
                    let tool_calls: Vec<_> = msg.tool_calls();
                    if !tool_calls.is_empty() {
                        return serde_json::json!({
                            "role": "assistant",
                            "content": msg.text(),
                            "reasoning_content": rc,
                            "tool_calls": tool_calls.iter().map(|tc| {
                                serde_json::json!({
                                    "id": tc.id,
                                    "type": "function",
                                    "function": {
                                        "name": tc.name,
                                        "arguments": tc.args.to_string(),
                                    }
                                })
                            }).collect::<Vec<_>>(),
                        });
                    }
                    return serde_json::json!({
                        "role": "assistant",
                        "content": content,
                        "reasoning_content": rc,
                    });
                }

                serde_json::json!({
                    "role": msg.role,
                    "content": content,
                })
            })
            .collect()
    }

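    /// Converts tool definitions to OpenAI `function` tool declarations.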
    pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
        tools
            .iter()
            .map(|t| {
                serde_json::json!({
                    "type": "function",
                    "function": {
                        "name": t.name,
                        "description": t.description,
                        "parameters": t.parameters,
                    }
                })
            })
            .collect()
    }
}

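// A hedged usage sketch of a non-streaming call; the runtime, key, and model
// strings below are placeholder assumptions, not values this module defines:
//
//     let client = OpenAiClient::new(api_key, "gpt-4o-mini".to_string());
//     let messages = vec![Message {
//         role: "user".to_string(),
//         content: vec![ContentBlock::Text { text: "Hello".to_string() }],
//         reasoning_content: None,
//     }];
//     let response = client.complete(&messages, Some("Be concise."), &[]).await?;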
#[async_trait]
impl LlmClient for OpenAiClient {
    async fn complete(
        &self,
        messages: &[Message],
        system: Option<&str>,
        tools: &[ToolDefinition],
    ) -> Result<LlmResponse> {
        let request_started_at = Instant::now();
        let mut openai_messages = Vec::new();

        if let Some(sys) = system {
            openai_messages.push(serde_json::json!({
                "role": "system",
                "content": sys,
            }));
        }

        openai_messages.extend(self.convert_messages(messages));

        let mut request = serde_json::json!({
            "model": self.model,
            "messages": openai_messages,
        });

        if let Some(temp) = self.temperature {
            request["temperature"] = serde_json::json!(temp);
        }
        if let Some(max) = self.max_tokens {
            request["max_tokens"] = serde_json::json!(max);
        }

        if !tools.is_empty() {
            request["tools"] = serde_json::json!(self.convert_tools(tools));
        }

        let url = format!("{}{}", self.base_url, self.chat_completions_path);
        let request_headers = self.request_headers();

        let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
            let http = &self.http;
            let url = &url;
            let request_headers = request_headers.clone();
            let request = &request;
            async move {
                let headers = request_headers
                    .iter()
                    .map(|(key, value)| (key.as_str(), value.as_str()))
                    .collect::<Vec<_>>();
                let cancel_token = tokio_util::sync::CancellationToken::new();
                match http.post(url, headers, request, cancel_token).await {
                    Ok(resp) => {
                        let status = reqwest::StatusCode::from_u16(resp.status)
                            .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
                        if status.is_success() {
                            AttemptOutcome::Success(resp.body)
                        } else if self.retry_config.is_retryable_status(status) {
                            AttemptOutcome::Retryable {
                                status,
                                body: resp.body,
                                retry_after: None,
                            }
                        } else {
                            AttemptOutcome::Fatal(anyhow::anyhow!(
                                "OpenAI API error at {} ({}): {}",
                                url,
                                status,
                                resp.body
                            ))
                        }
                    }
                    Err(e) => {
                        tracing::error!("HTTP error: {:?}", e);
                        AttemptOutcome::Fatal(e)
                    }
                }
            }
        })
        .await?;

        let parsed: OpenAiResponse =
            serde_json::from_str(&response).context("Failed to parse OpenAI response")?;

        let choice = parsed.choices.into_iter().next().context("No choices")?;

        let mut content = vec![];

        let reasoning_content = choice.message.reasoning_content;

        let text_content = choice.message.content;

        if let Some(text) = text_content {
            if !text.is_empty() {
                content.push(ContentBlock::Text { text });
            }
        }

        if let Some(tool_calls) = choice.message.tool_calls {
            for tc in tool_calls {
                content.push(ContentBlock::ToolUse {
                    id: tc.id,
                    name: tc.function.name.clone(),
                    input: Self::parse_tool_arguments(
                        &tc.function.name,
                        &tc.function.arguments,
                    ),
                });
            }
        }

        let llm_response = LlmResponse {
            message: Message {
                role: "assistant".to_string(),
                content,
                reasoning_content,
            },
            usage: TokenUsage {
                prompt_tokens: parsed.usage.prompt_tokens,
                completion_tokens: parsed.usage.completion_tokens,
                total_tokens: parsed.usage.total_tokens,
                cache_read_tokens: parsed
                    .usage
                    .prompt_tokens_details
                    .as_ref()
                    .and_then(|d| d.cached_tokens),
                cache_write_tokens: None,
            },
            stop_reason: choice.finish_reason,
            meta: Some(LlmResponseMeta {
                provider: Some(self.provider_name.clone()),
                request_model: Some(self.model.clone()),
                request_url: Some(url.clone()),
                response_id: parsed.id,
                response_model: parsed.model,
                response_object: parsed.object,
                first_token_ms: None,
                duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
            }),
        };

        crate::telemetry::record_llm_usage(
            llm_response.usage.prompt_tokens,
            llm_response.usage.completion_tokens,
            llm_response.usage.total_tokens,
            llm_response.stop_reason.as_deref(),
        );

        Ok(llm_response)
    }

    async fn complete_streaming(
        &self,
        messages: &[Message],
        system: Option<&str>,
        tools: &[ToolDefinition],
        cancel_token: tokio_util::sync::CancellationToken,
    ) -> Result<mpsc::Receiver<StreamEvent>> {
        let request_started_at = Instant::now();
        let mut openai_messages = Vec::new();

        if let Some(sys) = system {
            openai_messages.push(serde_json::json!({
                "role": "system",
                "content": sys,
            }));
        }

        openai_messages.extend(self.convert_messages(messages));

        let mut request = serde_json::json!({
            "model": self.model,
            "messages": openai_messages,
            "stream": true,
            "stream_options": { "include_usage": true },
        });

        if let Some(temp) = self.temperature {
            request["temperature"] = serde_json::json!(temp);
        }
        if let Some(max) = self.max_tokens {
            request["max_tokens"] = serde_json::json!(max);
        }

        if !tools.is_empty() {
            request["tools"] = serde_json::json!(self.convert_tools(tools));
        }

        let url = format!("{}{}", self.base_url, self.chat_completions_path);
        let request_headers = self.request_headers();

        let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
            let http = &self.http;
            let url = &url;
            let request_headers = request_headers.clone();
            let request = &request;
            let cancel_token = cancel_token.clone();
            async move {
                let headers = request_headers
                    .iter()
                    .map(|(key, value)| (key.as_str(), value.as_str()))
                    .collect::<Vec<_>>();
                let resp = tokio::select! {
                    _ = cancel_token.cancelled() => {
                        return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request cancelled"));
                    }
                    result = http.post_streaming(url, headers, request, cancel_token.clone()) => {
                        match result {
                            Ok(r) => r,
                            Err(e) => {
                                return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request failed: {}", e));
                            }
                        }
                    }
                };
                let status = reqwest::StatusCode::from_u16(resp.status)
                    .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
                if status.is_success() {
                    AttemptOutcome::Success(resp)
                } else {
                    let retry_after = resp
                        .retry_after
                        .as_deref()
                        .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
                    if self.retry_config.is_retryable_status(status) {
                        AttemptOutcome::Retryable {
                            status,
                            body: resp.error_body,
                            retry_after,
                        }
                    } else {
                        AttemptOutcome::Fatal(anyhow::anyhow!(
                            "OpenAI API error at {} ({}): {}",
                            url,
                            status,
                            resp.error_body
                        ))
                    }
                }
            }
        })
        .await?;

        let (tx, rx) = mpsc::channel(100);

        let mut stream = streaming_resp.byte_stream;
        let provider_name = self.provider_name.clone();
        let request_model = self.model.clone();
        let request_url = url.clone();
        tokio::spawn(async move {
            let mut buffer = String::new();
            let mut content_blocks: Vec<ContentBlock> = Vec::new();
            let mut text_content = String::new();
            let mut reasoning_content_accum = String::new();
            let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
                std::collections::BTreeMap::new();
            let mut usage = TokenUsage::default();
            let mut finish_reason = None;
            let mut response_id = None;
            let mut response_model = None;
            let mut response_object = None;
            let mut first_token_ms = None;
            let mut saw_done = false;
            let mut parsed_any_event = false;

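            // The body arrives as server-sent events: events are separated by
            // a blank line and each `data:` line carries either a JSON chunk
            // or the `[DONE]` terminator.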
            while let Some(chunk_result) = stream.next().await {
                let chunk = match chunk_result {
                    Ok(c) => c,
                    Err(e) => {
                        tracing::error!("Stream error: {}", e);
                        break;
                    }
                };

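                // Each chunk is decoded independently, so a multi-byte UTF-8
                // character split across chunk boundaries is replaced lossily
                // rather than reassembled.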
                buffer.push_str(&String::from_utf8_lossy(&chunk));

                while let Some(event_end) = buffer.find("\n\n") {
                    let event_data: String = buffer.drain(..event_end).collect();
                    buffer.drain(..2);

                    for line in event_data.lines() {
                        if let Some(data) = line.strip_prefix("data: ") {
                            if data == "[DONE]" {
                                saw_done = true;
                                if !text_content.is_empty() {
                                    content_blocks.push(ContentBlock::Text {
                                        text: text_content.clone(),
                                    });
                                }
                                for (_, (id, name, args)) in tool_calls.iter() {
                                    content_blocks.push(ContentBlock::ToolUse {
                                        id: id.clone(),
                                        name: name.clone(),
                                        input: Self::parse_tool_arguments(name, args),
                                    });
                                }
                                tool_calls.clear();
                                crate::telemetry::record_llm_usage(
                                    usage.prompt_tokens,
                                    usage.completion_tokens,
                                    usage.total_tokens,
                                    finish_reason.as_deref(),
                                );
                                let response = LlmResponse {
                                    message: Message {
                                        role: "assistant".to_string(),
                                        content: std::mem::take(&mut content_blocks),
                                        reasoning_content: if reasoning_content_accum.is_empty() {
                                            None
                                        } else {
                                            Some(std::mem::take(&mut reasoning_content_accum))
                                        },
                                    },
                                    usage: usage.clone(),
                                    stop_reason: std::mem::take(&mut finish_reason),
                                    meta: Some(LlmResponseMeta {
                                        provider: Some(provider_name.clone()),
                                        request_model: Some(request_model.clone()),
                                        request_url: Some(request_url.clone()),
                                        response_id: response_id.clone(),
                                        response_model: response_model.clone(),
                                        response_object: response_object.clone(),
                                        first_token_ms,
                                        duration_ms: Some(
                                            request_started_at.elapsed().as_millis() as u64,
                                        ),
                                    }),
                                };
                                let _ = tx.send(StreamEvent::Done(response)).await;
                                continue;
                            }

                            if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
                                parsed_any_event = true;
                                if response_id.is_none() {
                                    response_id = event.id.clone();
                                }
                                if response_model.is_none() {
                                    response_model = event.model.clone();
                                }
                                if response_object.is_none() {
                                    response_object = event.object.clone();
                                }
                                if let Some(u) = event.usage {
                                    usage.prompt_tokens = u.prompt_tokens;
                                    usage.completion_tokens = u.completion_tokens;
                                    usage.total_tokens = u.total_tokens;
                                    usage.cache_read_tokens = u
                                        .prompt_tokens_details
                                        .as_ref()
                                        .and_then(|d| d.cached_tokens);
                                }

                                if let Some(choice) = event.choices.into_iter().next() {
                                    if let Some(reason) = choice.finish_reason {
                                        finish_reason = Some(reason);
                                    }

                                    let delta_content = choice
                                        .delta
                                        .as_ref()
                                        .and_then(|delta| delta.content.clone());

                                    if let Some(message) = choice.message {
                                        if let Some(reasoning) = message.reasoning_content {
                                            reasoning_content_accum.push_str(&reasoning);
                                        }
                                        if delta_content.is_none() {
                                            if let Some(content) =
                                                message.content.filter(|value| !value.is_empty())
                                            {
                                                if first_token_ms.is_none() {
                                                    first_token_ms = Some(
                                                        request_started_at.elapsed().as_millis()
                                                            as u64,
                                                    );
                                                }
                                                if let Some(delta) = Self::merge_stream_text(
                                                    &mut text_content,
                                                    &content,
                                                ) {
                                                    let _ = tx
                                                        .send(StreamEvent::TextDelta(delta))
                                                        .await;
                                                }
                                            }
                                        }
                                        if let Some(tcs) = message.tool_calls {
                                            for (index, tc) in tcs.into_iter().enumerate() {
                                                tool_calls.insert(
                                                    index,
                                                    (tc.id, tc.function.name, tc.function.arguments),
                                                );
                                            }
                                        }
                                    }

                                    if let Some(delta) = choice.delta {
                                        if let Some(ref rc) = delta.reasoning_content {
                                            reasoning_content_accum.push_str(rc);
                                        }

                                        if let Some(content) = delta.content {
                                            if first_token_ms.is_none() {
                                                first_token_ms = Some(
                                                    request_started_at.elapsed().as_millis() as u64,
                                                );
                                            }
                                            if let Some(delta) = Self::merge_stream_text(
                                                &mut text_content,
                                                &content,
                                            ) {
                                                let _ =
                                                    tx.send(StreamEvent::TextDelta(delta)).await;
                                            }
                                        }

                                        if let Some(tcs) = delta.tool_calls {
                                            for tc in tcs {
                                                let entry =
                                                    tool_calls.entry(tc.index).or_insert_with(
                                                        || {
                                                            (
                                                                String::new(),
                                                                String::new(),
                                                                String::new(),
                                                            )
                                                        },
                                                    );

                                                if let Some(id) = tc.id {
                                                    entry.0 = id;
                                                }
                                                if let Some(func) = tc.function {
                                                    if let Some(name) = func.name {
                                                        if first_token_ms.is_none() {
                                                            first_token_ms = Some(
                                                                request_started_at
                                                                    .elapsed()
                                                                    .as_millis()
                                                                    as u64,
                                                            );
                                                        }
                                                        entry.1 = name.clone();
                                                        let _ = tx
                                                            .send(StreamEvent::ToolUseStart {
                                                                id: entry.0.clone(),
                                                                name,
                                                            })
                                                            .await;
                                                    }
                                                    if let Some(args) = func.arguments {
                                                        entry.2.push_str(&args);
                                                        let _ = tx
                                                            .send(StreamEvent::ToolUseInputDelta(
                                                                args,
                                                            ))
                                                            .await;
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }

            if saw_done {
                return;
            }

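            // Some OpenAI-compatible servers close the stream without a
            // terminal [DONE]; try to salvage whatever remains in the buffer.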
            let trailing = buffer.trim();
            if !trailing.is_empty() {
                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
                    parsed_any_event = true;
                    if response_id.is_none() {
                        response_id = event.id.clone();
                    }
                    if response_model.is_none() {
                        response_model = event.model.clone();
                    }
                    if response_object.is_none() {
                        response_object = event.object.clone();
                    }
                    if let Some(u) = event.usage {
                        usage.prompt_tokens = u.prompt_tokens;
                        usage.completion_tokens = u.completion_tokens;
                        usage.total_tokens = u.total_tokens;
                        usage.cache_read_tokens = u
                            .prompt_tokens_details
                            .as_ref()
                            .and_then(|d| d.cached_tokens);
                    }
                    if let Some(choice) = event.choices.into_iter().next() {
                        if let Some(reason) = choice.finish_reason {
                            finish_reason = Some(reason);
                        }
                        let delta_content = choice
                            .delta
                            .as_ref()
                            .and_then(|delta| delta.content.clone());
                        if let Some(message) = choice.message {
                            if let Some(reasoning) = message.reasoning_content {
                                reasoning_content_accum.push_str(&reasoning);
                            }
                            if delta_content.is_none() {
                                if let Some(content) =
                                    message.content.filter(|value| !value.is_empty())
                                {
                                    if first_token_ms.is_none() {
                                        first_token_ms = Some(
                                            request_started_at.elapsed().as_millis() as u64,
                                        );
                                    }
                                    if let Some(delta) =
                                        Self::merge_stream_text(&mut text_content, &content)
                                    {
                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
                                    }
                                }
                            }
                            if let Some(tcs) = message.tool_calls {
                                for (index, tc) in tcs.into_iter().enumerate() {
                                    tool_calls.insert(
                                        index,
                                        (tc.id, tc.function.name, tc.function.arguments),
                                    );
                                }
                            }
                        }
                        if let Some(delta) = choice.delta {
                            if let Some(ref rc) = delta.reasoning_content {
                                reasoning_content_accum.push_str(rc);
                            }
                            if let Some(content) = delta.content {
                                if first_token_ms.is_none() {
                                    first_token_ms =
                                        Some(request_started_at.elapsed().as_millis() as u64);
                                }
                                if let Some(delta) =
                                    Self::merge_stream_text(&mut text_content, &content)
                                {
                                    let _ = tx.send(StreamEvent::TextDelta(delta)).await;
                                }
                            }
                        }
                    }
                } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
                    parsed_any_event = true;
                    response_id = response.id.clone();
                    response_model = response.model.clone();
                    response_object = response.object.clone();
                    usage.prompt_tokens = response.usage.prompt_tokens;
                    usage.completion_tokens = response.usage.completion_tokens;
                    usage.total_tokens = response.usage.total_tokens;
                    usage.cache_read_tokens = response
                        .usage
                        .prompt_tokens_details
                        .as_ref()
                        .and_then(|d| d.cached_tokens);

                    if let Some(choice) = response.choices.into_iter().next() {
                        finish_reason = choice.finish_reason;
                        if let Some(text) =
                            choice.message.content.filter(|text| !text.is_empty())
                        {
                            if first_token_ms.is_none() {
                                first_token_ms =
                                    Some(request_started_at.elapsed().as_millis() as u64);
                            }
                            let _ = Self::merge_stream_text(&mut text_content, &text);
                        }
                        if let Some(reasoning) = choice.message.reasoning_content {
                            reasoning_content_accum.push_str(&reasoning);
                        }
                        if let Some(final_tool_calls) = choice.message.tool_calls {
                            for tc in final_tool_calls {
                                tool_calls.insert(
                                    tool_calls.len(),
                                    (tc.id, tc.function.name, tc.function.arguments),
                                );
                            }
                        }
                    }
                }
            }

            if parsed_any_event
                || !text_content.is_empty()
                || !tool_calls.is_empty()
                || !content_blocks.is_empty()
            {
                tracing::warn!(
                    provider = %provider_name,
                    model = %request_model,
                    "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
                );
                if !text_content.is_empty() {
                    content_blocks.push(ContentBlock::Text {
                        text: text_content.clone(),
                    });
                }
                for (_, (id, name, args)) in tool_calls.iter() {
                    content_blocks.push(ContentBlock::ToolUse {
                        id: id.clone(),
                        name: name.clone(),
                        input: Self::parse_tool_arguments(name, args),
                    });
                }
                tool_calls.clear();
                crate::telemetry::record_llm_usage(
                    usage.prompt_tokens,
                    usage.completion_tokens,
                    usage.total_tokens,
                    finish_reason.as_deref(),
                );
                let response = LlmResponse {
                    message: Message {
                        role: "assistant".to_string(),
                        content: std::mem::take(&mut content_blocks),
                        reasoning_content: if reasoning_content_accum.is_empty() {
                            None
                        } else {
                            Some(std::mem::take(&mut reasoning_content_accum))
                        },
                    },
                    usage: usage.clone(),
                    stop_reason: std::mem::take(&mut finish_reason),
                    meta: Some(LlmResponseMeta {
                        provider: Some(provider_name.clone()),
                        request_model: Some(request_model.clone()),
                        request_url: Some(request_url.clone()),
                        response_id: response_id.clone(),
                        response_model: response_model.clone(),
                        response_object: response_object.clone(),
                        first_token_ms,
                        duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
                    }),
                };
                let _ = tx.send(StreamEvent::Done(response)).await;
            } else {
                tracing::warn!(
                    provider = %provider_name,
                    model = %request_model,
                    trailing = %trailing.chars().take(400).collect::<String>(),
                    "OpenAI-compatible stream ended without any parseable events"
                );
            }
        });

        Ok(rx)
    }
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiResponse {
    #[serde(default)]
    pub(crate) id: Option<String>,
    #[serde(default)]
    pub(crate) object: Option<String>,
    #[serde(default)]
    pub(crate) model: Option<String>,
    pub(crate) choices: Vec<OpenAiChoice>,
    pub(crate) usage: OpenAiUsage,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiChoice {
    pub(crate) message: OpenAiMessage,
    pub(crate) finish_reason: Option<String>,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiMessage {
    pub(crate) reasoning_content: Option<String>,
    pub(crate) content: Option<String>,
    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCall {
    pub(crate) id: String,
    pub(crate) function: OpenAiFunction,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunction {
    pub(crate) name: String,
    pub(crate) arguments: String,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiUsage {
    #[serde(default)]
    pub(crate) prompt_tokens: usize,
    #[serde(default)]
    pub(crate) completion_tokens: usize,
    #[serde(default)]
    pub(crate) total_tokens: usize,
    #[serde(default)]
    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiPromptTokensDetails {
    #[serde(default)]
    pub(crate) cached_tokens: Option<usize>,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChunk {
    #[serde(default)]
    pub(crate) id: Option<String>,
    #[serde(default)]
    pub(crate) object: Option<String>,
    #[serde(default)]
    pub(crate) model: Option<String>,
    pub(crate) choices: Vec<OpenAiStreamChoice>,
    pub(crate) usage: Option<OpenAiUsage>,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChoice {
    pub(crate) message: Option<OpenAiMessage>,
    pub(crate) delta: Option<OpenAiDelta>,
    pub(crate) finish_reason: Option<String>,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiDelta {
    pub(crate) reasoning_content: Option<String>,
    pub(crate) content: Option<String>,
    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCallDelta {
    pub(crate) index: usize,
    pub(crate) id: Option<String>,
    pub(crate) function: Option<OpenAiFunctionDelta>,
}

#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunctionDelta {
    pub(crate) name: Option<String>,
    pub(crate) arguments: Option<String>,
}
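
// A minimal test sketch for the two pure helpers above; the inputs and
// expected values are illustrative assumptions, not fixtures captured from a
// real provider.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_tool_arguments_handles_empty_valid_and_malformed_input() {
        // Empty or whitespace-only arguments become an empty object.
        assert_eq!(
            OpenAiClient::parse_tool_arguments("demo_tool", "  "),
            serde_json::json!({})
        );
        // Well-formed JSON passes through unchanged.
        assert_eq!(
            OpenAiClient::parse_tool_arguments("demo_tool", r#"{"city":"Paris"}"#),
            serde_json::json!({"city": "Paris"})
        );
        // Malformed JSON is preserved as an `__parse_error` payload.
        let fallback = OpenAiClient::parse_tool_arguments("demo_tool", "{not json");
        assert!(fallback.get("__parse_error").is_some());
    }

    #[test]
    fn merge_stream_text_deduplicates_cumulative_and_repeated_chunks() {
        let mut acc = String::new();
        // First chunk is emitted as-is.
        assert_eq!(
            OpenAiClient::merge_stream_text(&mut acc, "Hel"),
            Some("Hel".to_string())
        );
        // A cumulative resend ("everything so far") yields only the new suffix.
        assert_eq!(
            OpenAiClient::merge_stream_text(&mut acc, "Hello"),
            Some("lo".to_string())
        );
        // An exact duplicate produces nothing.
        assert_eq!(OpenAiClient::merge_stream_text(&mut acc, "Hello"), None);
        // A plain delta is appended and passed through.
        assert_eq!(
            OpenAiClient::merge_stream_text(&mut acc, "!"),
            Some("!".to_string())
        );
        assert_eq!(acc, "Hello!");
    }
}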