1use super::http::{default_http_client, normalize_base_url, HttpClient};
4use super::types::*;
5use super::LlmClient;
6use crate::llm::types::{ToolResultContent, ToolResultContentField};
7use crate::retry::{AttemptOutcome, RetryConfig};
8use anyhow::{Context, Result};
9use async_trait::async_trait;
10use futures::StreamExt;
11use serde::Deserialize;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
/// Client for OpenAI's chat-completions API and OpenAI-compatible servers.
///
/// Configured through the builder-style `with_*` methods; fields are
/// `pub(crate)` so sibling modules in this crate can construct/inspect it.
pub struct OpenAiClient {
    // Provider label reported in `LlmResponseMeta::provider` (e.g. "openai").
    pub(crate) provider_name: String,
    // API key sent as a `Bearer` Authorization header unless the caller
    // supplies their own Authorization header in `headers`.
    pub(crate) api_key: SecretString,
    // Model identifier placed in every request body.
    pub(crate) model: String,
    // Base URL the chat-completions path is appended to.
    pub(crate) base_url: String,
    // Endpoint path appended to `base_url`; always starts with '/'.
    pub(crate) chat_completions_path: String,
    // Extra request headers; may replace the default Authorization header.
    pub(crate) headers: HashMap<String, String>,
    // Optional sampling temperature; omitted from requests when None.
    pub(crate) temperature: Option<f32>,
    // Optional `max_tokens` cap; omitted from requests when None.
    pub(crate) max_tokens: Option<usize>,
    // Pluggable HTTP transport.
    pub(crate) http: Arc<dyn HttpClient>,
    // Retry policy applied to transient HTTP failures.
    pub(crate) retry_config: RetryConfig,
}
30
31impl OpenAiClient {
32 pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
33 if arguments.trim().is_empty() {
34 return serde_json::Value::Object(Default::default());
35 }
36
37 serde_json::from_str(arguments).unwrap_or_else(|e| {
38 tracing::warn!(
39 "Failed to parse tool arguments JSON for tool '{}': {}",
40 tool_name,
41 e
42 );
43 serde_json::json!({
44 "__parse_error": format!(
45 "Malformed tool arguments: {}. Raw input: {}",
46 e, arguments
47 )
48 })
49 })
50 }
51
52 fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
53 if incoming.is_empty() {
54 return None;
55 }
56 if text_content.is_empty() {
57 text_content.push_str(incoming);
58 return Some(incoming.to_string());
59 }
60 if incoming == text_content.as_str() || text_content.ends_with(incoming) {
61 return None;
62 }
63 if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
64 if suffix.is_empty() {
65 return None;
66 }
67 text_content.push_str(suffix);
68 return Some(suffix.to_string());
69 }
70 text_content.push_str(incoming);
71 Some(incoming.to_string())
72 }
73
74 pub fn new(api_key: String, model: String) -> Self {
75 Self {
76 provider_name: "openai".to_string(),
77 api_key: SecretString::new(api_key),
78 model,
79 base_url: "https://api.openai.com".to_string(),
80 chat_completions_path: "/v1/chat/completions".to_string(),
81 headers: HashMap::new(),
82 temperature: None,
83 max_tokens: None,
84 http: default_http_client(),
85 retry_config: RetryConfig::default(),
86 }
87 }
88
89 pub fn with_base_url(mut self, base_url: String) -> Self {
90 self.base_url = normalize_base_url(&base_url);
91 self
92 }
93
94 pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
95 self.provider_name = provider_name.into();
96 self
97 }
98
99 pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
100 let path = path.into();
101 self.chat_completions_path = if path.starts_with('/') {
102 path
103 } else {
104 format!("/{}", path)
105 };
106 self
107 }
108
109 pub fn with_temperature(mut self, temperature: f32) -> Self {
110 self.temperature = Some(temperature);
111 self
112 }
113
114 pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
115 self.headers = headers;
116 self
117 }
118
119 pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
120 self.max_tokens = Some(max_tokens);
121 self
122 }
123
124 pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
125 self.retry_config = retry_config;
126 self
127 }
128
129 pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
130 self.http = http;
131 self
132 }
133
134 pub(crate) fn request_headers(&self) -> Vec<(String, String)> {
135 let mut headers = Vec::with_capacity(self.headers.len() + 1);
136 let has_authorization = self
137 .headers
138 .keys()
139 .any(|key| key.eq_ignore_ascii_case("authorization"));
140 if !has_authorization {
141 headers.push((
142 "Authorization".to_string(),
143 format!("Bearer {}", self.api_key.expose()),
144 ));
145 }
146 headers.extend(
147 self.headers
148 .iter()
149 .map(|(key, value)| (key.clone(), value.clone())),
150 );
151 headers
152 }
153
    /// Converts provider-agnostic [`Message`]s into OpenAI chat-completions
    /// message objects.
    ///
    /// A message with a single text block collapses to a plain string
    /// `content`; a lone `ToolResult` block becomes an entire `role: "tool"`
    /// message (early return). Multi-block messages become an array of typed
    /// content parts. Assistant messages are special-cased so tool calls and
    /// `reasoning_content` are emitted in the shape OpenAI-compatible
    /// backends expect.
    pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
        messages
            .iter()
            .map(|msg| {
                let content: serde_json::Value = if msg.content.len() == 1 {
                    match &msg.content[0] {
                        // A single text block collapses to a bare JSON string.
                        ContentBlock::Text { text } => serde_json::json!(text),
                        ContentBlock::ToolResult {
                            tool_use_id,
                            content,
                            ..
                        } => {
                            // Flatten block-style tool results to newline-joined
                            // text; non-text blocks are dropped.
                            let content_str = match content {
                                ToolResultContentField::Text(s) => s.clone(),
                                ToolResultContentField::Blocks(blocks) => blocks
                                    .iter()
                                    .filter_map(|b| {
                                        if let ToolResultContent::Text { text } = b {
                                            Some(text.clone())
                                        } else {
                                            None
                                        }
                                    })
                                    .collect::<Vec<_>>()
                                    .join("\n"),
                            };
                            // A tool result maps to a whole `role: "tool"`
                            // message, not to a content part — return early.
                            return serde_json::json!({
                                "role": "tool",
                                "tool_call_id": tool_use_id,
                                "content": content_str,
                            });
                        }
                        // Other single blocks degrade to empty string content.
                        _ => serde_json::json!(""),
                    }
                } else {
                    // Multi-block content becomes an array of typed parts.
                    serde_json::json!(msg
                        .content
                        .iter()
                        .map(|block| {
                            match block {
                                ContentBlock::Text { text } => serde_json::json!({
                                    "type": "text",
                                    "text": text,
                                }),
                                // Images are inlined as base64 data URLs.
                                ContentBlock::Image { source } => serde_json::json!({
                                    "type": "image_url",
                                    "image_url": {
                                        "url": format!(
                                            "data:{};base64,{}",
                                            source.media_type, source.data
                                        ),
                                    }
                                }),
                                ContentBlock::ToolUse { id, name, input } => serde_json::json!({
                                    "type": "function",
                                    "id": id,
                                    "function": {
                                        "name": name,
                                        "arguments": input.to_string(),
                                    }
                                }),
                                // Unknown block kinds degrade to empty objects.
                                _ => serde_json::json!({}),
                            }
                        })
                        .collect::<Vec<_>>())
                };

                if msg.role == "assistant" {
                    // Assistant turns always carry `reasoning_content` (empty
                    // string when absent).
                    let rc = msg.reasoning_content.as_deref().unwrap_or("");
                    let tool_calls: Vec<_> = msg.tool_calls();
                    if !tool_calls.is_empty() {
                        // With tool calls, `content` is reduced to the plain
                        // text of the message and the calls are emitted in the
                        // dedicated `tool_calls` array.
                        return serde_json::json!({
                            "role": "assistant",
                            "content": msg.text(),
                            "reasoning_content": rc,
                            "tool_calls": tool_calls.iter().map(|tc| {
                                serde_json::json!({
                                    "id": tc.id,
                                    "type": "function",
                                    "function": {
                                        "name": tc.name,
                                        "arguments": tc.args.to_string(),
                                    }
                                })
                            }).collect::<Vec<_>>(),
                        });
                    }
                    return serde_json::json!({
                        "role": "assistant",
                        "content": content,
                        "reasoning_content": rc,
                    });
                }

                // Non-assistant roles pass through with the converted content.
                serde_json::json!({
                    "role": msg.role,
                    "content": content,
                })
            })
            .collect()
    }
257
258 pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
259 tools
260 .iter()
261 .map(|t| {
262 serde_json::json!({
263 "type": "function",
264 "function": {
265 "name": t.name,
266 "description": t.description,
267 "parameters": t.parameters,
268 }
269 })
270 })
271 .collect()
272 }
273}
274
275#[async_trait]
276impl LlmClient for OpenAiClient {
277 async fn complete(
278 &self,
279 messages: &[Message],
280 system: Option<&str>,
281 tools: &[ToolDefinition],
282 ) -> Result<LlmResponse> {
283 {
284 let request_started_at = Instant::now();
285 let mut openai_messages = Vec::new();
286
287 if let Some(sys) = system {
288 openai_messages.push(serde_json::json!({
289 "role": "system",
290 "content": sys,
291 }));
292 }
293
294 openai_messages.extend(self.convert_messages(messages));
295
296 let mut request = serde_json::json!({
297 "model": self.model,
298 "messages": openai_messages,
299 });
300
301 if let Some(temp) = self.temperature {
302 request["temperature"] = serde_json::json!(temp);
303 }
304 if let Some(max) = self.max_tokens {
305 request["max_tokens"] = serde_json::json!(max);
306 }
307
308 if !tools.is_empty() {
309 request["tools"] = serde_json::json!(self.convert_tools(tools));
310 }
311
312 let url = format!("{}{}", self.base_url, self.chat_completions_path);
313 let request_headers = self.request_headers();
314
315 let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
316 let http = &self.http;
317 let url = &url;
318 let request_headers = request_headers.clone();
319 let request = &request;
320 async move {
321 let headers = request_headers
322 .iter()
323 .map(|(key, value)| (key.as_str(), value.as_str()))
324 .collect::<Vec<_>>();
325 match http.post(url, headers, request).await {
326 Ok(resp) => {
327 let status = reqwest::StatusCode::from_u16(resp.status)
328 .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
329 if status.is_success() {
330 AttemptOutcome::Success(resp.body)
331 } else if self.retry_config.is_retryable_status(status) {
332 AttemptOutcome::Retryable {
333 status,
334 body: resp.body,
335 retry_after: None,
336 }
337 } else {
338 AttemptOutcome::Fatal(anyhow::anyhow!(
339 "OpenAI API error at {} ({}): {}",
340 url,
341 status,
342 resp.body
343 ))
344 }
345 }
346 Err(e) => {
347 eprintln!("[DEBUG] HTTP error: {:?}", e);
348 AttemptOutcome::Fatal(e)
349 }
350 }
351 }
352 })
353 .await?;
354
355 let parsed: OpenAiResponse =
356 serde_json::from_str(&response).context("Failed to parse OpenAI response")?;
357
358 let choice = parsed.choices.into_iter().next().context("No choices")?;
359
360 let mut content = vec![];
361
362 let reasoning_content = choice.message.reasoning_content;
363
364 let text_content = choice.message.content;
365
366 if let Some(text) = text_content {
367 if !text.is_empty() {
368 content.push(ContentBlock::Text { text });
369 }
370 }
371
372 if let Some(tool_calls) = choice.message.tool_calls {
373 for tc in tool_calls {
374 content.push(ContentBlock::ToolUse {
375 id: tc.id,
376 name: tc.function.name.clone(),
377 input: Self::parse_tool_arguments(
378 &tc.function.name,
379 &tc.function.arguments,
380 ),
381 });
382 }
383 }
384
385 let llm_response = LlmResponse {
386 message: Message {
387 role: "assistant".to_string(),
388 content,
389 reasoning_content,
390 },
391 usage: TokenUsage {
392 prompt_tokens: parsed.usage.prompt_tokens,
393 completion_tokens: parsed.usage.completion_tokens,
394 total_tokens: parsed.usage.total_tokens,
395 cache_read_tokens: parsed
396 .usage
397 .prompt_tokens_details
398 .as_ref()
399 .and_then(|d| d.cached_tokens),
400 cache_write_tokens: None,
401 },
402 stop_reason: choice.finish_reason,
403 meta: Some(LlmResponseMeta {
404 provider: Some(self.provider_name.clone()),
405 request_model: Some(self.model.clone()),
406 request_url: Some(url.clone()),
407 response_id: parsed.id,
408 response_model: parsed.model,
409 response_object: parsed.object,
410 first_token_ms: None,
411 duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
412 }),
413 };
414
415 crate::telemetry::record_llm_usage(
416 llm_response.usage.prompt_tokens,
417 llm_response.usage.completion_tokens,
418 llm_response.usage.total_tokens,
419 llm_response.stop_reason.as_deref(),
420 );
421
422 Ok(llm_response)
423 }
424 }
425
    /// Sends a streaming chat-completions request and returns a channel of
    /// [`StreamEvent`]s produced by a background task that parses the SSE
    /// byte stream.
    ///
    /// The task emits `TextDelta`, `ToolUseStart`, and `ToolUseInputDelta`
    /// events as chunks arrive, and a final `Done` event carrying the fully
    /// assembled [`LlmResponse`]. If the server never sends `[DONE]`, any
    /// buffered state is finalized on a best-effort basis.
    ///
    /// # Errors
    /// Fails when the streaming request cannot be established or the API
    /// returns a non-retryable error status; errors occurring mid-stream are
    /// logged and end the stream instead.
    async fn complete_streaming(
        &self,
        messages: &[Message],
        system: Option<&str>,
        tools: &[ToolDefinition],
    ) -> Result<mpsc::Receiver<StreamEvent>> {
        {
            let request_started_at = Instant::now();
            let mut openai_messages = Vec::new();

            // The system prompt, when present, must come first.
            if let Some(sys) = system {
                openai_messages.push(serde_json::json!({
                    "role": "system",
                    "content": sys,
                }));
            }

            openai_messages.extend(self.convert_messages(messages));

            // `stream_options.include_usage` asks the server to append a
            // usage-bearing chunk at the end of the stream.
            let mut request = serde_json::json!({
                "model": self.model,
                "messages": openai_messages,
                "stream": true,
                "stream_options": { "include_usage": true },
            });

            if let Some(temp) = self.temperature {
                request["temperature"] = serde_json::json!(temp);
            }
            if let Some(max) = self.max_tokens {
                request["max_tokens"] = serde_json::json!(max);
            }

            if !tools.is_empty() {
                request["tools"] = serde_json::json!(self.convert_tools(tools));
            }

            let url = format!("{}{}", self.base_url, self.chat_completions_path);
            let request_headers = self.request_headers();

            // Retry only the request establishment; once the stream is open,
            // errors are handled by the consumer task below.
            let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
                let http = &self.http;
                let url = &url;
                let request_headers = request_headers.clone();
                let request = &request;
                async move {
                    let headers = request_headers
                        .iter()
                        .map(|(key, value)| (key.as_str(), value.as_str()))
                        .collect::<Vec<_>>();
                    match http.post_streaming(url, headers, request).await {
                        Ok(resp) => {
                            let status = reqwest::StatusCode::from_u16(resp.status)
                                .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
                            if status.is_success() {
                                AttemptOutcome::Success(resp)
                            } else {
                                // Honor a Retry-After header when present.
                                let retry_after = resp
                                    .retry_after
                                    .as_deref()
                                    .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
                                if self.retry_config.is_retryable_status(status) {
                                    AttemptOutcome::Retryable {
                                        status,
                                        body: resp.error_body,
                                        retry_after,
                                    }
                                } else {
                                    AttemptOutcome::Fatal(anyhow::anyhow!(
                                        "OpenAI API error at {} ({}): {}",
                                        url,
                                        status,
                                        resp.error_body
                                    ))
                                }
                            }
                        }
                        Err(e) => AttemptOutcome::Fatal(anyhow::anyhow!(
                            "Failed to send streaming request: {}",
                            e
                        )),
                    }
                }
            })
            .await?;

            let (tx, rx) = mpsc::channel(100);

            let mut stream = streaming_resp.byte_stream;
            let provider_name = self.provider_name.clone();
            let request_model = self.model.clone();
            let request_url = url.clone();
            tokio::spawn(async move {
                // Accumulated parser state for the whole stream.
                let mut buffer = String::new();
                let mut content_blocks: Vec<ContentBlock> = Vec::new();
                let mut text_content = String::new();
                let mut reasoning_content_accum = String::new();
                // Tool calls keyed by stream index: (id, name, arguments).
                // BTreeMap keeps finalization in index order.
                let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
                    std::collections::BTreeMap::new();
                let mut usage = TokenUsage::default();
                let mut finish_reason = None;
                let mut response_id = None;
                let mut response_model = None;
                let mut response_object = None;
                let mut first_token_ms = None;
                let mut saw_done = false;
                let mut parsed_any_event = false;

                while let Some(chunk_result) = stream.next().await {
                    let chunk = match chunk_result {
                        Ok(c) => c,
                        Err(e) => {
                            // Mid-stream transport error: stop reading and fall
                            // through to the best-effort finalization below.
                            tracing::error!("Stream error: {}", e);
                            break;
                        }
                    };

                    buffer.push_str(&String::from_utf8_lossy(&chunk));

                    // SSE events are separated by a blank line ("\n\n").
                    while let Some(event_end) = buffer.find("\n\n") {
                        let event_data: String = buffer.drain(..event_end).collect();
                        // Drop the "\n\n" separator itself.
                        buffer.drain(..2);

                        for line in event_data.lines() {
                            if let Some(data) = line.strip_prefix("data: ") {
                                if data == "[DONE]" {
                                    // End of stream: assemble and emit the
                                    // final response.
                                    saw_done = true;
                                    if !text_content.is_empty() {
                                        content_blocks.push(ContentBlock::Text {
                                            text: text_content.clone(),
                                        });
                                    }
                                    for (_, (id, name, args)) in tool_calls.iter() {
                                        content_blocks.push(ContentBlock::ToolUse {
                                            id: id.clone(),
                                            name: name.clone(),
                                            input: Self::parse_tool_arguments(name, args),
                                        });
                                    }
                                    tool_calls.clear();
                                    crate::telemetry::record_llm_usage(
                                        usage.prompt_tokens,
                                        usage.completion_tokens,
                                        usage.total_tokens,
                                        finish_reason.as_deref(),
                                    );
                                    let response = LlmResponse {
                                        message: Message {
                                            role: "assistant".to_string(),
                                            content: std::mem::take(&mut content_blocks),
                                            reasoning_content: if reasoning_content_accum.is_empty()
                                            {
                                                None
                                            } else {
                                                Some(std::mem::take(&mut reasoning_content_accum))
                                            },
                                        },
                                        usage: usage.clone(),
                                        stop_reason: std::mem::take(&mut finish_reason),
                                        meta: Some(LlmResponseMeta {
                                            provider: Some(provider_name.clone()),
                                            request_model: Some(request_model.clone()),
                                            request_url: Some(request_url.clone()),
                                            response_id: response_id.clone(),
                                            response_model: response_model.clone(),
                                            response_object: response_object.clone(),
                                            first_token_ms,
                                            duration_ms: Some(
                                                request_started_at.elapsed().as_millis() as u64,
                                            ),
                                        }),
                                    };
                                    let _ = tx.send(StreamEvent::Done(response)).await;
                                    continue;
                                }

                                // Unparseable data lines are skipped silently.
                                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
                                    parsed_any_event = true;
                                    // Response metadata: first value wins.
                                    if response_id.is_none() {
                                        response_id = event.id.clone();
                                    }
                                    if response_model.is_none() {
                                        response_model = event.model.clone();
                                    }
                                    if response_object.is_none() {
                                        response_object = event.object.clone();
                                    }
                                    // Usage: last chunk carrying it wins.
                                    if let Some(u) = event.usage {
                                        usage.prompt_tokens = u.prompt_tokens;
                                        usage.completion_tokens = u.completion_tokens;
                                        usage.total_tokens = u.total_tokens;
                                        usage.cache_read_tokens = u
                                            .prompt_tokens_details
                                            .as_ref()
                                            .and_then(|d| d.cached_tokens);
                                    }

                                    if let Some(choice) = event.choices.into_iter().next() {
                                        if let Some(reason) = choice.finish_reason {
                                            finish_reason = Some(reason);
                                        }

                                        let delta_content = choice
                                            .delta
                                            .as_ref()
                                            .and_then(|delta| delta.content.clone());

                                        // Some OpenAI-compatible servers send a
                                        // full `message` object in stream chunks
                                        // instead of (or alongside) `delta`.
                                        if let Some(message) = choice.message {
                                            if let Some(reasoning) = message.reasoning_content {
                                                reasoning_content_accum.push_str(&reasoning);
                                            }
                                            // Only use message.content when no
                                            // delta content exists, to avoid
                                            // double-counting the same text.
                                            if delta_content.is_none() {
                                                if let Some(content) = message
                                                    .content
                                                    .filter(|value| !value.is_empty())
                                                {
                                                    if first_token_ms.is_none() {
                                                        first_token_ms = Some(
                                                            request_started_at.elapsed().as_millis()
                                                                as u64,
                                                        );
                                                    }
                                                    if let Some(delta) = Self::merge_stream_text(
                                                        &mut text_content,
                                                        &content,
                                                    ) {
                                                        let _ = tx
                                                            .send(StreamEvent::TextDelta(delta))
                                                            .await;
                                                    }
                                                }
                                            }
                                            if let Some(tcs) = message.tool_calls {
                                                for (index, tc) in tcs.into_iter().enumerate() {
                                                    tool_calls.insert(
                                                        index,
                                                        (
                                                            tc.id,
                                                            tc.function.name,
                                                            tc.function.arguments,
                                                        ),
                                                    );
                                                }
                                            }
                                        }

                                        // Standard OpenAI streaming deltas.
                                        if let Some(delta) = choice.delta {
                                            if let Some(ref rc) = delta.reasoning_content {
                                                reasoning_content_accum.push_str(rc);
                                            }

                                            if let Some(content) = delta.content {
                                                if first_token_ms.is_none() {
                                                    first_token_ms = Some(
                                                        request_started_at.elapsed().as_millis()
                                                            as u64,
                                                    );
                                                }
                                                if let Some(delta) = Self::merge_stream_text(
                                                    &mut text_content,
                                                    &content,
                                                ) {
                                                    let _ = tx
                                                        .send(StreamEvent::TextDelta(delta))
                                                        .await;
                                                }
                                            }

                                            // Tool-call deltas accumulate by
                                            // index: id/name arrive once,
                                            // arguments arrive as fragments.
                                            if let Some(tcs) = delta.tool_calls {
                                                for tc in tcs {
                                                    let entry = tool_calls
                                                        .entry(tc.index)
                                                        .or_insert_with(|| {
                                                            (
                                                                String::new(),
                                                                String::new(),
                                                                String::new(),
                                                            )
                                                        });

                                                    if let Some(id) = tc.id {
                                                        entry.0 = id;
                                                    }
                                                    if let Some(func) = tc.function {
                                                        if let Some(name) = func.name {
                                                            if first_token_ms.is_none() {
                                                                first_token_ms = Some(
                                                                    request_started_at
                                                                        .elapsed()
                                                                        .as_millis()
                                                                        as u64,
                                                                );
                                                            }
                                                            entry.1 = name.clone();
                                                            let _ = tx
                                                                .send(StreamEvent::ToolUseStart {
                                                                    id: entry.0.clone(),
                                                                    name,
                                                                })
                                                                .await;
                                                        }
                                                        if let Some(args) = func.arguments {
                                                            entry.2.push_str(&args);
                                                            let _ = tx
                                                                .send(
                                                                    StreamEvent::ToolUseInputDelta(
                                                                        args,
                                                                    ),
                                                                )
                                                                .await;
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }

                // Clean termination: the [DONE] handler already emitted Done.
                if saw_done {
                    return;
                }

                // The stream ended without [DONE]: try to salvage whatever is
                // left in the buffer — either a final stream chunk or a whole
                // non-streaming response body some servers send.
                let trailing = buffer.trim();
                if !trailing.is_empty() {
                    if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
                        parsed_any_event = true;
                        if response_id.is_none() {
                            response_id = event.id.clone();
                        }
                        if response_model.is_none() {
                            response_model = event.model.clone();
                        }
                        if response_object.is_none() {
                            response_object = event.object.clone();
                        }
                        if let Some(u) = event.usage {
                            usage.prompt_tokens = u.prompt_tokens;
                            usage.completion_tokens = u.completion_tokens;
                            usage.total_tokens = u.total_tokens;
                            usage.cache_read_tokens = u
                                .prompt_tokens_details
                                .as_ref()
                                .and_then(|d| d.cached_tokens);
                        }
                        if let Some(choice) = event.choices.into_iter().next() {
                            if let Some(reason) = choice.finish_reason {
                                finish_reason = Some(reason);
                            }
                            let delta_content = choice
                                .delta
                                .as_ref()
                                .and_then(|delta| delta.content.clone());
                            if let Some(message) = choice.message {
                                if let Some(reasoning) = message.reasoning_content {
                                    reasoning_content_accum.push_str(&reasoning);
                                }
                                if delta_content.is_none() {
                                    if let Some(content) =
                                        message.content.filter(|value| !value.is_empty())
                                    {
                                        if first_token_ms.is_none() {
                                            first_token_ms = Some(
                                                request_started_at.elapsed().as_millis() as u64,
                                            );
                                        }
                                        if let Some(delta) =
                                            Self::merge_stream_text(&mut text_content, &content)
                                        {
                                            let _ = tx.send(StreamEvent::TextDelta(delta)).await;
                                        }
                                    }
                                }
                                if let Some(tcs) = message.tool_calls {
                                    for (index, tc) in tcs.into_iter().enumerate() {
                                        tool_calls.insert(
                                            index,
                                            (tc.id, tc.function.name, tc.function.arguments),
                                        );
                                    }
                                }
                            }
                            if let Some(delta) = choice.delta {
                                if let Some(ref rc) = delta.reasoning_content {
                                    reasoning_content_accum.push_str(rc);
                                }
                                if let Some(content) = delta.content {
                                    if first_token_ms.is_none() {
                                        first_token_ms =
                                            Some(request_started_at.elapsed().as_millis() as u64);
                                    }
                                    if let Some(delta) =
                                        Self::merge_stream_text(&mut text_content, &content)
                                    {
                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
                                    }
                                }
                            }
                        }
                    } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
                        // Entire non-streaming response body: absorb it whole.
                        parsed_any_event = true;
                        response_id = response.id.clone();
                        response_model = response.model.clone();
                        response_object = response.object.clone();
                        usage.prompt_tokens = response.usage.prompt_tokens;
                        usage.completion_tokens = response.usage.completion_tokens;
                        usage.total_tokens = response.usage.total_tokens;
                        usage.cache_read_tokens = response
                            .usage
                            .prompt_tokens_details
                            .as_ref()
                            .and_then(|d| d.cached_tokens);

                        if let Some(choice) = response.choices.into_iter().next() {
                            finish_reason = choice.finish_reason;
                            if let Some(text) =
                                choice.message.content.filter(|text| !text.is_empty())
                            {
                                if first_token_ms.is_none() {
                                    first_token_ms =
                                        Some(request_started_at.elapsed().as_millis() as u64);
                                }
                                // Merge without emitting a TextDelta; the text
                                // is delivered via the final Done event.
                                let _ = Self::merge_stream_text(&mut text_content, &text);
                            }
                            if let Some(reasoning) = choice.message.reasoning_content {
                                reasoning_content_accum.push_str(&reasoning);
                            }
                            if let Some(final_tool_calls) = choice.message.tool_calls {
                                for tc in final_tool_calls {
                                    tool_calls.insert(
                                        tool_calls.len(),
                                        (tc.id, tc.function.name, tc.function.arguments),
                                    );
                                }
                            }
                        }
                    }
                }

                // Best-effort finalization when anything usable was gathered;
                // otherwise just log what the server left us with.
                if parsed_any_event
                    || !text_content.is_empty()
                    || !tool_calls.is_empty()
                    || !content_blocks.is_empty()
                {
                    tracing::warn!(
                        provider = %provider_name,
                        model = %request_model,
                        "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
                    );
                    if !text_content.is_empty() {
                        content_blocks.push(ContentBlock::Text {
                            text: text_content.clone(),
                        });
                    }
                    for (_, (id, name, args)) in tool_calls.iter() {
                        content_blocks.push(ContentBlock::ToolUse {
                            id: id.clone(),
                            name: name.clone(),
                            input: Self::parse_tool_arguments(name, args),
                        });
                    }
                    tool_calls.clear();
                    crate::telemetry::record_llm_usage(
                        usage.prompt_tokens,
                        usage.completion_tokens,
                        usage.total_tokens,
                        finish_reason.as_deref(),
                    );
                    let response = LlmResponse {
                        message: Message {
                            role: "assistant".to_string(),
                            content: std::mem::take(&mut content_blocks),
                            reasoning_content: if reasoning_content_accum.is_empty() {
                                None
                            } else {
                                Some(std::mem::take(&mut reasoning_content_accum))
                            },
                        },
                        usage: usage.clone(),
                        stop_reason: std::mem::take(&mut finish_reason),
                        meta: Some(LlmResponseMeta {
                            provider: Some(provider_name.clone()),
                            request_model: Some(request_model.clone()),
                            request_url: Some(request_url.clone()),
                            response_id: response_id.clone(),
                            response_model: response_model.clone(),
                            response_object: response_object.clone(),
                            first_token_ms,
                            duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
                        }),
                    };
                    let _ = tx.send(StreamEvent::Done(response)).await;
                } else {
                    tracing::warn!(
                        provider = %provider_name,
                        model = %request_model,
                        trailing = %trailing.chars().take(400).collect::<String>(),
                        "OpenAI-compatible stream ended without any parseable events"
                    );
                }
            });

            Ok(rx)
        }
    }
933 }
934}
935
// ---- Wire-format types for OpenAI-compatible response bodies ----

/// Body of a non-streaming chat-completions response.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiResponse {
    #[serde(default)]
    pub(crate) id: Option<String>,
    #[serde(default)]
    pub(crate) object: Option<String>,
    #[serde(default)]
    pub(crate) model: Option<String>,
    pub(crate) choices: Vec<OpenAiChoice>,
    pub(crate) usage: OpenAiUsage,
}

/// One completion choice; only the first is consumed by this client.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiChoice {
    pub(crate) message: OpenAiMessage,
    pub(crate) finish_reason: Option<String>,
}

/// Assistant message payload (shared by final responses and stream chunks).
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiMessage {
    // Reasoning text forwarded when the backend provides it; not every
    // OpenAI-compatible server emits this field.
    pub(crate) reasoning_content: Option<String>,
    pub(crate) content: Option<String>,
    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
}

/// A fully-formed tool call from a non-streaming message.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCall {
    pub(crate) id: String,
    pub(crate) function: OpenAiFunction,
}

/// Function name plus its arguments serialized as a JSON string.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunction {
    pub(crate) name: String,
    pub(crate) arguments: String,
}

/// Token accounting block; all counts default to zero when absent.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiUsage {
    #[serde(default)]
    pub(crate) prompt_tokens: usize,
    #[serde(default)]
    pub(crate) completion_tokens: usize,
    #[serde(default)]
    pub(crate) total_tokens: usize,
    #[serde(default)]
    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
}

/// Breakdown of prompt tokens, used here for the cached-token count.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiPromptTokensDetails {
    #[serde(default)]
    pub(crate) cached_tokens: Option<usize>,
}

/// One SSE `data:` chunk of a streaming response.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChunk {
    #[serde(default)]
    pub(crate) id: Option<String>,
    #[serde(default)]
    pub(crate) object: Option<String>,
    #[serde(default)]
    pub(crate) model: Option<String>,
    pub(crate) choices: Vec<OpenAiStreamChoice>,
    // Present only when usage reporting is enabled (typically on the final chunk).
    pub(crate) usage: Option<OpenAiUsage>,
}

/// Streaming choice: standard servers send `delta`; some compatible servers
/// send a full `message` instead (both are handled by the stream parser).
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChoice {
    pub(crate) message: Option<OpenAiMessage>,
    pub(crate) delta: Option<OpenAiDelta>,
    pub(crate) finish_reason: Option<String>,
}

/// Incremental message fragment within a stream chunk.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiDelta {
    pub(crate) reasoning_content: Option<String>,
    pub(crate) content: Option<String>,
    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}

/// Incremental tool-call fragment; `index` correlates fragments belonging to
/// the same call across chunks.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCallDelta {
    pub(crate) index: usize,
    pub(crate) id: Option<String>,
    pub(crate) function: Option<OpenAiFunctionDelta>,
}

/// Incremental function fragment: name arrives once, arguments in pieces.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunctionDelta {
    pub(crate) name: Option<String>,
    pub(crate) arguments: Option<String>,
}