1use super::http::{default_http_client, normalize_base_url, HttpClient};
4use super::types::*;
5use super::LlmClient;
6use crate::llm::types::{ToolResultContent, ToolResultContentField};
7use crate::retry::{AttemptOutcome, RetryConfig};
8use anyhow::{Context, Result};
9use async_trait::async_trait;
10use futures::StreamExt;
11use serde::Deserialize;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
/// Client for OpenAI's chat-completions API, also reused for other
/// providers that expose an OpenAI-compatible HTTP surface.
pub struct OpenAiClient {
    // Label reported in response metadata / telemetry (e.g. "openai").
    pub(crate) provider_name: String,
    // Bearer token; wrapped so it is not exposed by accident (see request_headers).
    pub(crate) api_key: SecretString,
    pub(crate) model: String,
    // Origin only (scheme + host); joined with `chat_completions_path` per request.
    pub(crate) base_url: String,
    // Always stored with a leading '/' (enforced by with_chat_completions_path).
    pub(crate) chat_completions_path: String,
    // Extra request headers; a caller-supplied Authorization header here
    // suppresses the default bearer header.
    pub(crate) headers: HashMap<String, String>,
    // Optional sampling overrides; omitted from the request when None.
    pub(crate) temperature: Option<f32>,
    pub(crate) max_tokens: Option<usize>,
    // Pluggable transport (also enables mocking in tests).
    pub(crate) http: Arc<dyn HttpClient>,
    pub(crate) retry_config: RetryConfig,
}
30
31impl OpenAiClient {
32 pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
33 if arguments.trim().is_empty() {
34 return serde_json::Value::Object(Default::default());
35 }
36
37 serde_json::from_str(arguments).unwrap_or_else(|e| {
38 tracing::warn!(
39 "Failed to parse tool arguments JSON for tool '{}': {}",
40 tool_name,
41 e
42 );
43 serde_json::json!({
44 "__parse_error": format!(
45 "Malformed tool arguments: {}. Raw input: {}",
46 e, arguments
47 )
48 })
49 })
50 }
51
52 fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
53 if incoming.is_empty() {
54 return None;
55 }
56 if text_content.is_empty() {
57 text_content.push_str(incoming);
58 return Some(incoming.to_string());
59 }
60 if incoming == text_content.as_str() || text_content.ends_with(incoming) {
61 return None;
62 }
63 if incoming.starts_with(text_content.as_str()) && incoming.len() > text_content.len() {
66 let suffix = &incoming[text_content.len()..];
67 if !suffix.is_empty() {
68 *text_content = incoming.to_string();
69 return Some(suffix.to_string());
70 }
71 return None;
72 }
73 if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
74 if suffix.is_empty() {
75 return None;
76 }
77 text_content.push_str(suffix);
78 return Some(suffix.to_string());
79 }
80 text_content.push_str(incoming);
81 Some(incoming.to_string())
82 }
83
84 pub fn new(api_key: String, model: String) -> Self {
85 Self {
86 provider_name: "openai".to_string(),
87 api_key: SecretString::new(api_key),
88 model,
89 base_url: "https://api.openai.com".to_string(),
90 chat_completions_path: "/v1/chat/completions".to_string(),
91 headers: HashMap::new(),
92 temperature: None,
93 max_tokens: None,
94 http: default_http_client(),
95 retry_config: RetryConfig::default(),
96 }
97 }
98
99 pub fn with_base_url(mut self, base_url: String) -> Self {
100 self.base_url = normalize_base_url(&base_url);
101 self
102 }
103
104 pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
105 self.provider_name = provider_name.into();
106 self
107 }
108
109 pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
110 let path = path.into();
111 self.chat_completions_path = if path.starts_with('/') {
112 path
113 } else {
114 format!("/{}", path)
115 };
116 self
117 }
118
119 pub fn with_temperature(mut self, temperature: f32) -> Self {
120 self.temperature = Some(temperature);
121 self
122 }
123
124 pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
125 self.headers = headers;
126 self
127 }
128
129 pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
130 self.max_tokens = Some(max_tokens);
131 self
132 }
133
134 pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
135 self.retry_config = retry_config;
136 self
137 }
138
139 pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
140 self.http = http;
141 self
142 }
143
144 pub(crate) fn request_headers(&self) -> Vec<(String, String)> {
145 let mut headers = Vec::with_capacity(self.headers.len() + 1);
146 let has_authorization = self
147 .headers
148 .keys()
149 .any(|key| key.eq_ignore_ascii_case("authorization"));
150 if !has_authorization {
151 headers.push((
152 "Authorization".to_string(),
153 format!("Bearer {}", self.api_key.expose()),
154 ));
155 }
156 headers.extend(
157 self.headers
158 .iter()
159 .map(|(key, value)| (key.clone(), value.clone())),
160 );
161 headers
162 }
163
164 pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
165 messages
166 .iter()
167 .map(|msg| {
168 let content: serde_json::Value = if msg.content.len() == 1 {
169 match &msg.content[0] {
170 ContentBlock::Text { text } => serde_json::json!(text),
171 ContentBlock::ToolResult {
172 tool_use_id,
173 content,
174 ..
175 } => {
176 let content_str = match content {
177 ToolResultContentField::Text(s) => s.clone(),
178 ToolResultContentField::Blocks(blocks) => blocks
179 .iter()
180 .filter_map(|b| {
181 if let ToolResultContent::Text { text } = b {
182 Some(text.clone())
183 } else {
184 None
185 }
186 })
187 .collect::<Vec<_>>()
188 .join("\n"),
189 };
190 return serde_json::json!({
191 "role": "tool",
192 "tool_call_id": tool_use_id,
193 "content": content_str,
194 });
195 }
196 _ => serde_json::json!(""),
197 }
198 } else {
199 serde_json::json!(msg
200 .content
201 .iter()
202 .map(|block| {
203 match block {
204 ContentBlock::Text { text } => serde_json::json!({
205 "type": "text",
206 "text": text,
207 }),
208 ContentBlock::Image { source } => serde_json::json!({
209 "type": "image_url",
210 "image_url": {
211 "url": format!(
212 "data:{};base64,{}",
213 source.media_type, source.data
214 ),
215 }
216 }),
217 ContentBlock::ToolUse { id, name, input } => serde_json::json!({
218 "type": "function",
219 "id": id,
220 "function": {
221 "name": name,
222 "arguments": input.to_string(),
223 }
224 }),
225 _ => serde_json::json!({}),
226 }
227 })
228 .collect::<Vec<_>>())
229 };
230
231 if msg.role == "assistant" {
234 let rc = msg.reasoning_content.as_deref().unwrap_or("");
235 let tool_calls: Vec<_> = msg.tool_calls();
236 if !tool_calls.is_empty() {
237 return serde_json::json!({
238 "role": "assistant",
239 "content": msg.text(),
240 "reasoning_content": rc,
241 "tool_calls": tool_calls.iter().map(|tc| {
242 serde_json::json!({
243 "id": tc.id,
244 "type": "function",
245 "function": {
246 "name": tc.name,
247 "arguments": tc.args.to_string(),
248 }
249 })
250 }).collect::<Vec<_>>(),
251 });
252 }
253 return serde_json::json!({
254 "role": "assistant",
255 "content": content,
256 "reasoning_content": rc,
257 });
258 }
259
260 serde_json::json!({
261 "role": msg.role,
262 "content": content,
263 })
264 })
265 .collect()
266 }
267
268 pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
269 tools
270 .iter()
271 .map(|t| {
272 serde_json::json!({
273 "type": "function",
274 "function": {
275 "name": t.name,
276 "description": t.description,
277 "parameters": t.parameters,
278 }
279 })
280 })
281 .collect()
282 }
283}
284
285#[async_trait]
286impl LlmClient for OpenAiClient {
287 async fn complete(
288 &self,
289 messages: &[Message],
290 system: Option<&str>,
291 tools: &[ToolDefinition],
292 ) -> Result<LlmResponse> {
293 {
294 let request_started_at = Instant::now();
295 let mut openai_messages = Vec::new();
296
297 if let Some(sys) = system {
298 openai_messages.push(serde_json::json!({
299 "role": "system",
300 "content": sys,
301 }));
302 }
303
304 openai_messages.extend(self.convert_messages(messages));
305
306 let mut request = serde_json::json!({
307 "model": self.model,
308 "messages": openai_messages,
309 });
310
311 if let Some(temp) = self.temperature {
312 request["temperature"] = serde_json::json!(temp);
313 }
314 if let Some(max) = self.max_tokens {
315 request["max_tokens"] = serde_json::json!(max);
316 }
317
318 if !tools.is_empty() {
319 request["tools"] = serde_json::json!(self.convert_tools(tools));
320 }
321
322 let url = format!("{}{}", self.base_url, self.chat_completions_path);
323 let request_headers = self.request_headers();
324
325 let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
326 let http = &self.http;
327 let url = &url;
328 let request_headers = request_headers.clone();
329 let request = &request;
330 async move {
331 let headers = request_headers
332 .iter()
333 .map(|(key, value)| (key.as_str(), value.as_str()))
334 .collect::<Vec<_>>();
335 let cancel_token = tokio_util::sync::CancellationToken::new();
337 match http.post(url, headers, request, cancel_token).await {
338 Ok(resp) => {
339 let status = reqwest::StatusCode::from_u16(resp.status)
340 .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
341 if status.is_success() {
342 AttemptOutcome::Success(resp.body)
343 } else if self.retry_config.is_retryable_status(status) {
344 AttemptOutcome::Retryable {
345 status,
346 body: resp.body,
347 retry_after: None,
348 }
349 } else {
350 AttemptOutcome::Fatal(anyhow::anyhow!(
351 "OpenAI API error at {} ({}): {}",
352 url,
353 status,
354 resp.body
355 ))
356 }
357 }
358 Err(e) => {
359 eprintln!("[DEBUG] HTTP error: {:?}", e);
360 AttemptOutcome::Fatal(e)
361 }
362 }
363 }
364 })
365 .await?;
366
367 let parsed: OpenAiResponse =
368 serde_json::from_str(&response).context("Failed to parse OpenAI response")?;
369
370 let choice = parsed.choices.into_iter().next().context("No choices")?;
371
372 let mut content = vec![];
373
374 let reasoning_content = choice.message.reasoning_content;
375
376 let text_content = choice.message.content;
377
378 if let Some(text) = text_content {
379 if !text.is_empty() {
380 content.push(ContentBlock::Text { text });
381 }
382 }
383
384 if let Some(tool_calls) = choice.message.tool_calls {
385 for tc in tool_calls {
386 content.push(ContentBlock::ToolUse {
387 id: tc.id,
388 name: tc.function.name.clone(),
389 input: Self::parse_tool_arguments(
390 &tc.function.name,
391 &tc.function.arguments,
392 ),
393 });
394 }
395 }
396
397 let llm_response = LlmResponse {
398 message: Message {
399 role: "assistant".to_string(),
400 content,
401 reasoning_content,
402 },
403 usage: TokenUsage {
404 prompt_tokens: parsed.usage.prompt_tokens,
405 completion_tokens: parsed.usage.completion_tokens,
406 total_tokens: {
407 let t = parsed.usage.total_tokens;
408 if t == 0 {
410 parsed.usage.total_characters.unwrap_or(0)
411 } else {
412 t
413 }
414 },
415 cache_read_tokens: parsed
416 .usage
417 .prompt_tokens_details
418 .as_ref()
419 .and_then(|d| d.cached_tokens),
420 cache_write_tokens: None,
421 },
422 stop_reason: choice.finish_reason,
423 meta: Some(LlmResponseMeta {
424 provider: Some(self.provider_name.clone()),
425 request_model: Some(self.model.clone()),
426 request_url: Some(url.clone()),
427 response_id: parsed.id,
428 response_model: parsed.model,
429 response_object: parsed.object,
430 first_token_ms: None,
431 duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
432 }),
433 };
434
435 crate::telemetry::record_llm_usage(
436 llm_response.usage.prompt_tokens,
437 llm_response.usage.completion_tokens,
438 llm_response.usage.total_tokens,
439 llm_response.stop_reason.as_deref(),
440 );
441
442 Ok(llm_response)
443 }
444 }
445
    /// Perform a streaming chat completion.
    ///
    /// Sends the same request shape as `complete` but with `stream: true`
    /// and `stream_options.include_usage`, then spawns a task that parses
    /// the SSE byte stream and forwards `StreamEvent`s over the returned
    /// channel. The assembled final response arrives as `StreamEvent::Done`,
    /// either on the `[DONE]` sentinel or, as a fallback, when the stream
    /// ends after having produced any parseable data.
    async fn complete_streaming(
        &self,
        messages: &[Message],
        system: Option<&str>,
        tools: &[ToolDefinition],
        cancel_token: tokio_util::sync::CancellationToken,
    ) -> Result<mpsc::Receiver<StreamEvent>> {
        {
            let request_started_at = Instant::now();
            let mut openai_messages = Vec::new();

            // System prompt (if any) always goes first.
            if let Some(sys) = system {
                openai_messages.push(serde_json::json!({
                    "role": "system",
                    "content": sys,
                }));
            }

            openai_messages.extend(self.convert_messages(messages));

            let mut request = serde_json::json!({
                "model": self.model,
                "messages": openai_messages,
                "stream": true,
                "stream_options": { "include_usage": true },
            });

            if let Some(temp) = self.temperature {
                request["temperature"] = serde_json::json!(temp);
            }
            if let Some(max) = self.max_tokens {
                request["max_tokens"] = serde_json::json!(max);
            }

            if !tools.is_empty() {
                request["tools"] = serde_json::json!(self.convert_tools(tools));
            }

            let url = format!("{}{}", self.base_url, self.chat_completions_path);
            let request_headers = self.request_headers();

            // Open the streaming POST, retrying retryable statuses. Only the
            // connection attempt is retried; once the body stream starts,
            // mid-stream errors are not retried (see the read loop below).
            let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
                let http = &self.http;
                let url = &url;
                let request_headers = request_headers.clone();
                let request = &request;
                let cancel_token = cancel_token.clone();
                async move {
                    let headers = request_headers
                        .iter()
                        .map(|(key, value)| (key.as_str(), value.as_str()))
                        .collect::<Vec<_>>();
                    // Race the request against external cancellation.
                    let resp = tokio::select! {
                        _ = cancel_token.cancelled() => {
                            return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request cancelled"));
                        }
                        result = http.post_streaming(url, headers, request, cancel_token.clone()) => {
                            match result {
                                Ok(r) => r,
                                Err(e) => {
                                    return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request failed: {}", e));
                                }
                            }
                        }
                    };
                    let status = reqwest::StatusCode::from_u16(resp.status)
                        .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
                    if status.is_success() {
                        AttemptOutcome::Success(resp)
                    } else {
                        // Honor the server's Retry-After header when present.
                        let retry_after = resp
                            .retry_after
                            .as_deref()
                            .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
                        if self.retry_config.is_retryable_status(status) {
                            AttemptOutcome::Retryable {
                                status,
                                body: resp.error_body,
                                retry_after,
                            }
                        } else {
                            AttemptOutcome::Fatal(anyhow::anyhow!(
                                "OpenAI API error at {} ({}): {}",
                                url,
                                status,
                                resp.error_body
                            ))
                        }
                    }
                }
            })
            .await?;

            let (tx, rx) = mpsc::channel(100);

            let mut stream = streaming_resp.byte_stream;
            let provider_name = self.provider_name.clone();
            let request_model = self.model.clone();
            let request_url = url.clone();
            // Background task: parse SSE events and forward StreamEvents.
            tokio::spawn(async move {
                // Raw bytes not yet split into complete SSE events.
                let mut buffer = String::new();
                let mut content_blocks: Vec<ContentBlock> = Vec::new();
                // Accumulated assistant text; also used by merge_stream_text
                // to dedupe providers that resend the full text each chunk.
                // NOTE(review): reasoning deltas are merged into this same
                // accumulator before being emitted — presumably intentional
                // dedup for providers that echo reasoning as content; confirm.
                let mut text_content = String::new();
                let mut reasoning_content_accum = String::new();
                // index -> (id, name, accumulated-arguments) for partial tool calls.
                let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
                    std::collections::BTreeMap::new();
                let mut usage = TokenUsage::default();
                let mut finish_reason = None;
                let mut response_id = None;
                let mut response_model = None;
                let mut response_object = None;
                let mut first_token_ms = None;
                // Whether the provider sent the `[DONE]` sentinel.
                let mut saw_done = false;
                // Whether anything parseable was seen (drives the fallback).
                let mut parsed_any_event = false;

                while let Some(chunk_result) = stream.next().await {
                    let chunk = match chunk_result {
                        Ok(c) => c,
                        Err(e) => {
                            // Mid-stream transport error: stop reading and
                            // fall through to the end-of-stream handling.
                            tracing::error!("Stream error: {}", e);
                            break;
                        }
                    };

                    buffer.push_str(&String::from_utf8_lossy(&chunk));

                    // SSE events are separated by a blank line ("\n\n").
                    while let Some(event_end) = buffer.find("\n\n") {
                        let event_data: String = buffer.drain(..event_end).collect();
                        // Drop the "\n\n" separator itself.
                        buffer.drain(..2);

                        for line in event_data.lines() {
                            if let Some(data) = line.strip_prefix("data: ") {
                                if data == "[DONE]" {
                                    // Finalize: flush accumulated text and
                                    // tool calls into content blocks, record
                                    // telemetry, and emit the Done event.
                                    saw_done = true;
                                    if !text_content.is_empty() {
                                        content_blocks.push(ContentBlock::Text {
                                            text: text_content.clone(),
                                        });
                                    }
                                    for (_, (id, name, args)) in tool_calls.iter() {
                                        content_blocks.push(ContentBlock::ToolUse {
                                            id: id.clone(),
                                            name: name.clone(),
                                            input: Self::parse_tool_arguments(name, args),
                                        });
                                    }
                                    tool_calls.clear();
                                    crate::telemetry::record_llm_usage(
                                        usage.prompt_tokens,
                                        usage.completion_tokens,
                                        usage.total_tokens,
                                        finish_reason.as_deref(),
                                    );
                                    let response = LlmResponse {
                                        message: Message {
                                            role: "assistant".to_string(),
                                            content: std::mem::take(&mut content_blocks),
                                            reasoning_content: if reasoning_content_accum.is_empty()
                                            {
                                                None
                                            } else {
                                                Some(std::mem::take(&mut reasoning_content_accum))
                                            },
                                        },
                                        usage: usage.clone(),
                                        stop_reason: std::mem::take(&mut finish_reason),
                                        meta: Some(LlmResponseMeta {
                                            provider: Some(provider_name.clone()),
                                            request_model: Some(request_model.clone()),
                                            request_url: Some(request_url.clone()),
                                            response_id: response_id.clone(),
                                            response_model: response_model.clone(),
                                            response_object: response_object.clone(),
                                            first_token_ms,
                                            duration_ms: Some(
                                                request_started_at.elapsed().as_millis() as u64,
                                            ),
                                        }),
                                    };
                                    let _ = tx.send(StreamEvent::Done(response)).await;
                                    continue;
                                }

                                // Unparseable chunks are silently skipped.
                                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
                                    parsed_any_event = true;
                                    // Response identity: first-seen values win.
                                    if response_id.is_none() {
                                        response_id = event.id.clone();
                                    }
                                    if response_model.is_none() {
                                        response_model = event.model.clone();
                                    }
                                    if response_object.is_none() {
                                        response_object = event.object.clone();
                                    }
                                    // Usage: last chunk carrying it wins.
                                    if let Some(u) = event.usage {
                                        usage.prompt_tokens = u.prompt_tokens;
                                        usage.completion_tokens = u.completion_tokens;
                                        usage.total_tokens = u.total_tokens;
                                        // Character-count fallback for
                                        // providers without token totals.
                                        if usage.total_tokens == 0 {
                                            usage.total_tokens = u.total_characters.unwrap_or(0);
                                        }
                                        usage.cache_read_tokens = u
                                            .prompt_tokens_details
                                            .as_ref()
                                            .and_then(|d| d.cached_tokens);
                                    }

                                    // Only the first choice is consumed.
                                    if let Some(choice) = event.choices.into_iter().next() {
                                        if let Some(reason) = choice.finish_reason {
                                            finish_reason = Some(reason);
                                        }

                                        // Snapshot-style providers send a full
                                        // `message` instead of a `delta`.
                                        if let Some(message) = choice.message {
                                            // If we already streamed text, the
                                            // snapshot's content is a repeat.
                                            let skip_content = !text_content.is_empty();
                                            if let Some(reasoning) = message.reasoning_content {
                                                reasoning_content_accum.push_str(&reasoning);
                                                if first_token_ms.is_none() {
                                                    first_token_ms = Some(
                                                        request_started_at.elapsed().as_millis()
                                                            as u64,
                                                    );
                                                }
                                                if let Some(delta) = Self::merge_stream_text(
                                                    &mut text_content,
                                                    &reasoning,
                                                ) {
                                                    let _ = tx
                                                        .send(StreamEvent::ReasoningDelta(delta))
                                                        .await;
                                                }
                                            }
                                            if !skip_content {
                                                if let Some(content) = message
                                                    .content
                                                    .filter(|value| !value.is_empty())
                                                {
                                                    if first_token_ms.is_none() {
                                                        first_token_ms = Some(
                                                            request_started_at.elapsed().as_millis()
                                                                as u64,
                                                        );
                                                    }
                                                    if let Some(delta) = Self::merge_stream_text(
                                                        &mut text_content,
                                                        &content,
                                                    ) {
                                                        let _ = tx
                                                            .send(StreamEvent::TextDelta(delta))
                                                            .await;
                                                    }
                                                }
                                            }
                                            // Snapshot tool calls replace any
                                            // partial state at the same index.
                                            if let Some(tcs) = message.tool_calls {
                                                for (index, tc) in tcs.into_iter().enumerate() {
                                                    tool_calls.insert(
                                                        index,
                                                        (
                                                            tc.id,
                                                            tc.function.name,
                                                            tc.function.arguments,
                                                        ),
                                                    );
                                                }
                                            }
                                        } else if let Some(delta) = choice.delta {
                                            // Standard OpenAI incremental deltas.
                                            if let Some(ref rc) = delta.reasoning_content {
                                                reasoning_content_accum.push_str(rc);
                                                if first_token_ms.is_none() {
                                                    first_token_ms = Some(
                                                        request_started_at.elapsed().as_millis()
                                                            as u64,
                                                    );
                                                }
                                                if let Some(delta) =
                                                    Self::merge_stream_text(&mut text_content, rc)
                                                {
                                                    let _ = tx
                                                        .send(StreamEvent::ReasoningDelta(delta))
                                                        .await;
                                                }
                                            }

                                            if let Some(content) = delta.content {
                                                if first_token_ms.is_none() {
                                                    first_token_ms = Some(
                                                        request_started_at.elapsed().as_millis()
                                                            as u64,
                                                    );
                                                }
                                                if let Some(delta) = Self::merge_stream_text(
                                                    &mut text_content,
                                                    &content,
                                                ) {
                                                    let _ = tx
                                                        .send(StreamEvent::TextDelta(delta))
                                                        .await;
                                                }
                                            }

                                            if let Some(tcs) = delta.tool_calls {
                                                for tc in tcs {
                                                    // Partial tool calls are
                                                    // keyed by the provider's
                                                    // index and filled in
                                                    // across chunks.
                                                    let entry = tool_calls
                                                        .entry(tc.index)
                                                        .or_insert_with(|| {
                                                            (
                                                                String::new(),
                                                                String::new(),
                                                                String::new(),
                                                            )
                                                        });

                                                    if let Some(id) = tc.id {
                                                        entry.0 = id;
                                                    }
                                                    if let Some(func) = tc.function {
                                                        // The name marks the
                                                        // start of a tool call.
                                                        if let Some(name) = func.name {
                                                            if first_token_ms.is_none() {
                                                                first_token_ms = Some(
                                                                    request_started_at
                                                                        .elapsed()
                                                                        .as_millis()
                                                                        as u64,
                                                                );
                                                            }
                                                            entry.1 = name.clone();
                                                            let _ = tx
                                                                .send(StreamEvent::ToolUseStart {
                                                                    id: entry.0.clone(),
                                                                    name,
                                                                })
                                                                .await;
                                                        }
                                                        // Arguments accumulate
                                                        // as a raw JSON string.
                                                        if let Some(args) = func.arguments {
                                                            entry.2.push_str(&args);
                                                            let _ = tx
                                                                .send(
                                                                    StreamEvent::ToolUseInputDelta(
                                                                        args,
                                                                    ),
                                                                )
                                                                .await;
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }

                // Normal termination: Done was already emitted.
                if saw_done {
                    return;
                }

                // Fallback 1: the stream ended with a partial buffer — some
                // providers send a final bare JSON payload with no trailing
                // "\n\n". Try it first as a stream chunk, then as a complete
                // non-streaming response body.
                let trailing = buffer.trim();
                if !trailing.is_empty() {
                    if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
                        // Same handling as the in-loop chunk path above.
                        parsed_any_event = true;
                        if response_id.is_none() {
                            response_id = event.id.clone();
                        }
                        if response_model.is_none() {
                            response_model = event.model.clone();
                        }
                        if response_object.is_none() {
                            response_object = event.object.clone();
                        }
                        if let Some(u) = event.usage {
                            usage.prompt_tokens = u.prompt_tokens;
                            usage.completion_tokens = u.completion_tokens;
                            usage.total_tokens = u.total_tokens;
                            usage.cache_read_tokens = u
                                .prompt_tokens_details
                                .as_ref()
                                .and_then(|d| d.cached_tokens);
                        }
                        if let Some(choice) = event.choices.into_iter().next() {
                            if let Some(reason) = choice.finish_reason {
                                finish_reason = Some(reason);
                            }
                            let skip_content = !text_content.is_empty();
                            if let Some(message) = choice.message {
                                if let Some(reasoning) = message.reasoning_content {
                                    reasoning_content_accum.push_str(&reasoning);
                                    if first_token_ms.is_none() {
                                        first_token_ms =
                                            Some(request_started_at.elapsed().as_millis() as u64);
                                    }
                                    if let Some(delta) =
                                        Self::merge_stream_text(&mut text_content, &reasoning)
                                    {
                                        let _ = tx.send(StreamEvent::ReasoningDelta(delta)).await;
                                    }
                                }
                                if !skip_content {
                                    if let Some(content) =
                                        message.content.filter(|value| !value.is_empty())
                                    {
                                        if first_token_ms.is_none() {
                                            first_token_ms = Some(
                                                request_started_at.elapsed().as_millis() as u64,
                                            );
                                        }
                                        if let Some(delta) =
                                            Self::merge_stream_text(&mut text_content, &content)
                                        {
                                            let _ = tx.send(StreamEvent::TextDelta(delta)).await;
                                        }
                                    }
                                }
                                if let Some(tcs) = message.tool_calls {
                                    for (index, tc) in tcs.into_iter().enumerate() {
                                        tool_calls.insert(
                                            index,
                                            (tc.id, tc.function.name, tc.function.arguments),
                                        );
                                    }
                                }
                            } else if let Some(delta) = choice.delta {
                                if let Some(ref rc) = delta.reasoning_content {
                                    reasoning_content_accum.push_str(rc);
                                    if first_token_ms.is_none() {
                                        first_token_ms =
                                            Some(request_started_at.elapsed().as_millis() as u64);
                                    }
                                    if let Some(delta) =
                                        Self::merge_stream_text(&mut text_content, rc)
                                    {
                                        let _ = tx.send(StreamEvent::ReasoningDelta(delta)).await;
                                    }
                                }
                                if let Some(content) = delta.content {
                                    if first_token_ms.is_none() {
                                        first_token_ms =
                                            Some(request_started_at.elapsed().as_millis() as u64);
                                    }
                                    if let Some(delta) =
                                        Self::merge_stream_text(&mut text_content, &content)
                                    {
                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
                                    }
                                }
                            }
                        }
                    } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
                        // The provider returned a complete non-streaming body
                        // despite `stream: true`; absorb it wholesale.
                        parsed_any_event = true;
                        response_id = response.id.clone();
                        response_model = response.model.clone();
                        response_object = response.object.clone();
                        usage.prompt_tokens = response.usage.prompt_tokens;
                        usage.completion_tokens = response.usage.completion_tokens;
                        usage.total_tokens = response.usage.total_tokens;
                        if usage.total_tokens == 0 {
                            usage.total_tokens = response.usage.total_characters.unwrap_or(0);
                        }
                        usage.cache_read_tokens = response
                            .usage
                            .prompt_tokens_details
                            .as_ref()
                            .and_then(|d| d.cached_tokens);

                        if let Some(choice) = response.choices.into_iter().next() {
                            finish_reason = choice.finish_reason;
                            if let Some(text) =
                                choice.message.content.filter(|text| !text.is_empty())
                            {
                                if first_token_ms.is_none() {
                                    first_token_ms =
                                        Some(request_started_at.elapsed().as_millis() as u64);
                                }
                                // No delta event here; the text surfaces in
                                // the final Done response below.
                                let _ = Self::merge_stream_text(&mut text_content, &text);
                            }
                            if let Some(reasoning) = choice.message.reasoning_content {
                                reasoning_content_accum.push_str(&reasoning);
                            }
                            if let Some(final_tool_calls) = choice.message.tool_calls {
                                for tc in final_tool_calls {
                                    tool_calls.insert(
                                        tool_calls.len(),
                                        (tc.id, tc.function.name, tc.function.arguments),
                                    );
                                }
                            }
                        }
                    }
                }

                // Fallback 2: no [DONE] was seen, but we have data — emit a
                // synthesized Done so the consumer still gets a response.
                if parsed_any_event
                    || !text_content.is_empty()
                    || !tool_calls.is_empty()
                    || !content_blocks.is_empty()
                {
                    tracing::warn!(
                        provider = %provider_name,
                        model = %request_model,
                        "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
                    );
                    if !text_content.is_empty() {
                        content_blocks.push(ContentBlock::Text {
                            text: text_content.clone(),
                        });
                    }
                    for (_, (id, name, args)) in tool_calls.iter() {
                        content_blocks.push(ContentBlock::ToolUse {
                            id: id.clone(),
                            name: name.clone(),
                            input: Self::parse_tool_arguments(name, args),
                        });
                    }
                    tool_calls.clear();
                    crate::telemetry::record_llm_usage(
                        usage.prompt_tokens,
                        usage.completion_tokens,
                        usage.total_tokens,
                        finish_reason.as_deref(),
                    );
                    let response = LlmResponse {
                        message: Message {
                            role: "assistant".to_string(),
                            content: std::mem::take(&mut content_blocks),
                            reasoning_content: if reasoning_content_accum.is_empty() {
                                None
                            } else {
                                Some(std::mem::take(&mut reasoning_content_accum))
                            },
                        },
                        usage: usage.clone(),
                        stop_reason: std::mem::take(&mut finish_reason),
                        meta: Some(LlmResponseMeta {
                            provider: Some(provider_name.clone()),
                            request_model: Some(request_model.clone()),
                            request_url: Some(request_url.clone()),
                            response_id: response_id.clone(),
                            response_model: response_model.clone(),
                            response_object: response_object.clone(),
                            first_token_ms,
                            duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
                        }),
                    };
                    let _ = tx.send(StreamEvent::Done(response)).await;
                } else {
                    // Nothing usable at all: log (truncated) trailing data
                    // for diagnosis; the channel closes without a Done.
                    tracing::warn!(
                        provider = %provider_name,
                        model = %request_model,
                        trailing = %trailing.chars().take(400).collect::<String>(),
                        "OpenAI-compatible stream ended without any parseable events"
                    );
                }
            });

            Ok(rx)
        }
    }
1009}
1010
/// Non-streaming chat-completions response body (OpenAI wire format).
/// Identity fields are optional to tolerate non-conforming providers.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiResponse {
    #[serde(default)]
    pub(crate) id: Option<String>,
    #[serde(default)]
    pub(crate) object: Option<String>,
    #[serde(default)]
    pub(crate) model: Option<String>,
    pub(crate) choices: Vec<OpenAiChoice>,
    pub(crate) usage: OpenAiUsage,
}

/// One completion choice; callers in this file only consume the first.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiChoice {
    pub(crate) message: OpenAiMessage,
    pub(crate) finish_reason: Option<String>,
}

/// Assistant message payload. `reasoning_content` is a non-standard field
/// emitted by some OpenAI-compatible providers.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiMessage {
    pub(crate) reasoning_content: Option<String>,
    pub(crate) content: Option<String>,
    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
}

/// A fully-formed tool call in a non-streaming response.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCall {
    pub(crate) id: String,
    pub(crate) function: OpenAiFunction,
}

/// Function name plus its arguments as a raw JSON-encoded string
/// (parsed later by `OpenAiClient::parse_tool_arguments`).
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunction {
    pub(crate) name: String,
    pub(crate) arguments: String,
}

/// Token accounting. All fields default so partial usage objects parse.
/// `total_characters` is a non-standard fallback some providers report
/// instead of `total_tokens`.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiUsage {
    #[serde(default)]
    pub(crate) prompt_tokens: usize,
    #[serde(default)]
    pub(crate) completion_tokens: usize,
    #[serde(default)]
    pub(crate) total_tokens: usize,
    #[serde(default)]
    pub(crate) total_characters: Option<usize>,
    #[serde(default)]
    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
}

/// Prompt-token details; `cached_tokens` maps to cache-read accounting.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiPromptTokensDetails {
    #[serde(default)]
    pub(crate) cached_tokens: Option<usize>,
}
1070
1071#[derive(Debug, Deserialize)]
1073pub(crate) struct OpenAiStreamChunk {
1074 #[serde(default)]
1075 pub(crate) id: Option<String>,
1076 #[serde(default)]
1077 pub(crate) object: Option<String>,
1078 #[serde(default)]
1079 pub(crate) model: Option<String>,
1080 pub(crate) choices: Vec<OpenAiStreamChoice>,
1081 pub(crate) usage: Option<OpenAiUsage>,
1082}
1083
/// A choice within a streaming chunk. Most providers send incremental
/// `delta`s; some send a full `message` snapshot instead — the stream loop
/// handles both, preferring `message` when present.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChoice {
    pub(crate) message: Option<OpenAiMessage>,
    pub(crate) delta: Option<OpenAiDelta>,
    pub(crate) finish_reason: Option<String>,
}

/// Incremental delta payload of a streaming choice.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiDelta {
    pub(crate) reasoning_content: Option<String>,
    pub(crate) content: Option<String>,
    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}

/// Partial tool call keyed by `index`; fields fill in across chunks.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCallDelta {
    pub(crate) index: usize,
    pub(crate) id: Option<String>,
    pub(crate) function: Option<OpenAiFunctionDelta>,
}

/// Partial function payload: the name typically arrives once, while
/// `arguments` fragments are concatenated across chunks.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunctionDelta {
    pub(crate) name: Option<String>,
    pub(crate) arguments: Option<String>,
}