1use super::http::{default_http_client, normalize_base_url, HttpClient};
4use super::types::*;
5use super::LlmClient;
6use crate::llm::types::{ToolResultContent, ToolResultContentField};
7use crate::retry::{AttemptOutcome, RetryConfig};
8use anyhow::{Context, Result};
9use async_trait::async_trait;
10use futures::StreamExt;
11use serde::Deserialize;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
/// Chat-completions client for OpenAI and OpenAI-compatible providers.
///
/// The endpoint is fully configurable (`base_url`, `chat_completions_path`,
/// extra `headers`), so the same client is reused for third-party services
/// that speak the OpenAI wire format.
pub struct OpenAiClient {
    // Label attached to telemetry/response metadata (e.g. "openai").
    pub(crate) provider_name: String,
    // Bearer token; NOTE(review): assumed `SecretString` redacts the value in
    // debug output — confirm against its definition.
    pub(crate) api_key: SecretString,
    // Model identifier sent in the request body.
    pub(crate) model: String,
    // Scheme + host, normalized via `normalize_base_url` in `with_base_url`.
    pub(crate) base_url: String,
    // Path appended to `base_url`; always stored with a leading '/'.
    pub(crate) chat_completions_path: String,
    // Extra request headers; a caller-supplied Authorization header (any
    // case) suppresses the default bearer header (see `request_headers`).
    pub(crate) headers: HashMap<String, String>,
    // Optional sampling temperature; omitted from the request when None.
    pub(crate) temperature: Option<f32>,
    // Optional `max_tokens` cap; omitted from the request when None.
    pub(crate) max_tokens: Option<usize>,
    // Transport abstraction (injectable for tests).
    pub(crate) http: Arc<dyn HttpClient>,
    // Retry policy applied to every request.
    pub(crate) retry_config: RetryConfig,
}
30
31impl OpenAiClient {
32 pub(crate) fn parse_tool_arguments(tool_name: &str, arguments: &str) -> serde_json::Value {
33 if arguments.trim().is_empty() {
34 return serde_json::Value::Object(Default::default());
35 }
36
37 serde_json::from_str(arguments).unwrap_or_else(|e| {
38 tracing::warn!(
39 "Failed to parse tool arguments JSON for tool '{}': {}",
40 tool_name,
41 e
42 );
43 serde_json::json!({
44 "__parse_error": format!(
45 "Malformed tool arguments: {}. Raw input: {}",
46 e, arguments
47 )
48 })
49 })
50 }
51
52 fn merge_stream_text(text_content: &mut String, incoming: &str) -> Option<String> {
53 if incoming.is_empty() {
54 return None;
55 }
56 if text_content.is_empty() {
57 text_content.push_str(incoming);
58 return Some(incoming.to_string());
59 }
60 if incoming == text_content.as_str() || text_content.ends_with(incoming) {
61 return None;
62 }
63 if let Some(suffix) = incoming.strip_prefix(text_content.as_str()) {
64 if suffix.is_empty() {
65 return None;
66 }
67 text_content.push_str(suffix);
68 return Some(suffix.to_string());
69 }
70 text_content.push_str(incoming);
71 Some(incoming.to_string())
72 }
73
74 pub fn new(api_key: String, model: String) -> Self {
75 Self {
76 provider_name: "openai".to_string(),
77 api_key: SecretString::new(api_key),
78 model,
79 base_url: "https://api.openai.com".to_string(),
80 chat_completions_path: "/v1/chat/completions".to_string(),
81 headers: HashMap::new(),
82 temperature: None,
83 max_tokens: None,
84 http: default_http_client(),
85 retry_config: RetryConfig::default(),
86 }
87 }
88
89 pub fn with_base_url(mut self, base_url: String) -> Self {
90 self.base_url = normalize_base_url(&base_url);
91 self
92 }
93
94 pub fn with_provider_name(mut self, provider_name: impl Into<String>) -> Self {
95 self.provider_name = provider_name.into();
96 self
97 }
98
99 pub fn with_chat_completions_path(mut self, path: impl Into<String>) -> Self {
100 let path = path.into();
101 self.chat_completions_path = if path.starts_with('/') {
102 path
103 } else {
104 format!("/{}", path)
105 };
106 self
107 }
108
109 pub fn with_temperature(mut self, temperature: f32) -> Self {
110 self.temperature = Some(temperature);
111 self
112 }
113
114 pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
115 self.headers = headers;
116 self
117 }
118
119 pub fn with_max_tokens(mut self, max_tokens: usize) -> Self {
120 self.max_tokens = Some(max_tokens);
121 self
122 }
123
124 pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
125 self.retry_config = retry_config;
126 self
127 }
128
129 pub fn with_http_client(mut self, http: Arc<dyn HttpClient>) -> Self {
130 self.http = http;
131 self
132 }
133
134 pub(crate) fn request_headers(&self) -> Vec<(String, String)> {
135 let mut headers = Vec::with_capacity(self.headers.len() + 1);
136 let has_authorization = self
137 .headers
138 .keys()
139 .any(|key| key.eq_ignore_ascii_case("authorization"));
140 if !has_authorization {
141 headers.push((
142 "Authorization".to_string(),
143 format!("Bearer {}", self.api_key.expose()),
144 ));
145 }
146 headers.extend(
147 self.headers
148 .iter()
149 .map(|(key, value)| (key.clone(), value.clone())),
150 );
151 headers
152 }
153
    /// Converts provider-agnostic `Message`s into OpenAI chat-completion
    /// message objects.
    ///
    /// Per-message rules:
    /// - a single `Text` block becomes a plain string `content`;
    /// - a single `ToolResult` block becomes a whole `role: "tool"` message
    ///   (early return — the original role is intentionally discarded);
    /// - multiple blocks become a content-part array (text / image_url);
    /// - assistant messages additionally carry `reasoning_content` and, when
    ///   present, a `tool_calls` array.
    pub(crate) fn convert_messages(&self, messages: &[Message]) -> Vec<serde_json::Value> {
        messages
            .iter()
            .map(|msg| {
                let content: serde_json::Value = if msg.content.len() == 1 {
                    match &msg.content[0] {
                        ContentBlock::Text { text } => serde_json::json!(text),
                        ContentBlock::ToolResult {
                            tool_use_id,
                            content,
                            ..
                        } => {
                            // Flatten block-style tool results to newline-joined
                            // text; non-text blocks are dropped.
                            let content_str = match content {
                                ToolResultContentField::Text(s) => s.clone(),
                                ToolResultContentField::Blocks(blocks) => blocks
                                    .iter()
                                    .filter_map(|b| {
                                        if let ToolResultContent::Text { text } = b {
                                            Some(text.clone())
                                        } else {
                                            None
                                        }
                                    })
                                    .collect::<Vec<_>>()
                                    .join("\n"),
                            };
                            // Tool results map to a dedicated "tool" message.
                            return serde_json::json!({
                                "role": "tool",
                                "tool_call_id": tool_use_id,
                                "content": content_str,
                            });
                        }
                        // Other single blocks (e.g. a lone ToolUse) contribute no
                        // inline content; assistant ToolUse is handled below via
                        // msg.tool_calls().
                        _ => serde_json::json!(""),
                    }
                } else {
                    serde_json::json!(msg
                        .content
                        .iter()
                        .map(|block| {
                            match block {
                                ContentBlock::Text { text } => serde_json::json!({
                                    "type": "text",
                                    "text": text,
                                }),
                                // Images are inlined as data URLs.
                                ContentBlock::Image { source } => serde_json::json!({
                                    "type": "image_url",
                                    "image_url": {
                                        "url": format!(
                                            "data:{};base64,{}",
                                            source.media_type, source.data
                                        ),
                                    }
                                }),
                                ContentBlock::ToolUse { id, name, input } => serde_json::json!({
                                    "type": "function",
                                    "id": id,
                                    "function": {
                                        "name": name,
                                        "arguments": input.to_string(),
                                    }
                                }),
                                // NOTE(review): a ToolResult inside a multi-block
                                // message falls through to an empty object here —
                                // confirm callers never produce that shape.
                                _ => serde_json::json!({}),
                            }
                        })
                        .collect::<Vec<_>>())
                };

                if msg.role == "assistant" {
                    let rc = msg.reasoning_content.as_deref().unwrap_or("");
                    let tool_calls: Vec<_> = msg.tool_calls();
                    if !tool_calls.is_empty() {
                        // With tool calls, content is collapsed to plain text and
                        // the calls are emitted in the dedicated tool_calls array.
                        return serde_json::json!({
                            "role": "assistant",
                            "content": msg.text(),
                            "reasoning_content": rc,
                            "tool_calls": tool_calls.iter().map(|tc| {
                                serde_json::json!({
                                    "id": tc.id,
                                    "type": "function",
                                    "function": {
                                        "name": tc.name,
                                        "arguments": tc.args.to_string(),
                                    }
                                })
                            }).collect::<Vec<_>>(),
                        });
                    }
                    return serde_json::json!({
                        "role": "assistant",
                        "content": content,
                        "reasoning_content": rc,
                    });
                }

                serde_json::json!({
                    "role": msg.role,
                    "content": content,
                })
            })
            .collect()
    }
257
258 pub(crate) fn convert_tools(&self, tools: &[ToolDefinition]) -> Vec<serde_json::Value> {
259 tools
260 .iter()
261 .map(|t| {
262 serde_json::json!({
263 "type": "function",
264 "function": {
265 "name": t.name,
266 "description": t.description,
267 "parameters": t.parameters,
268 }
269 })
270 })
271 .collect()
272 }
273}
274
275#[async_trait]
276impl LlmClient for OpenAiClient {
277 async fn complete(
278 &self,
279 messages: &[Message],
280 system: Option<&str>,
281 tools: &[ToolDefinition],
282 ) -> Result<LlmResponse> {
283 {
284 let request_started_at = Instant::now();
285 let mut openai_messages = Vec::new();
286
287 if let Some(sys) = system {
288 openai_messages.push(serde_json::json!({
289 "role": "system",
290 "content": sys,
291 }));
292 }
293
294 openai_messages.extend(self.convert_messages(messages));
295
296 let mut request = serde_json::json!({
297 "model": self.model,
298 "messages": openai_messages,
299 });
300
301 if let Some(temp) = self.temperature {
302 request["temperature"] = serde_json::json!(temp);
303 }
304 if let Some(max) = self.max_tokens {
305 request["max_tokens"] = serde_json::json!(max);
306 }
307
308 if !tools.is_empty() {
309 request["tools"] = serde_json::json!(self.convert_tools(tools));
310 }
311
312 let url = format!("{}{}", self.base_url, self.chat_completions_path);
313 let request_headers = self.request_headers();
314
315 let response = crate::retry::with_retry(&self.retry_config, |_attempt| {
316 let http = &self.http;
317 let url = &url;
318 let request_headers = request_headers.clone();
319 let request = &request;
320 async move {
321 let headers = request_headers
322 .iter()
323 .map(|(key, value)| (key.as_str(), value.as_str()))
324 .collect::<Vec<_>>();
325 let cancel_token = tokio_util::sync::CancellationToken::new();
327 match http.post(url, headers, request, cancel_token).await {
328 Ok(resp) => {
329 let status = reqwest::StatusCode::from_u16(resp.status)
330 .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
331 if status.is_success() {
332 AttemptOutcome::Success(resp.body)
333 } else if self.retry_config.is_retryable_status(status) {
334 AttemptOutcome::Retryable {
335 status,
336 body: resp.body,
337 retry_after: None,
338 }
339 } else {
340 AttemptOutcome::Fatal(anyhow::anyhow!(
341 "OpenAI API error at {} ({}): {}",
342 url,
343 status,
344 resp.body
345 ))
346 }
347 }
348 Err(e) => {
349 eprintln!("[DEBUG] HTTP error: {:?}", e);
350 AttemptOutcome::Fatal(e)
351 }
352 }
353 }
354 })
355 .await?;
356
357 let parsed: OpenAiResponse =
358 serde_json::from_str(&response).context("Failed to parse OpenAI response")?;
359
360 let choice = parsed.choices.into_iter().next().context("No choices")?;
361
362 let mut content = vec![];
363
364 let reasoning_content = choice.message.reasoning_content;
365
366 let text_content = choice.message.content;
367
368 if let Some(text) = text_content {
369 if !text.is_empty() {
370 content.push(ContentBlock::Text { text });
371 }
372 }
373
374 if let Some(tool_calls) = choice.message.tool_calls {
375 for tc in tool_calls {
376 content.push(ContentBlock::ToolUse {
377 id: tc.id,
378 name: tc.function.name.clone(),
379 input: Self::parse_tool_arguments(
380 &tc.function.name,
381 &tc.function.arguments,
382 ),
383 });
384 }
385 }
386
387 let llm_response = LlmResponse {
388 message: Message {
389 role: "assistant".to_string(),
390 content,
391 reasoning_content,
392 },
393 usage: TokenUsage {
394 prompt_tokens: parsed.usage.prompt_tokens,
395 completion_tokens: parsed.usage.completion_tokens,
396 total_tokens: {
397 let t = parsed.usage.total_tokens;
398 if t == 0 {
400 parsed.usage.total_characters.unwrap_or(0)
401 } else {
402 t
403 }
404 },
405 cache_read_tokens: parsed
406 .usage
407 .prompt_tokens_details
408 .as_ref()
409 .and_then(|d| d.cached_tokens),
410 cache_write_tokens: None,
411 },
412 stop_reason: choice.finish_reason,
413 meta: Some(LlmResponseMeta {
414 provider: Some(self.provider_name.clone()),
415 request_model: Some(self.model.clone()),
416 request_url: Some(url.clone()),
417 response_id: parsed.id,
418 response_model: parsed.model,
419 response_object: parsed.object,
420 first_token_ms: None,
421 duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
422 }),
423 };
424
425 crate::telemetry::record_llm_usage(
426 llm_response.usage.prompt_tokens,
427 llm_response.usage.completion_tokens,
428 llm_response.usage.total_tokens,
429 llm_response.stop_reason.as_deref(),
430 );
431
432 Ok(llm_response)
433 }
434 }
435
    /// Sends a streaming chat-completions request and forwards parsed SSE
    /// events through the returned channel.
    ///
    /// A background task parses the byte stream, emitting `TextDelta`,
    /// `ReasoningDelta`, `ToolUseStart`, and `ToolUseInputDelta` events as
    /// chunks arrive, then a final `StreamEvent::Done` with the assembled
    /// `LlmResponse`. If the stream ends without an SSE `[DONE]` marker, any
    /// buffered trailing JSON is salvaged before finalizing.
    async fn complete_streaming(
        &self,
        messages: &[Message],
        system: Option<&str>,
        tools: &[ToolDefinition],
        cancel_token: tokio_util::sync::CancellationToken,
    ) -> Result<mpsc::Receiver<StreamEvent>> {
        {
            let request_started_at = Instant::now();
            let mut openai_messages = Vec::new();

            // System prompt goes first, per the OpenAI message format.
            if let Some(sys) = system {
                openai_messages.push(serde_json::json!({
                    "role": "system",
                    "content": sys,
                }));
            }

            openai_messages.extend(self.convert_messages(messages));

            // `include_usage` asks the provider to append a usage-only chunk
            // before [DONE].
            let mut request = serde_json::json!({
                "model": self.model,
                "messages": openai_messages,
                "stream": true,
                "stream_options": { "include_usage": true },
            });

            if let Some(temp) = self.temperature {
                request["temperature"] = serde_json::json!(temp);
            }
            if let Some(max) = self.max_tokens {
                request["max_tokens"] = serde_json::json!(max);
            }

            if !tools.is_empty() {
                request["tools"] = serde_json::json!(self.convert_tools(tools));
            }

            let url = format!("{}{}", self.base_url, self.chat_completions_path);
            let request_headers = self.request_headers();

            let streaming_resp = crate::retry::with_retry(&self.retry_config, |_attempt| {
                let http = &self.http;
                let url = &url;
                let request_headers = request_headers.clone();
                let request = &request;
                let cancel_token = cancel_token.clone();
                async move {
                    let headers = request_headers
                        .iter()
                        .map(|(key, value)| (key.as_str(), value.as_str()))
                        .collect::<Vec<_>>();
                    // Race connection establishment against external cancellation.
                    let resp = tokio::select! {
                        _ = cancel_token.cancelled() => {
                            return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request cancelled"));
                        }
                        result = http.post_streaming(url, headers, request, cancel_token.clone()) => {
                            match result {
                                Ok(r) => r,
                                Err(e) => {
                                    return AttemptOutcome::Fatal(anyhow::anyhow!("HTTP request failed: {}", e));
                                }
                            }
                        }
                    };
                    let status = reqwest::StatusCode::from_u16(resp.status)
                        .unwrap_or(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
                    if status.is_success() {
                        AttemptOutcome::Success(resp)
                    } else {
                        // Honor the provider's Retry-After header when present.
                        let retry_after = resp
                            .retry_after
                            .as_deref()
                            .and_then(|v| RetryConfig::parse_retry_after(Some(v)));
                        if self.retry_config.is_retryable_status(status) {
                            AttemptOutcome::Retryable {
                                status,
                                body: resp.error_body,
                                retry_after,
                            }
                        } else {
                            AttemptOutcome::Fatal(anyhow::anyhow!(
                                "OpenAI API error at {} ({}): {}",
                                url,
                                status,
                                resp.error_body
                            ))
                        }
                    }
                }
            })
            .await?;

            let (tx, rx) = mpsc::channel(100);

            let mut stream = streaming_resp.byte_stream;
            let provider_name = self.provider_name.clone();
            let request_model = self.model.clone();
            let request_url = url.clone();
            tokio::spawn(async move {
                let mut buffer = String::new();
                let mut content_blocks: Vec<ContentBlock> = Vec::new();
                let mut text_content = String::new();
                let mut reasoning_content_accum = String::new();
                // tool-call index -> (id, name, accumulated JSON arguments)
                let mut tool_calls: std::collections::BTreeMap<usize, (String, String, String)> =
                    std::collections::BTreeMap::new();
                let mut usage = TokenUsage::default();
                let mut finish_reason = None;
                let mut response_id = None;
                let mut response_model = None;
                let mut response_object = None;
                let mut first_token_ms = None;
                let mut saw_done = false;
                let mut parsed_any_event = false;

                while let Some(chunk_result) = stream.next().await {
                    let chunk = match chunk_result {
                        Ok(c) => c,
                        Err(e) => {
                            tracing::error!("Stream error: {}", e);
                            break;
                        }
                    };

                    buffer.push_str(&String::from_utf8_lossy(&chunk));

                    // SSE events are separated by a blank line ("\n\n").
                    while let Some(event_end) = buffer.find("\n\n") {
                        let event_data: String = buffer.drain(..event_end).collect();
                        buffer.drain(..2);

                        for line in event_data.lines() {
                            if let Some(data) = line.strip_prefix("data: ") {
                                if data == "[DONE]" {
                                    saw_done = true;
                                    // Flush accumulated text and tool calls into
                                    // the final content blocks.
                                    if !text_content.is_empty() {
                                        content_blocks.push(ContentBlock::Text {
                                            text: text_content.clone(),
                                        });
                                    }
                                    for (_, (id, name, args)) in tool_calls.iter() {
                                        content_blocks.push(ContentBlock::ToolUse {
                                            id: id.clone(),
                                            name: name.clone(),
                                            input: Self::parse_tool_arguments(name, args),
                                        });
                                    }
                                    tool_calls.clear();
                                    crate::telemetry::record_llm_usage(
                                        usage.prompt_tokens,
                                        usage.completion_tokens,
                                        usage.total_tokens,
                                        finish_reason.as_deref(),
                                    );
                                    let response = LlmResponse {
                                        message: Message {
                                            role: "assistant".to_string(),
                                            content: std::mem::take(&mut content_blocks),
                                            reasoning_content: if reasoning_content_accum.is_empty()
                                            {
                                                None
                                            } else {
                                                Some(std::mem::take(&mut reasoning_content_accum))
                                            },
                                        },
                                        usage: usage.clone(),
                                        stop_reason: std::mem::take(&mut finish_reason),
                                        meta: Some(LlmResponseMeta {
                                            provider: Some(provider_name.clone()),
                                            request_model: Some(request_model.clone()),
                                            request_url: Some(request_url.clone()),
                                            response_id: response_id.clone(),
                                            response_model: response_model.clone(),
                                            response_object: response_object.clone(),
                                            first_token_ms,
                                            duration_ms: Some(
                                                request_started_at.elapsed().as_millis() as u64,
                                            ),
                                        }),
                                    };
                                    let _ = tx.send(StreamEvent::Done(response)).await;
                                    continue;
                                }

                                if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(data) {
                                    parsed_any_event = true;
                                    // Response metadata: first value wins.
                                    if response_id.is_none() {
                                        response_id = event.id.clone();
                                    }
                                    if response_model.is_none() {
                                        response_model = event.model.clone();
                                    }
                                    if response_object.is_none() {
                                        response_object = event.object.clone();
                                    }
                                    if let Some(u) = event.usage {
                                        usage.prompt_tokens = u.prompt_tokens;
                                        usage.completion_tokens = u.completion_tokens;
                                        usage.total_tokens = u.total_tokens;
                                        // Some providers report characters, not tokens.
                                        if usage.total_tokens == 0 {
                                            usage.total_tokens = u.total_characters.unwrap_or(0);
                                        }
                                        usage.cache_read_tokens = u
                                            .prompt_tokens_details
                                            .as_ref()
                                            .and_then(|d| d.cached_tokens);
                                    }

                                    if let Some(choice) = event.choices.into_iter().next() {
                                        if let Some(reason) = choice.finish_reason {
                                            finish_reason = Some(reason);
                                        }

                                        let delta_content = choice
                                            .delta
                                            .as_ref()
                                            .and_then(|delta| delta.content.clone());

                                        // Some compatible providers put the payload in
                                        // `message` instead of `delta`.
                                        if let Some(message) = choice.message {
                                            if let Some(reasoning) = message.reasoning_content {
                                                reasoning_content_accum.push_str(&reasoning);
                                                let delta_content_empty = delta_content
                                                    .as_deref()
                                                    .is_none_or(str::is_empty);
                                                if delta_content_empty {
                                                    if first_token_ms.is_none() {
                                                        first_token_ms = Some(
                                                            request_started_at.elapsed().as_millis()
                                                                as u64,
                                                        );
                                                    }
                                                    // NOTE(review): reasoning is deduped against
                                                    // `text_content` (the text accumulator), not a
                                                    // reasoning-specific buffer — confirm intended.
                                                    if let Some(delta) = Self::merge_stream_text(
                                                        &mut text_content,
                                                        &reasoning,
                                                    ) {
                                                        let _ = tx
                                                            .send(StreamEvent::ReasoningDelta(
                                                                delta,
                                                            ))
                                                            .await;
                                                    }
                                                }
                                            }
                                            if delta_content.as_deref().is_none_or(str::is_empty) {
                                                if let Some(content) = message
                                                    .content
                                                    .filter(|value| !value.is_empty())
                                                {
                                                    if first_token_ms.is_none() {
                                                        first_token_ms = Some(
                                                            request_started_at.elapsed().as_millis()
                                                                as u64,
                                                        );
                                                    }
                                                    if let Some(delta) = Self::merge_stream_text(
                                                        &mut text_content,
                                                        &content,
                                                    ) {
                                                        let _ = tx
                                                            .send(StreamEvent::TextDelta(delta))
                                                            .await;
                                                    }
                                                }
                                            }
                                            if let Some(tcs) = message.tool_calls {
                                                for (index, tc) in tcs.into_iter().enumerate() {
                                                    tool_calls.insert(
                                                        index,
                                                        (
                                                            tc.id,
                                                            tc.function.name,
                                                            tc.function.arguments,
                                                        ),
                                                    );
                                                }
                                            }
                                        }

                                        // Standard OpenAI streaming payload lives in `delta`.
                                        if let Some(delta) = choice.delta {
                                            if let Some(ref rc) = delta.reasoning_content {
                                                reasoning_content_accum.push_str(rc);
                                                let content_empty = delta
                                                    .content
                                                    .as_deref()
                                                    .is_none_or(|s| s.is_empty());
                                                if content_empty {
                                                    if first_token_ms.is_none() {
                                                        first_token_ms = Some(
                                                            request_started_at.elapsed().as_millis()
                                                                as u64,
                                                        );
                                                    }
                                                    if let Some(delta) = Self::merge_stream_text(
                                                        &mut text_content,
                                                        rc,
                                                    ) {
                                                        let _ = tx
                                                            .send(StreamEvent::ReasoningDelta(
                                                                delta,
                                                            ))
                                                            .await;
                                                    }
                                                }
                                            }

                                            if let Some(content) = delta.content {
                                                if first_token_ms.is_none() {
                                                    first_token_ms = Some(
                                                        request_started_at.elapsed().as_millis()
                                                            as u64,
                                                    );
                                                }
                                                if let Some(delta) = Self::merge_stream_text(
                                                    &mut text_content,
                                                    &content,
                                                ) {
                                                    let _ = tx
                                                        .send(StreamEvent::TextDelta(delta))
                                                        .await;
                                                }
                                            }

                                            if let Some(tcs) = delta.tool_calls {
                                                for tc in tcs {
                                                    // Tool-call fragments are accumulated by index;
                                                    // arguments arrive in pieces and are concatenated.
                                                    let entry = tool_calls
                                                        .entry(tc.index)
                                                        .or_insert_with(|| {
                                                            (
                                                                String::new(),
                                                                String::new(),
                                                                String::new(),
                                                            )
                                                        });

                                                    if let Some(id) = tc.id {
                                                        entry.0 = id;
                                                    }
                                                    if let Some(func) = tc.function {
                                                        if let Some(name) = func.name {
                                                            if first_token_ms.is_none() {
                                                                first_token_ms = Some(
                                                                    request_started_at
                                                                        .elapsed()
                                                                        .as_millis()
                                                                        as u64,
                                                                );
                                                            }
                                                            entry.1 = name.clone();
                                                            let _ = tx
                                                                .send(StreamEvent::ToolUseStart {
                                                                    id: entry.0.clone(),
                                                                    name,
                                                                })
                                                                .await;
                                                        }
                                                        if let Some(args) = func.arguments {
                                                            entry.2.push_str(&args);
                                                            let _ = tx
                                                                .send(
                                                                    StreamEvent::ToolUseInputDelta(
                                                                        args,
                                                                    ),
                                                                )
                                                                .await;
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }

                if saw_done {
                    return;
                }

                // The stream ended without [DONE]; try to salvage whatever is
                // left in the buffer (some providers close the connection right
                // after the last JSON payload).
                let trailing = buffer.trim();
                if !trailing.is_empty() {
                    if let Ok(event) = serde_json::from_str::<OpenAiStreamChunk>(trailing) {
                        parsed_any_event = true;
                        if response_id.is_none() {
                            response_id = event.id.clone();
                        }
                        if response_model.is_none() {
                            response_model = event.model.clone();
                        }
                        if response_object.is_none() {
                            response_object = event.object.clone();
                        }
                        if let Some(u) = event.usage {
                            usage.prompt_tokens = u.prompt_tokens;
                            usage.completion_tokens = u.completion_tokens;
                            usage.total_tokens = u.total_tokens;
                            usage.cache_read_tokens = u
                                .prompt_tokens_details
                                .as_ref()
                                .and_then(|d| d.cached_tokens);
                        }
                        if let Some(choice) = event.choices.into_iter().next() {
                            if let Some(reason) = choice.finish_reason {
                                finish_reason = Some(reason);
                            }
                            let delta_content = choice
                                .delta
                                .as_ref()
                                .and_then(|delta| delta.content.clone());
                            if let Some(message) = choice.message {
                                if let Some(reasoning) = message.reasoning_content {
                                    reasoning_content_accum.push_str(&reasoning);
                                }
                                if delta_content.as_deref().is_none_or(str::is_empty) {
                                    if let Some(content) =
                                        message.content.filter(|value| !value.is_empty())
                                    {
                                        if first_token_ms.is_none() {
                                            first_token_ms = Some(
                                                request_started_at.elapsed().as_millis() as u64,
                                            );
                                        }
                                        if let Some(delta) =
                                            Self::merge_stream_text(&mut text_content, &content)
                                        {
                                            let _ = tx.send(StreamEvent::TextDelta(delta)).await;
                                        }
                                    }
                                }
                                if let Some(tcs) = message.tool_calls {
                                    for (index, tc) in tcs.into_iter().enumerate() {
                                        tool_calls.insert(
                                            index,
                                            (tc.id, tc.function.name, tc.function.arguments),
                                        );
                                    }
                                }
                            }
                            if let Some(delta) = choice.delta {
                                if let Some(ref rc) = delta.reasoning_content {
                                    reasoning_content_accum.push_str(rc);
                                    let content_empty =
                                        delta.content.as_deref().is_none_or(|s| s.is_empty());
                                    if content_empty {
                                        if first_token_ms.is_none() {
                                            first_token_ms = Some(
                                                request_started_at.elapsed().as_millis() as u64,
                                            );
                                        }
                                        if let Some(delta) =
                                            Self::merge_stream_text(&mut text_content, rc)
                                        {
                                            let _ =
                                                tx.send(StreamEvent::ReasoningDelta(delta)).await;
                                        }
                                    }
                                }
                                if let Some(content) = delta.content {
                                    if first_token_ms.is_none() {
                                        first_token_ms =
                                            Some(request_started_at.elapsed().as_millis() as u64);
                                    }
                                    if let Some(delta) =
                                        Self::merge_stream_text(&mut text_content, &content)
                                    {
                                        let _ = tx.send(StreamEvent::TextDelta(delta)).await;
                                    }
                                }
                            }
                        }
                    } else if let Ok(response) = serde_json::from_str::<OpenAiResponse>(trailing) {
                        // The provider returned a single non-streaming response body.
                        parsed_any_event = true;
                        response_id = response.id.clone();
                        response_model = response.model.clone();
                        response_object = response.object.clone();
                        usage.prompt_tokens = response.usage.prompt_tokens;
                        usage.completion_tokens = response.usage.completion_tokens;
                        usage.total_tokens = response.usage.total_tokens;
                        if usage.total_tokens == 0 {
                            usage.total_tokens = response.usage.total_characters.unwrap_or(0);
                        }
                        usage.cache_read_tokens = response
                            .usage
                            .prompt_tokens_details
                            .as_ref()
                            .and_then(|d| d.cached_tokens);

                        if let Some(choice) = response.choices.into_iter().next() {
                            finish_reason = choice.finish_reason;
                            if let Some(text) =
                                choice.message.content.filter(|text| !text.is_empty())
                            {
                                if first_token_ms.is_none() {
                                    first_token_ms =
                                        Some(request_started_at.elapsed().as_millis() as u64);
                                }
                                // Accumulate only; the Done event below carries it.
                                let _ = Self::merge_stream_text(&mut text_content, &text);
                            }
                            if let Some(reasoning) = choice.message.reasoning_content {
                                reasoning_content_accum.push_str(&reasoning);
                            }
                            if let Some(final_tool_calls) = choice.message.tool_calls {
                                for tc in final_tool_calls {
                                    tool_calls.insert(
                                        tool_calls.len(),
                                        (tc.id, tc.function.name, tc.function.arguments),
                                    );
                                }
                            }
                        }
                    }
                }

                // Finalize a best-effort Done event if anything was received.
                if parsed_any_event
                    || !text_content.is_empty()
                    || !tool_calls.is_empty()
                    || !content_blocks.is_empty()
                {
                    tracing::warn!(
                        provider = %provider_name,
                        model = %request_model,
                        "OpenAI-compatible stream ended without [DONE]; finalizing buffered response"
                    );
                    if !text_content.is_empty() {
                        content_blocks.push(ContentBlock::Text {
                            text: text_content.clone(),
                        });
                    }
                    for (_, (id, name, args)) in tool_calls.iter() {
                        content_blocks.push(ContentBlock::ToolUse {
                            id: id.clone(),
                            name: name.clone(),
                            input: Self::parse_tool_arguments(name, args),
                        });
                    }
                    tool_calls.clear();
                    crate::telemetry::record_llm_usage(
                        usage.prompt_tokens,
                        usage.completion_tokens,
                        usage.total_tokens,
                        finish_reason.as_deref(),
                    );
                    let response = LlmResponse {
                        message: Message {
                            role: "assistant".to_string(),
                            content: std::mem::take(&mut content_blocks),
                            reasoning_content: if reasoning_content_accum.is_empty() {
                                None
                            } else {
                                Some(std::mem::take(&mut reasoning_content_accum))
                            },
                        },
                        usage: usage.clone(),
                        stop_reason: std::mem::take(&mut finish_reason),
                        meta: Some(LlmResponseMeta {
                            provider: Some(provider_name.clone()),
                            request_model: Some(request_model.clone()),
                            request_url: Some(request_url.clone()),
                            response_id: response_id.clone(),
                            response_model: response_model.clone(),
                            response_object: response_object.clone(),
                            first_token_ms,
                            duration_ms: Some(request_started_at.elapsed().as_millis() as u64),
                        }),
                    };
                    let _ = tx.send(StreamEvent::Done(response)).await;
                } else {
                    tracing::warn!(
                        provider = %provider_name,
                        model = %request_model,
                        trailing = %trailing.chars().take(400).collect::<String>(),
                        "OpenAI-compatible stream ended without any parseable events"
                    );
                }
            });

            Ok(rx)
        }
    }
1024}
1025
/// Non-streaming chat-completions response body (only the fields we read).
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiResponse {
    #[serde(default)]
    pub(crate) id: Option<String>,
    #[serde(default)]
    pub(crate) object: Option<String>,
    #[serde(default)]
    pub(crate) model: Option<String>,
    pub(crate) choices: Vec<OpenAiChoice>,
    pub(crate) usage: OpenAiUsage,
}
1038
/// One entry in a non-streaming response's `choices` array.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiChoice {
    pub(crate) message: OpenAiMessage,
    pub(crate) finish_reason: Option<String>,
}
1044
/// Assistant message payload; all fields are optional on the wire.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiMessage {
    // Non-standard field emitted by some reasoning-capable providers.
    pub(crate) reasoning_content: Option<String>,
    pub(crate) content: Option<String>,
    pub(crate) tool_calls: Option<Vec<OpenAiToolCall>>,
}
1051
/// A complete (non-streamed) tool call from the assistant.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCall {
    pub(crate) id: String,
    pub(crate) function: OpenAiFunction,
}
1057
/// Function name plus its arguments as a raw JSON string.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunction {
    pub(crate) name: String,
    // JSON-encoded arguments; parsed via `parse_tool_arguments`.
    pub(crate) arguments: String,
}
1063
/// Token accounting block; all fields default so partial payloads parse.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiUsage {
    #[serde(default)]
    pub(crate) prompt_tokens: usize,
    #[serde(default)]
    pub(crate) completion_tokens: usize,
    #[serde(default)]
    pub(crate) total_tokens: usize,
    // Non-standard: some OpenAI-compatible providers report characters
    // instead of tokens; used as a fallback when total_tokens is 0.
    #[serde(default)]
    pub(crate) total_characters: Option<usize>,
    #[serde(default)]
    pub(crate) prompt_tokens_details: Option<OpenAiPromptTokensDetails>,
}
1079
/// Breakdown of prompt tokens; only cache hits are consumed.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiPromptTokensDetails {
    #[serde(default)]
    pub(crate) cached_tokens: Option<usize>,
}
1085
/// One SSE `data:` payload from a streaming response.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChunk {
    #[serde(default)]
    pub(crate) id: Option<String>,
    #[serde(default)]
    pub(crate) object: Option<String>,
    #[serde(default)]
    pub(crate) model: Option<String>,
    pub(crate) choices: Vec<OpenAiStreamChoice>,
    // Present only on the usage chunk (requested via stream_options).
    pub(crate) usage: Option<OpenAiUsage>,
}
1098
/// Streaming choice: standard providers fill `delta`; some compatible
/// providers send a full `message` instead — both are handled.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiStreamChoice {
    pub(crate) message: Option<OpenAiMessage>,
    pub(crate) delta: Option<OpenAiDelta>,
    pub(crate) finish_reason: Option<String>,
}
1105
/// Incremental content carried by a streaming chunk.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiDelta {
    // Non-standard field emitted by some reasoning-capable providers.
    pub(crate) reasoning_content: Option<String>,
    pub(crate) content: Option<String>,
    pub(crate) tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}
1112
/// Fragment of a tool call; fragments with the same `index` are merged.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiToolCallDelta {
    pub(crate) index: usize,
    pub(crate) id: Option<String>,
    pub(crate) function: Option<OpenAiFunctionDelta>,
}
1119
/// Partial function data; `arguments` pieces are concatenated across chunks.
#[derive(Debug, Deserialize)]
pub(crate) struct OpenAiFunctionDelta {
    pub(crate) name: Option<String>,
    pub(crate) arguments: Option<String>,
}