1use super::http::{default_http_client, normalize_base_url, HttpClient};
4use super::types::*;
5use super::LlmClient;
6use crate::llm::types::{ToolResultContent, ToolResultContentField};
7use crate::retry::{AttemptOutcome, RetryConfig};
8use anyhow::{Context, Result};
9use async_trait::async_trait;
10use futures::StreamExt;
11use serde::Deserialize;
12use std::sync::Arc;
13use std::time::Instant;
14use tokio::sync::mpsc;
15
/// Client for OpenAI-compatible chat-completions APIs.
///
/// The provider name, base URL, and request path are all configurable, so
/// this client is also used for other providers that speak the same wire
/// format (see the `with_*` builder methods).
pub struct OpenAiClient {
    // Provider label recorded in response metadata (defaults to "openai").
    pub(crate) provider_name: String,
    // Bearer token sent in the Authorization header.
    pub(crate) api_key: SecretString,
    // Model identifier sent in the request body.
    pub(crate) model: String,
    // Base URL the chat-completions path is appended to
    // (normalized via `normalize_base_url` in `with_base_url`).
    pub(crate) base_url: String,
    // Request path appended to `base_url`; always starts with '/'.
    pub(crate) chat_completions_path: String,
    // Optional sampling temperature; omitted from the request when None.
    pub(crate) temperature: Option<f32>,
    // Optional completion token cap; omitted from the request when None.
    pub(crate) max_tokens: Option<usize>,
    // HTTP transport abstraction (injectable for tests).
    pub(crate) http: Arc<dyn HttpClient>,
    // Retry policy applied to each API request.
    pub(crate) retry_config: RetryConfig,
}
28
29impl OpenAiClient {
30 pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
31 if arguments.trim().is_empty() {
32 return serde_json::Value::Object(Default::default());
33 }
34
35 serde_json::from_str(arguments).unwrap_or_else(|e| {
36 tracing::warn!(
37 "Failed to parse tool arguments JSON for tool '{}': {}",
38 tool_name,
39 e
40 );
41 serde_json::json!({
42 "__parse_error": format!(
43 "Malformed tool arguments: {}. Raw input: {}",
44 e, arguments
45 )
46 })
47 })
48 }
49
50 fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
51 if incoming.is_empty() {
52 return None;
53 }
54 if text_content.is_empty() {
55 text_content.push_str(incoming);
56 return Some(incoming.to_string());
57 }
58 if incoming == text_content.as_str() || text_content.ends_with(incoming) {
59 return None;
60 }
61 if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
62 if suffix.is_empty() {
63 return None;
64 }
65 text_content.push_str(suffix);
66 return Some(suffix.to_string());
67 }
68 text_content.push_str(incoming);
69 Some(incoming.to_string())
70 }
71
72 pub fn new(api_key: String, model: String) -> Self {
73 Self {
74 provider_name: "openai".to_string(),
75 api_key: SecretString::new(api_key),
76 model,
77 base_url: "https://api.openai.com".to_string(),
78 chat_completions_path: "/v1/chat/completions".to_string(),
79 temperature: None,
80 max_tokens: None,
81 http: default_http_client(),
82 retry_config: RetryConfig::default(),
83 }
84 }
85
86 pub fn with_base_url(mut self, base_url: String) -> Self {
87 self.base_url = normalize_base_url(&base_url);
88 self
89 }
90
91 pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
92 self.provider_name = provider_name.into();
93 self
94 }
95
96 pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
97 let path = path.into();
98 self.chat_completions_path = if path.starts_with('/') {
99 path
100 } else {
101 format!("/{}", path)
102 };
103 self
104 }
105
106 pub fn with_temperature(mut self, temperature: f32) -> Self {
107 self.temperature = Some(temperature);
108 self
109 }
110
111 pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
112 self.max_tokens = Some(max_tokens);
113 self
114 }
115
116 pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
117 self.retry_config = retry_config;
118 self
119 }
120
121 pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
122 self.http = http;
123 self
124 }
125
126 pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
127 messages
128 .iter()
129 .map(|msg| {
130 let content: serde_json::Value = if msg.content.len() == 1 {
131 match &msg.content[0] {
132 ContentBlock::Text { text } => serde_json::json!(text),
133 ContentBlock::ToolResult {
134 tool_use_id,
135 content,
136 ..
137 } => {
138 let content_str = match content {
139 ToolResultContentField::Text(s) => s.clone(),
140 ToolResultContentField::Blocks(blocks) => blocks
141 .iter()
142 .filter_map(|b| {
143 if let ToolResultContent::Text { text } = b {
144 Some(text.clone())
145 } else {
146 None
147 }
148 })
149 .collect::<Vec<_>>()
150 .join("\n"),
151 };
152 return serde_json::json!({
153 "role": "tool",
154 "tool_call_id": tool_use_id,
155 "content": content_str,
156 });
157 }
158 _ => serde_json::json!(""),
159 }
160 } else {
161 serde_json::json!(msg
162 .content
163 .iter()
164 .map(|block| {
165 match block {
166 ContentBlock::Text { text } => serde_json::json!({
167 "type": "text",
168 "text": text,
169 }),
170 ContentBlock::Image { source } => serde_json::json!({
171 "type": "image_url",
172 "image_url": {
173 "url": format!(
174 "data:{};base64,{}",
175 source.media_type, source.data
176 ),
177 }
178 }),
179 ContentBlock::ToolUse { id, name, input } => serde_json::json!({
180 "type": "function",
181 "id": id,
182 "function": {
183 "name": name,
184 "arguments": input.to_string(),
185 }
186 }),
187 _ => serde_json::json!({}),
188 }
189 })
190 .collect::<Vec<_>>())
191 };
192
193 if msg.role == "assistant" {
196 let rc = msg.reasoning_content.as_deref().unwrap_or("");
197 let tool_calls: Vec<_> = msg.tool_calls();
198 if !tool_calls.is_empty() {
199 return serde_json::json!({
200 "role": "assistant",
201 "content": msg.text(),
202 "reasoning_content": rc,
203 "tool_calls": tool_calls.iter().map(|tc| {
204 serde_json::json!({
205 "id": tc.id,
206 "type": "function",
207 "function": {
208 "name": tc.name,
209 "arguments": tc.args.to_string(),
210 }
211 })
212 }).collect::<Vec<_>>(),
213 });
214 }
215 return serde_json::json!({
216 "role": "assistant",
217 "content": content,
218 "reasoning_content": rc,
219 });
220 }
221
222 serde_json::json!({
223 "role": msg.role,
224 "content": content,
225 })
226 })
227 .collect()
228 }
229
230 pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
231 tools
232 .iter()
233 .map(|t| {
234 serde_json::json!({
235 "type": "function",
236 "function": {
237 "name": t.name,
238 "description": t.description,
239 "parameters": t.parameters,
240 }
241 })
242 })
243 .collect()
244 }
245}
246
247#[async_trait]
248impl LlmClient for OpenAiClient {
249 async fn complete(
250 &self,
251 messages: &[Message],
252 system: Option<&str>,
253 tools: &[ToolDefinition],
254 ) -> Result<LlmResponse> {
255 {
256 let request_started_at = Instant::now();
257 let mut openai_messages = Vec::new();
258
259 if let Some(sys) = system {
260 openai_messages.push(serde_json::json!({
261 "role": "system",
262 "content": sys,
263 }));
264 }
265
266 openai_messages.extend(self.convert_messages(messages));
267
268 let mut request = serde_json::json!({
269 "model": self.model,
270 "messages": openai_messages,
271 });
272
273 if let Some(temp) = self.temperature {
274 request["temperature"] = serde_json::json!(temp);
275 }
276 if let Some(max) = self.max_tokens {
277 request["max_tokens"] = serde_json::json!(max);
278 }
279
280 if !tools.is_empty() {
281 request["tools"] = serde_json::json!(self.convert_tools(tools));
282 }
283
284 let url = format!("{}{}", self.base_url, self.chat_completions_path);
285 let auth_header = format!("Bearer {}", self.api_key.expose());
286 let headers = vec![("Authorization", auth_header.as_str())];
287
288 let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
289 let http = &self.http;
290 let url = &url;
291 let headers = headers.clone();
292 let request = &request;
293 async move {
294 match http.post(url, headers, request).await {
295 Ok(resp) => {
296 let status = reqwest::StatusCode::from_u16(resp.status)
297 .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
298 if status.is_success() {
299 AttemptOutcome::Success(resp.body)
300 } else if self.retry_config.is_retryable_status(status) {
301 AttemptOutcome::Retryable {
302 status,
303 body: resp.body,
304 retry_after: None,
305 }
306 } else {
307 AttemptOutcome::Fatal(anyhow::anyhow!(
308 "OpenAI API error at {} ({}): {}",
309 url,
310 status,
311 resp.body
312 ))
313 }
314 }
315 Err(e) => {
316 eprintln!("[DEBUG] HTTP error: {:?}", e);
317 AttemptOutcome::Fatal(e)
318 }
319 }
320 }
321 })
322 .await?;
323
324 let parsed: OpenAiResponse =
325 serde_json::from_str(&response).context("Failed to parse OpenAI response")?;
326
327 let choice = parsed.choices.into_iter().next().context("No choices")?;
328
329 let mut content = vec![];
330
331 let reasoning_content = choice.message.reasoning_content;
332
333 let text_content = choice.message.content;
334
335 if let Some(text) = text_content {
336 if !text.is_empty() {
337 content.push(ContentBlock::Text { text });
338 }
339 }
340
341 if let Some(tool_calls) = choice.message.tool_calls {
342 for tc in tool_calls {
343 content.push(ContentBlock::ToolUse {
344 id: tc.id,
345 name: tc.function.name.clone(),
346 input: Self::parse_tool_arguments(
347 &tc.function.name,
348 &tc.function.arguments,
349 ),
350 });
351 }
352 }
353
354 let llm_response = LlmResponse {
355 message: Message {
356 role: "assistant".to_string(),
357 content,
358 reasoning_content,
359 },
360 usage: TokenUsage {
361 prompt_tokens: parsed.usage.prompt_tokens,
362 completion_tokens: parsed.usage.completion_tokens,
363 total_tokens: parsed.usage.total_tokens,
364 cache_read_tokens: parsed
365 .usage
366 .prompt_tokens_details
367 .as_ref()
368 .and_then(|d| d.cached_tokens),
369 cache_write_tokens: None,
370 },
371 stop_reason: choice.finish_reason,
372 meta: Some(LlmResponseMeta {
373 provider: Some(self.provider_name.clone()),
374 request_model: Some(self.model.clone()),
375 request_url: Some(url.clone()),
376 response_id: parsed.id,
377 response_model: parsed.model,
378 response_object: parsed.object,
379 first_token_ms: None,
380 duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
381 }),
382 };
383
384 crate::telemetry::record_llm_usage(
385 llm_response.usage.prompt_tokens,
386 llm_response.usage.completion_tokens,
387 llm_response.usage.total_tokens,
388 llm_response.stop_reason.as_deref(),
389 );
390
391 Ok(llm_response)
392 }
393 }
394
    /// Sends a streaming chat-completions request and returns a channel of
    /// [`StreamEvent`]s.
    ///
    /// The request is issued under the configured retry policy; once a
    /// streaming response is established, a background task parses the SSE
    /// byte stream, forwarding text / tool-call deltas as they arrive and
    /// finally emitting `StreamEvent::Done` with the assembled response.
    /// Handles both delta-style and cumulative text streams (see
    /// `merge_stream_text`), and chunks that carry either a `delta` or a
    /// full `message` object. If the stream ends without a `[DONE]`
    /// sentinel, whatever was buffered is finalized as a best effort.
    async fn complete_streaming(
        &self,
        messages: &[Message],
        system: Option<&str>,
        tools: &[ToolDefinition],
    ) -> Result<mpsc::Receiver<StreamEvent>> {
        {
            let request_started_at = Instant::now();
            let mut openai_messages = Vec::new();

            // The system prompt becomes the first message, per OpenAI convention.
            if let Some(sys) = system {
                openai_messages.push(serde_json::json!({
                    "role": "system",
                    "content": sys,
                }));
            }

            openai_messages.extend(self.convert_messages(messages));

            // `include_usage` asks the server to report token usage in the
            // final stream chunk.
            let mut request = serde_json::json!({
                "model": self.model,
                "messages": openai_messages,
                "stream": true,
                "stream_options": { "include_usage": true },
            });

            // Optional knobs are only serialized when configured.
            if let Some(temp) = self.temperature {
                request["temperature"] = serde_json::json!(temp);
            }
            if let Some(max) = self.max_tokens {
                request["max_tokens"] = serde_json::json!(max);
            }

            if !tools.is_empty() {
                request["tools"] = serde_json::json!(self.convert_tools(tools));
            }

            let url = format!("{}{}", self.base_url, self.chat_completions_path);
            let auth_header = format!("Bearer {}", self.api_key.expose());
            let headers = vec![("Authorization", auth_header.as_str())];

            // Only establishing the stream is retried; once a success
            // response is returned, parsing proceeds below without retry.
            let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
                let http = &self.http;
                let url = &url;
                let headers = headers.clone();
                let request = &request;
                async move {
                    match http.post_streaming(url, headers, request).await {
                        Ok(resp) => {
                            let status = reqwest::StatusCode::from_u16(resp.status)
                                .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
                            if status.is_success() {
                                AttemptOutcome::Success(resp)
                            } else {
                                // Honor a Retry-After header when present.
                                let retry_after = resp
                                    .retry_after
                                    .as_deref()
                                    .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
                                if self.retry_config.is_retryable_status(status) {
                                    AttemptOutcome::Retryable {
                                        status,
                                        body: resp.error_body,
                                        retry_after,
                                    }
                                } else {
                                    AttemptOutcome::Fatal(anyhow::anyhow!(
                                        "OpenAI API error at {} ({}): {}",
                                        url,
                                        status,
                                        resp.error_body
                                    ))
                                }
                            }
                        }
                        Err(e) => AttemptOutcome::Fatal(anyhow::anyhow!(
                            "Failed to send streaming request: {}",
                            e
                        )),
                    }
                }
            })
            .await?;

            let (tx, rx) = mpsc::channel(100);

            let mut stream = streaming_resp.byte_stream;
            let provider_name = self.provider_name.clone();
            let request_model = self.model.clone();
            let request_url = url.clone();
            // Parse the SSE stream on a background task; events are pushed
            // into `tx` while the receiver is handed straight back.
            tokio::spawn(async move {
                // Raw bytes not yet split into complete SSE events.
                let mut buffer = String::new();
                // Finalized content blocks for the assembled response.
                let mut content_blocks: Vec<ContentBlock> = Vec::new();
                // Text accumulated across chunks (delta or cumulative).
                let mut text_content = String::new();
                // Reasoning text accumulated across chunks.
                let mut reasoning_content_accum = String::new();
                // In-progress tool calls keyed by stream index:
                // (id, name, accumulated arguments JSON).
                let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
                    std::collections::BTreeMap::new();
                let mut usage = TokenUsage::default();
                let mut finish_reason = None;
                let mut response_id = None;
                let mut response_model = None;
                let mut response_object = None;
                // Elapsed ms at the first emitted token (text or tool name).
                let mut first_token_ms = None;
                // Whether the server terminated the stream with "[DONE]".
                let mut saw_done = false;
                // Whether any chunk parsed at all (drives the fallback path).
                let mut parsed_any_event = false;

                while let Some(chunk_result) = stream.next().await {
                    let chunk = match chunk_result {
                        Ok(c) => c,
                        Err(e) => {
                            tracing::error!("Stream error: {}", e);
                            break;
                        }
                    };

                    buffer.push_str(&String::from_utf8_lossy(&chunk));

                    // SSE events are delimited by a blank line ("\n\n").
                    while let Some(event_end) = buffer.find("\n\n") {
                        let event_data: String = buffer.drain(..event_end).collect();
                        // Drop the "\n\n" delimiter itself.
                        buffer.drain(..2);

                        for line in event_data.lines() {
                            if let Some(data) = line.strip_prefix("data: ") {
                                if data == "[DONE]" {
                                    // Terminal sentinel: flush accumulated text and
                                    // tool calls, record telemetry, emit Done.
                                    saw_done = true;
                                    if !text_content.is_empty() {
                                        content_blocks.push(ContentBlock::Text {
                                            text: text_content.clone(),
                                        });
                                    }
                                    for (_, (id, name, args)) in tool_calls.iter() {
                                        content_blocks.push(ContentBlock::ToolUse {
                                            id: id.clone(),
                                            name: name.clone(),
                                            input: Self::parse_tool_arguments(name, args),
                                        });
                                    }
                                    tool_calls.clear();
                                    crate::telemetry::record_llm_usage(
                                        usage.prompt_tokens,
                                        usage.completion_tokens,
                                        usage.total_tokens,
                                        finish_reason.as_deref(),
                                    );
                                    let response = LlmResponse {
                                        message: Message {
                                            role: "assistant".to_string(),
                                            content: std::mem::take(&mut content_blocks),
                                            reasoning_content: if reasoning_content_accum.is_empty()
                                            {
                                                None
                                            } else {
                                                Some(std::mem::take(&mut reasoning_content_accum))
                                            },
                                        },
                                        usage: usage.clone(),
                                        stop_reason: std::mem::take(&mut finish_reason),
                                        meta: Some(LlmResponseMeta {
                                            provider: Some(provider_name.clone()),
                                            request_model: Some(request_model.clone()),
                                            request_url: Some(request_url.clone()),
                                            response_id: response_id.clone(),
                                            response_model: response_model.clone(),
                                            response_object: response_object.clone(),
                                            first_token_ms,
                                            duration_ms: Some(
                                                request_started_at.elapsed().as_millis() as u64,
                                            ),
                                        }),
                                    };
                                    let _ = tx.send(StreamEvent::Done(response)).await;
                                    continue;
                                }

                                // Unparseable data lines are silently skipped.
                                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
                                    parsed_any_event = true;
                                    // Response identity fields: first value wins.
                                    if response_id.is_none() {
                                        response_id = event.id.clone();
                                    }
                                    if response_model.is_none() {
                                        response_model = event.model.clone();
                                    }
                                    if response_object.is_none() {
                                        response_object = event.object.clone();
                                    }
                                    // Usage: last value wins (servers send it in
                                    // the final chunk when include_usage is set).
                                    if let Some(u) = event.usage {
                                        usage.prompt_tokens = u.prompt_tokens;
                                        usage.completion_tokens = u.completion_tokens;
                                        usage.total_tokens = u.total_tokens;
                                        usage.cache_read_tokens = u
                                            .prompt_tokens_details
                                            .as_ref()
                                            .and_then(|d| d.cached_tokens);
                                    }

                                    if let Some(choice) = event.choices.into_iter().next() {
                                        if let Some(reason) = choice.finish_reason {
                                            finish_reason = Some(reason);
                                        }

                                        let delta_content = choice
                                            .delta
                                            .as_ref()
                                            .and_then(|delta| delta.content.clone());

                                        // Some chunks carry a full `message`
                                        // object instead of (or alongside) a
                                        // `delta`; handle both shapes. The
                                        // message's content is only used when
                                        // no delta content is present.
                                        if let Some(message) = choice.message {
                                            if let Some(reasoning) = message.reasoning_content {
                                                reasoning_content_accum.push_str(&reasoning);
                                            }
                                            if delta_content.is_none() {
                                                if let Some(content) = message
                                                    .content
                                                    .filter(|value| !value.is_empty())
                                                {
                                                    if first_token_ms.is_none() {
                                                        first_token_ms = Some(
                                                            request_started_at.elapsed().as_millis()
                                                                as u64,
                                                        );
                                                    }
                                                    if let Some(delta) = Self::merge_stream_text(
                                                        &mut text_content,
                                                        &content,
                                                    ) {
                                                        let _ = tx
                                                            .send(StreamEvent::TextDelta(delta))
                                                            .await;
                                                    }
                                                }
                                            }
                                            // Message-shaped tool calls arrive
                                            // complete; keyed by position.
                                            if let Some(tcs) = message.tool_calls {
                                                for (index, tc) in tcs.into_iter().enumerate() {
                                                    tool_calls.insert(
                                                        index,
                                                        (
                                                            tc.id,
                                                            tc.function.name,
                                                            tc.function.arguments,
                                                        ),
                                                    );
                                                }
                                            }
                                        }

                                        if let Some(delta) = choice.delta {
                                            if let Some(ref rc) = delta.reasoning_content {
                                                reasoning_content_accum.push_str(rc);
                                            }

                                            if let Some(content) = delta.content {
                                                if first_token_ms.is_none() {
                                                    first_token_ms = Some(
                                                        request_started_at.elapsed().as_millis()
                                                            as u64,
                                                    );
                                                }
                                                if let Some(delta) = Self::merge_stream_text(
                                                    &mut text_content,
                                                    &content,
                                                ) {
                                                    let _ = tx
                                                        .send(StreamEvent::TextDelta(delta))
                                                        .await;
                                                }
                                            }

                                            // Delta-shaped tool calls arrive in
                                            // fragments: id and name once, then
                                            // argument text accumulated by index.
                                            if let Some(tcs) = delta.tool_calls {
                                                for tc in tcs {
                                                    let entry = tool_calls
                                                        .entry(tc.index)
                                                        .or_insert_with(|| {
                                                            (
                                                                String::new(),
                                                                String::new(),
                                                                String::new(),
                                                            )
                                                        });

                                                    if let Some(id) = tc.id {
                                                        entry.0 = id;
                                                    }
                                                    if let Some(func) = tc.function {
                                                        if let Some(name) = func.name {
                                                            if first_token_ms.is_none() {
                                                                first_token_ms = Some(
                                                                    request_started_at
                                                                        .elapsed()
                                                                        .as_millis()
                                                                        as u64,
                                                                );
                                                            }
                                                            entry.1 = name.clone();
                                                            let _ = tx
                                                                .send(StreamEvent::ToolUseStart {
                                                                    id: entry.0.clone(),
                                                                    name,
                                                                })
                                                                .await;
                                                        }
                                                        if let Some(args) = func.arguments {
                                                            entry.2.push_str(&args);
                                                            let _ = tx
                                                                .send(
                                                                    StreamEvent::ToolUseInputDelta(
                                                                        args,
                                                                    ),
                                                                )
                                                                .await;
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }

                // Normal termination: [DONE] already emitted the response.
                if saw_done {
                    return;
                }

                // The stream ended without [DONE]. Try to salvage whatever is
                // left in the buffer: either one last stream chunk, or a full
                // non-streaming response body some backends return.
                let trailing = buffer.trim();
                if !trailing.is_empty() {
                    if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
                        parsed_any_event = true;
                        if response_id.is_none() {
                            response_id = event.id.clone();
                        }
                        if response_model.is_none() {
                            response_model = event.model.clone();
                        }
                        if response_object.is_none() {
                            response_object = event.object.clone();
                        }
                        if let Some(u) = event.usage {
                            usage.prompt_tokens = u.prompt_tokens;
                            usage.completion_tokens = u.completion_tokens;
                            usage.total_tokens = u.total_tokens;
                            usage.cache_read_tokens = u
                                .prompt_tokens_details
                                .as_ref()
                                .and_then(|d| d.cached_tokens);
                        }
                        if let Some(choice) = event.choices.into_iter().next() {
                            if let Some(reason) = choice.finish_reason {
                                finish_reason = Some(reason);
                            }
                            let delta_content = choice
                                .delta
                                .as_ref()
                                .and_then(|delta| delta.content.clone());
                            if let Some(message) = choice.message {
                                if let Some(reasoning) = message.reasoning_content {
                                    reasoning_content_accum.push_str(&reasoning);
                                }
                                if delta_content.is_none() {
                                    if let Some(content) =
                                        message.content.filter(|value| !value.is_empty())
                                    {
                                        if first_token_ms.is_none() {
                                            first_token_ms = Some(
                                                request_started_at.elapsed().as_millis() as u64,
                                            );
                                        }
                                        if let Some(delta) =
                                            Self::merge_stream_text(&mut text_content, &content)
                                        {
                                            let _ = tx.send(StreamEvent::TextDelta(delta)).await;
                                        }
                                    }
                                }
                                if let Some(tcs) = message.tool_calls {
                                    for (index, tc) in tcs.into_iter().enumerate() {
                                        tool_calls.insert(
                                            index,
                                            (tc.id, tc.function.name, tc.function.arguments),
                                        );
                                    }
                                }
                            }
                            if let Some(delta) = choice.delta {
                                if let Some(ref rc) = delta.reasoning_content {
                                    reasoning_content_accum.push_str(rc);
                                }
                                if let Some(content) = delta.content {
                                    if first_token_ms.is_none() {
                                        first_token_ms =
                                            Some(request_started_at.elapsed().as_millis() as u64);
                                    }
                                    if let Some(delta) =
                                        Self::merge_stream_text(&mut text_content, &content)
                                    {
                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
                                    }
                                }
                            }
                        }
                    } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
                        // A complete non-streaming response body: take its
                        // fields wholesale.
                        parsed_any_event = true;
                        response_id = response.id.clone();
                        response_model = response.model.clone();
                        response_object = response.object.clone();
                        usage.prompt_tokens = response.usage.prompt_tokens;
                        usage.completion_tokens = response.usage.completion_tokens;
                        usage.total_tokens = response.usage.total_tokens;
                        usage.cache_read_tokens = response
                            .usage
                            .prompt_tokens_details
                            .as_ref()
                            .and_then(|d| d.cached_tokens);

                        if let Some(choice) = response.choices.into_iter().next() {
                            finish_reason = choice.finish_reason;
                            if let Some(text) =
                                choice.message.content.filter(|text| !text.is_empty())
                            {
                                if first_token_ms.is_none() {
                                    first_token_ms =
                                        Some(request_started_at.elapsed().as_millis() as u64);
                                }
                                // Merge without emitting a delta; the text is
                                // delivered via the final Done event below.
                                let _ = Self::merge_stream_text(&mut text_content, &text);
                            }
                            if let Some(reasoning) = choice.message.reasoning_content {
                                reasoning_content_accum.push_str(&reasoning);
                            }
                            if let Some(final_tool_calls) = choice.message.tool_calls {
                                for tc in final_tool_calls {
                                    tool_calls.insert(
                                        tool_calls.len(),
                                        (tc.id, tc.function.name, tc.function.arguments),
                                    );
                                }
                            }
                        }
                    }
                }

                // Finalize a best-effort Done event when anything was
                // accumulated; otherwise just log the dead stream.
                if parsed_any_event
                    || !text_content.is_empty()
                    || !tool_calls.is_empty()
                    || !content_blocks.is_empty()
                {
                    tracing::warn!(
                        provider = %provider_name,
                        model = %request_model,
                        "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
                    );
                    if !text_content.is_empty() {
                        content_blocks.push(ContentBlock::Text {
                            text: text_content.clone(),
                        });
                    }
                    for (_, (id, name, args)) in tool_calls.iter() {
                        content_blocks.push(ContentBlock::ToolUse {
                            id: id.clone(),
                            name: name.clone(),
                            input: Self::parse_tool_arguments(name, args),
                        });
                    }
                    tool_calls.clear();
                    crate::telemetry::record_llm_usage(
                        usage.prompt_tokens,
                        usage.completion_tokens,
                        usage.total_tokens,
                        finish_reason.as_deref(),
                    );
                    let response = LlmResponse {
                        message: Message {
                            role: "assistant".to_string(),
                            content: std::mem::take(&mut content_blocks),
                            reasoning_content: if reasoning_content_accum.is_empty() {
                                None
                            } else {
                                Some(std::mem::take(&mut reasoning_content_accum))
                            },
                        },
                        usage: usage.clone(),
                        stop_reason: std::mem::take(&mut finish_reason),
                        meta: Some(LlmResponseMeta {
                            provider: Some(provider_name.clone()),
                            request_model: Some(request_model.clone()),
                            request_url: Some(request_url.clone()),
                            response_id: response_id.clone(),
                            response_model: response_model.clone(),
                            response_object: response_object.clone(),
                            first_token_ms,
                            duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
                        }),
                    };
                    let _ = tx.send(StreamEvent::Done(response)).await;
                } else {
                    tracing::warn!(
                        provider = %provider_name,
                        model = %request_model,
                        trailing = %trailing.chars().take(400).collect::<String>(),
                        "OpenAI-compatible stream ended without any parseable events"
                    );
                }
            });

            Ok(rx)
        }
    }
900}
901
/// Top-level body of a non-streaming chat-completions response.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiResponse {
    #[serde(default)]
    pub(crate) id: Option<String>,
    #[serde(default)]
    pub(crate) object: Option<String>,
    #[serde(default)]
    pub(crate) model: Option<String>,
    // Only the first choice is consumed by this client.
    pub(crate) choices: Vec<OpenAiChoice>,
    pub(crate) usage: OpenAiUsage,
}
914
/// One completion choice within a non-streaming response.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiChoice {
    pub(crate) message: OpenAiMessage,
    pub(crate) finish_reason: Option<String>,
}
920
/// Assistant message payload; also reused for full-message stream chunks.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiMessage {
    // Non-standard field emitted by some OpenAI-compatible providers.
    pub(crate) reasoning_content: Option<String>,
    pub(crate) content: Option<String>,
    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
}
927
/// A complete (non-delta) tool call in a response message.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCall {
    pub(crate) id: String,
    pub(crate) function: OpenAiFunction,
}
933
/// Function name plus its arguments as a raw JSON string
/// (parsed later by `parse_tool_arguments`).
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunction {
    pub(crate) name: String,
    pub(crate) arguments: String,
}
939
/// Token-usage accounting; counts default to zero when absent.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiUsage {
    #[serde(default)]
    pub(crate) prompt_tokens: usize,
    #[serde(default)]
    pub(crate) completion_tokens: usize,
    #[serde(default)]
    pub(crate) total_tokens: usize,
    #[serde(default)]
    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
}
952
/// Prompt-token breakdown; `cached_tokens` maps to cache-read usage.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiPromptTokensDetails {
    #[serde(default)]
    pub(crate) cached_tokens: Option<usize>,
}
958
/// One SSE `data:` payload of a streaming response.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChunk {
    #[serde(default)]
    pub(crate) id: Option<String>,
    #[serde(default)]
    pub(crate) object: Option<String>,
    #[serde(default)]
    pub(crate) model: Option<String>,
    pub(crate) choices: Vec<OpenAiStreamChoice>,
    // Present on the final chunk when stream_options.include_usage is set.
    pub(crate) usage: Option<OpenAiUsage>,
}
971
/// Choice within a stream chunk; carries either a `delta` fragment or,
/// for some OpenAI-compatible backends, a full `message` object.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChoice {
    pub(crate) message: Option<OpenAiMessage>,
    pub(crate) delta: Option<OpenAiDelta>,
    pub(crate) finish_reason: Option<String>,
}
978
/// Incremental message fragment carried by a stream chunk.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiDelta {
    // Non-standard field emitted by some OpenAI-compatible providers.
    pub(crate) reasoning_content: Option<String>,
    pub(crate) content: Option<String>,
    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}
985
/// Fragment of a streamed tool call; `index` correlates fragments that
/// belong to the same call across chunks.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCallDelta {
    pub(crate) index: usize,
    pub(crate) id: Option<String>,
    pub(crate) function: Option<OpenAiFunctionDelta>,
}
992
/// Fragment of a streamed function call: the name arrives once;
/// `arguments` text is accumulated across chunks.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunctionDelta {
    pub(crate) name: Option<String>,
    pub(crate) arguments: Option<String>,
}