1use crate::llm::error_display;
2use crate::llm::provider::{
3 AssistantPhase, ContentPart, LLMError, LLMResponse, LLMStreamEvent, Message, MessageContent,
4 MessageRole, ToolCall,
5};
6pub use crate::llm::providers::ReasoningBuffer;
7use crate::llm::providers::common::{
8 extract_reasoning_text_from_serialized_details, map_finish_reason_common,
9};
10mod responses_stream;
11mod tag_sanitizer;
12use crate::llm::providers::split_reasoning_from_text;
13pub use responses_stream::{ResponsesNormalizedStreamOptions, create_responses_normalized_stream};
14use serde_json::{Map, Value};
15pub use tag_sanitizer::TagStreamSanitizer;
16
17#[derive(Debug, thiserror::Error)]
18pub enum StreamAssemblyError {
19 #[error("missing field `{0}` in stream payload")]
20 MissingField(&'static str),
21 #[error("invalid stream payload: {0}")]
22 InvalidPayload(String),
23}
24
25impl StreamAssemblyError {
26 #[cold]
27 pub fn into_llm_error(self, provider: &str) -> LLMError {
28 let message = self.to_string();
29 let formatted = error_display::format_llm_error(provider, &message);
30 LLMError::Provider {
31 message: formatted,
32 metadata: None,
33 }
34 }
35}
36
/// Observer hooks invoked while a stream is being assembled.
///
/// Every hook has an empty default body, so implementors override only the
/// notifications they care about.
pub trait StreamTelemetry: Send + Sync {
    /// Called with a piece of content text.
    fn on_content_delta(&self, _delta: &str) {}

    /// Called with a piece of reasoning text.
    fn on_reasoning_delta(&self, _delta: &str) {}

    /// Called with the stage label of a reasoning segment.
    fn on_reasoning_stage(&self, _stage: &str) {}

    /// Called after a tool-call delta has been applied.
    fn on_tool_call_delta(&self) {}
}

/// A [`StreamTelemetry`] implementation that ignores every notification.
#[derive(Default)]
pub struct NoopStreamTelemetry;

impl StreamTelemetry for NoopStreamTelemetry {}
48
/// One piece of streamed output, tagged by the channel it belongs to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamFragment {
    /// Text that belongs to the visible answer.
    Content(String),
    /// Text that belongs to the reasoning channel.
    Reasoning(String),
}

/// An ordered list of [`StreamFragment`]s in which neighbouring fragments of
/// the same kind are merged into one.
#[derive(Default, Debug)]
pub struct StreamDelta {
    fragments: Vec<StreamFragment>,
}

impl StreamDelta {
    /// Appends content text, merging it into a trailing content fragment when
    /// one exists. Empty text is ignored.
    pub fn push_content(&mut self, text: &str) {
        if text.is_empty() {
            return;
        }

        match self.fragments.last_mut() {
            Some(StreamFragment::Content(existing)) => existing.push_str(text),
            _ => self
                .fragments
                .push(StreamFragment::Content(text.to_string())),
        }
    }

    /// Appends reasoning text, merging it into a trailing reasoning fragment
    /// when one exists. Empty text is ignored.
    pub fn push_reasoning(&mut self, text: &str) {
        if text.is_empty() {
            return;
        }

        match self.fragments.last_mut() {
            Some(StreamFragment::Reasoning(existing)) => existing.push_str(text),
            _ => self
                .fragments
                .push(StreamFragment::Reasoning(text.to_string())),
        }
    }

    /// Returns `true` when no fragment has been recorded.
    pub fn is_empty(&self) -> bool {
        self.fragments.is_empty()
    }

    /// Consumes the delta and returns its fragments in insertion order.
    pub fn into_fragments(self) -> Vec<StreamFragment> {
        self.fragments
    }

    /// Appends every fragment of `other`, in order.
    ///
    /// Fragments are routed through [`Self::push_content`] /
    /// [`Self::push_reasoning`] so that the fragment at the seam is merged
    /// with a same-kind trailing fragment instead of producing two adjacent
    /// fragments of the same kind.
    pub fn extend(&mut self, other: StreamDelta) {
        for fragment in other.fragments {
            match fragment {
                StreamFragment::Content(text) => self.push_content(&text),
                StreamFragment::Reasoning(text) => self.push_reasoning(&text),
            }
        }
    }
}
99
100#[derive(Default, Clone)]
101pub struct ToolCallBuilder {
102 id: Option<String>,
103 namespace: Option<String>,
104 name: Option<String>,
105 arguments: String,
106}
107
108impl ToolCallBuilder {
109 pub fn apply_delta(&mut self, delta: &Value) {
110 if let Some(id) = delta.get("id").and_then(|value| value.as_str()) {
111 self.id = Some(id.to_string());
112 }
113
114 if let Some(namespace) = delta.get("namespace").and_then(|value| value.as_str()) {
115 self.namespace = Some(namespace.to_string());
116 }
117
118 if let Some(function) = delta.get("function") {
119 if let Some(namespace) = function.get("namespace").and_then(|value| value.as_str()) {
120 self.namespace = Some(namespace.to_string());
121 }
122
123 if let Some(name) = function.get("name").and_then(|value| value.as_str()) {
124 self.name = Some(name.to_string());
125 }
126
127 if let Some(arguments_value) = function.get("arguments") {
128 if let Some(arguments) = arguments_value.as_str() {
129 self.arguments.push_str(arguments);
130 } else if arguments_value.is_object() || arguments_value.is_array() {
131 self.arguments.push_str(&arguments_value.to_string());
132 }
133 }
134 }
135 }
136
137 pub fn finalize(self, fallback_index: usize) -> Option<ToolCall> {
138 let name = self.name?;
139 let id = self
140 .id
141 .unwrap_or_else(|| format!("tool_call_{fallback_index}"));
142 let arguments = if self.arguments.is_empty() {
143 "{}".to_string()
144 } else {
145 self.arguments
146 };
147
148 Some(ToolCall::function_with_namespace(
149 id,
150 self.namespace,
151 name,
152 arguments,
153 ))
154 }
155}
156
157pub fn update_tool_calls(builders: &mut Vec<ToolCallBuilder>, deltas: &[Value]) {
158 for (position, delta) in deltas.iter().enumerate() {
159 let index = delta
160 .get("index")
161 .and_then(|value| value.as_u64())
162 .map(|value| value as usize)
163 .unwrap_or(position);
164
165 if builders.len() <= index {
166 builders.resize_with(index + 1, ToolCallBuilder::default);
167 }
168 let Some(builder) = builders.get_mut(index) else {
169 continue;
170 };
171
172 builder.apply_delta(delta);
173 }
174}
175
176pub fn finalize_tool_calls(builders: Vec<ToolCallBuilder>) -> Option<Vec<ToolCall>> {
177 let calls: Vec<ToolCall> = builders
178 .into_iter()
179 .enumerate()
180 .filter_map(|(index, builder)| builder.finalize(index))
181 .collect();
182
183 (!calls.is_empty()).then_some(calls)
184}
185
186#[derive(Debug, Clone, PartialEq, Eq)]
187pub(crate) enum FunctionOutputContentItem {
188 InputText { text: String },
189 InputImage { image_url: String },
190}
191
192impl FunctionOutputContentItem {
193 fn from_value(value: &Value) -> Option<Self> {
194 let item_type = value.get("type").and_then(Value::as_str)?;
195 match item_type {
196 "input_text" | "output_text" => Some(Self::InputText {
197 text: value.get("text").and_then(Value::as_str)?.to_string(),
198 }),
199 "input_image" => Some(Self::InputImage {
200 image_url: value.get("image_url").and_then(Value::as_str)?.to_string(),
201 }),
202 _ => None,
203 }
204 }
205
206 pub(crate) fn to_function_output_json(&self) -> Value {
207 match self {
208 Self::InputText { text } => serde_json::json!({
209 "type": "input_text",
210 "text": text
211 }),
212 Self::InputImage { image_url } => serde_json::json!({
213 "type": "input_image",
214 "image_url": image_url
215 }),
216 }
217 }
218
219 pub(crate) fn to_tool_result_json(&self) -> Value {
220 match self {
221 Self::InputText { text } => serde_json::json!({
222 "type": "output_text",
223 "text": text
224 }),
225 Self::InputImage { image_url } => serde_json::json!({
226 "type": "input_image",
227 "image_url": image_url
228 }),
229 }
230 }
231}
232
233fn parse_function_output_content_items_array(
234 items: &[Value],
235) -> Option<Vec<FunctionOutputContentItem>> {
236 items
237 .iter()
238 .map(FunctionOutputContentItem::from_value)
239 .collect::<Option<Vec<_>>>()
240}
241
242pub(crate) fn parse_function_output_content_items_value(
243 value: &Value,
244) -> Option<Vec<FunctionOutputContentItem>> {
245 match value {
246 Value::Array(items) => parse_function_output_content_items_array(items),
247 Value::Object(obj) => ["content_items", "content", "output", "body"]
248 .iter()
249 .find_map(|key| obj.get(*key))
250 .and_then(parse_function_output_content_items_value),
251 Value::String(text) => parse_function_output_content_items_text(text),
252 Value::Null | Value::Bool(_) | Value::Number(_) => None,
253 }
254}
255
256pub(crate) fn parse_function_output_content_items_text(
257 text: &str,
258) -> Option<Vec<FunctionOutputContentItem>> {
259 let trimmed = text.trim();
260 if !(trimmed.starts_with('[') || trimmed.starts_with('{')) {
261 return None;
262 }
263 let parsed: Value = serde_json::from_str(trimmed).ok()?;
264 parse_function_output_content_items_value(&parsed)
265}
266
267fn function_output_items_from_parts(parts: &[ContentPart]) -> Vec<FunctionOutputContentItem> {
268 let mut items = Vec::new();
269 for part in parts {
270 match part {
271 ContentPart::Text { text } => {
272 if text.trim().is_empty() {
273 continue;
274 }
275 items.push(FunctionOutputContentItem::InputText { text: text.clone() });
276 }
277 ContentPart::Image {
278 data, mime_type, ..
279 } => {
280 items.push(FunctionOutputContentItem::InputImage {
281 image_url: format!("data:{};base64,{}", mime_type, data),
282 });
283 }
284 ContentPart::File { .. } => {}
285 }
286 }
287 items
288}
289
290fn function_output_value_from_items(items: Vec<FunctionOutputContentItem>) -> Value {
291 if items.is_empty() {
292 return Value::String(String::new());
293 }
294 let has_image = items
295 .iter()
296 .any(|item| matches!(item, FunctionOutputContentItem::InputImage { .. }));
297 if has_image {
298 return Value::Array(
299 items
300 .iter()
301 .map(FunctionOutputContentItem::to_function_output_json)
302 .collect(),
303 );
304 }
305 Value::String(text_from_function_output_items(&items).unwrap_or_default())
306}
307
308pub(crate) fn function_output_value_from_message_content(content: &MessageContent) -> Value {
309 match content {
310 MessageContent::Text(text) => {
311 if let Some(items) = parse_function_output_content_items_text(text) {
312 return function_output_value_from_items(items);
313 }
314 Value::String(text.clone())
315 }
316 MessageContent::Parts(parts) => {
317 let items = function_output_items_from_parts(parts);
318 function_output_value_from_items(items)
319 }
320 }
321}
322
323pub(crate) fn tool_result_content_from_message_content(content: &MessageContent) -> Vec<Value> {
324 match content {
325 MessageContent::Text(text) => {
326 if text.trim().is_empty() {
327 return Vec::new();
328 }
329 if let Some(items) = parse_function_output_content_items_text(text) {
330 return items
331 .iter()
332 .map(FunctionOutputContentItem::to_tool_result_json)
333 .collect();
334 }
335 vec![serde_json::json!({
336 "type": "output_text",
337 "text": text
338 })]
339 }
340 MessageContent::Parts(parts) => function_output_items_from_parts(parts)
341 .iter()
342 .map(FunctionOutputContentItem::to_tool_result_json)
343 .collect(),
344 }
345}
346
347fn text_from_function_output_items(items: &[FunctionOutputContentItem]) -> Option<String> {
348 let mut text = String::new();
349 for item in items {
350 match item {
351 FunctionOutputContentItem::InputText { text: segment } => text.push_str(segment),
352 FunctionOutputContentItem::InputImage { .. } => return None,
353 }
354 }
355 Some(text)
356}
357
358fn function_output_value_to_history_text(value: &Value) -> String {
359 if let Some(text) = value.as_str() {
360 return text.to_string();
361 }
362 if let Some(items) = parse_function_output_content_items_value(value)
363 && let Some(text) = text_from_function_output_items(&items)
364 {
365 return text;
366 }
367 if let Some(text) = value.get("content").and_then(Value::as_str) {
368 return text.to_string();
369 }
370 value.to_string()
371}
372
373fn append_output_item_text(value: &Value, text: &mut String) {
374 if let Some(part_text) = value.get("text").and_then(Value::as_str) {
375 text.push_str(part_text);
376 }
377 if let Some(part_output) = value.get("output").and_then(Value::as_str) {
378 text.push_str(part_output);
379 }
380 if let Some(refusal) = value.get("refusal").and_then(Value::as_str) {
381 text.push_str(refusal);
382 }
383
384 match value {
385 Value::String(s) => text.push_str(s),
386 Value::Array(parts) => {
387 for part in parts {
388 append_output_item_text(part, text);
389 }
390 }
391 Value::Object(_) => {
392 if let Some(content) = value.get("content") {
393 append_output_item_text(content, text);
394 }
395 }
396 _ => {}
397 }
398}
399
400fn output_item_text(content: &Value) -> String {
401 let mut text = String::new();
402 append_output_item_text(content, &mut text);
403 text
404}
405
406fn parse_function_call_item(item: &Value) -> Option<ToolCall> {
407 let function_obj = item.get("function").and_then(Value::as_object);
408 let namespace = item
409 .get("namespace")
410 .and_then(Value::as_str)
411 .or_else(|| function_obj.and_then(|f| f.get("namespace").and_then(Value::as_str)))
412 .map(ToOwned::to_owned);
413 let name = function_obj
414 .and_then(|f| f.get("name").and_then(Value::as_str))
415 .or_else(|| item.get("name").and_then(Value::as_str))?
416 .to_string();
417
418 let id = item
419 .get("id")
420 .and_then(Value::as_str)
421 .or_else(|| item.get("call_id").and_then(Value::as_str))
422 .filter(|value| !value.is_empty())
423 .unwrap_or("tool_call_compacted")
424 .to_string();
425
426 let arguments_value = function_obj
427 .and_then(|f| f.get("arguments"))
428 .or_else(|| item.get("arguments"));
429 let arguments = arguments_value.map_or_else(
430 || "{}".to_string(),
431 |value| {
432 value
433 .as_str()
434 .map(ToOwned::to_owned)
435 .unwrap_or_else(|| value.to_string())
436 },
437 );
438
439 Some(ToolCall::function_with_namespace(
440 id, namespace, name, arguments,
441 ))
442}
443
444fn parse_message_item(item: &Value) -> Option<Message> {
445 let role = item
446 .get("role")
447 .and_then(Value::as_str)
448 .unwrap_or("assistant");
449 let content_value = item.get("content").unwrap_or(&Value::Null);
450 let content = output_item_text(content_value).trim().to_string();
451
452 let tool_calls: Vec<ToolCall> = content_value
453 .as_array()
454 .into_iter()
455 .flatten()
456 .filter_map(|part| {
457 let part_type = part.get("type").and_then(Value::as_str).unwrap_or("");
458 if part_type == "function_call" || part_type == "tool_call" {
459 parse_function_call_item(part)
460 } else {
461 None
462 }
463 })
464 .collect();
465
466 let tool_result = content_value
467 .as_array()
468 .into_iter()
469 .flatten()
470 .find_map(|part| {
471 let part_type = part.get("type").and_then(Value::as_str).unwrap_or("");
472 if part_type != "tool_result" {
473 return None;
474 }
475
476 let tool_call_id = part
477 .get("tool_call_id")
478 .and_then(Value::as_str)
479 .or_else(|| item.get("tool_call_id").and_then(Value::as_str))
480 .or_else(|| item.get("call_id").and_then(Value::as_str))
481 .map(ToOwned::to_owned)?;
482
483 let tool_output = output_item_text(part.get("content").unwrap_or(&Value::Null))
484 .trim()
485 .to_string();
486 Some((tool_call_id, tool_output))
487 });
488
489 let assistant_phase = item
490 .get("phase")
491 .and_then(Value::as_str)
492 .and_then(AssistantPhase::from_wire_str);
493
494 match role {
495 "system" => Some(Message::system(content)),
496 "developer" => Some(Message::system(content)),
497 "user" => Some(Message::user(content)),
498 "assistant" => {
499 if tool_calls.is_empty() {
500 Some(Message::assistant(content).with_phase(assistant_phase))
501 } else {
502 Some(Message::assistant_with_tools(content, tool_calls).with_phase(assistant_phase))
503 }
504 }
505 "tool" => {
506 if let Some((tool_call_id, tool_output)) = tool_result {
507 return Some(Message::tool_response(tool_call_id, tool_output));
508 }
509
510 let tool_call_id = item
511 .get("tool_call_id")
512 .and_then(Value::as_str)
513 .or_else(|| item.get("call_id").and_then(Value::as_str))
514 .map(ToOwned::to_owned)?;
515 Some(Message::tool_response(tool_call_id, content))
516 }
517 _ => Some(Message {
518 role: MessageRole::Assistant,
519 content: MessageContent::text(content),
520 ..Message::default()
521 }),
522 }
523}
524
525#[inline]
526fn preserve_opaque_item(item: &Value) -> Message {
527 Message::assistant(String::new()).with_reasoning_details(Some(vec![item.clone()]))
528}
529
530pub(crate) fn parse_compacted_output_messages(output: &[Value]) -> Vec<Message> {
535 let mut messages = Vec::new();
536
537 for item in output {
538 let item_type = item.get("type").and_then(Value::as_str).unwrap_or("");
539 match item_type {
540 "message" => {
541 if let Some(message) = parse_message_item(item) {
542 messages.push(message);
543 } else {
544 messages.push(preserve_opaque_item(item));
545 }
546 }
547 "function_call" | "tool_call" => {
548 if let Some(tool_call) = parse_function_call_item(item) {
549 messages.push(Message::assistant_with_tools(
550 String::new(),
551 vec![tool_call],
552 ));
553 }
554 }
555 "function_call_output" => {
556 let call_id = item
557 .get("call_id")
558 .and_then(Value::as_str)
559 .or_else(|| item.get("id").and_then(Value::as_str))
560 .filter(|value| !value.is_empty());
561 if let Some(call_id) = call_id {
562 let output_text = item
563 .get("output")
564 .map(function_output_value_to_history_text)
565 .unwrap_or_default();
566 messages.push(Message::tool_response(call_id.to_string(), output_text));
567 } else {
568 messages.push(preserve_opaque_item(item));
569 }
570 }
571 _ => {
572 messages.push(preserve_opaque_item(item));
573 }
574 }
575 }
576
577 messages
578}
579
580pub struct StreamAggregator {
582 pub model: String,
583 pub content: String,
584 pub reasoning: String,
585 pub reasoning_details: Vec<String>,
586 pub reasoning_buffer: ReasoningBuffer,
587 pub tool_builders: Vec<ToolCallBuilder>,
588 pub usage: Option<crate::llm::provider::Usage>,
589 pub finish_reason: crate::llm::provider::FinishReason,
590 pub sanitizer: TagStreamSanitizer,
591 pub compaction: Option<String>,
592}
593
/// Order in which the reasoning and content parts of a single delta are
/// emitted when a chunk carries both.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OpenAiDeltaOrder {
    /// Emit the reasoning delta before the content delta.
    ReasoningFirst,
    /// Emit the content delta before the reasoning delta.
    ContentFirst,
}
599
600fn emit_reasoning_delta(
601 aggregator: &mut StreamAggregator,
602 tx: &tokio::sync::mpsc::UnboundedSender<Result<LLMStreamEvent, LLMError>>,
603 delta: &Value,
604 reasoning_field: Option<&'static str>,
605) {
606 let Some(reasoning_field) = reasoning_field else {
607 return;
608 };
609 let Some(reasoning) = delta.get(reasoning_field).and_then(Value::as_str) else {
610 return;
611 };
612 let Some(delta) = aggregator.handle_reasoning(reasoning) else {
613 return;
614 };
615 let _ = tx.send(Ok(LLMStreamEvent::Reasoning { delta }));
616}
617
618fn emit_content_delta(
619 aggregator: &mut StreamAggregator,
620 tx: &tokio::sync::mpsc::UnboundedSender<Result<LLMStreamEvent, LLMError>>,
621 delta: &Value,
622) {
623 let Some(content) = delta.get("content").and_then(Value::as_str) else {
624 return;
625 };
626 for event in aggregator.handle_content(content) {
627 let _ = tx.send(Ok(event));
628 }
629}
630
631pub fn handle_openai_compatible_chunk(
632 value: &Value,
633 aggregator: &mut StreamAggregator,
634 tx: &tokio::sync::mpsc::UnboundedSender<Result<LLMStreamEvent, LLMError>>,
635 reasoning_field: Option<&'static str>,
636 delta_order: OpenAiDeltaOrder,
637) {
638 if let Some(choices) = value.get("choices").and_then(Value::as_array)
639 && let Some(choice) = choices.first()
640 {
641 if let Some(delta) = choice.get("delta") {
642 match delta_order {
643 OpenAiDeltaOrder::ReasoningFirst => {
644 emit_reasoning_delta(aggregator, tx, delta, reasoning_field);
645 emit_content_delta(aggregator, tx, delta);
646 }
647 OpenAiDeltaOrder::ContentFirst => {
648 emit_content_delta(aggregator, tx, delta);
649 emit_reasoning_delta(aggregator, tx, delta, reasoning_field);
650 }
651 }
652
653 if let Some(tool_calls) = delta.get("tool_calls").and_then(Value::as_array) {
654 aggregator.handle_tool_calls(tool_calls);
655 }
656 }
657
658 if let Some(reason) = choice.get("finish_reason").and_then(Value::as_str) {
659 aggregator.set_finish_reason(map_finish_reason_common(reason));
660 }
661 }
662
663 if let Some(_usage_value) = value.get("usage")
664 && let Some(usage) = crate::llm::providers::common::parse_usage_openai_format(value, true)
665 {
666 aggregator.set_usage(usage);
667 }
668}
669
670impl StreamAggregator {
671 pub fn new(model: String) -> Self {
672 Self {
673 model,
674 content: String::new(),
675 reasoning: String::new(),
676 reasoning_details: Vec::new(),
677 reasoning_buffer: ReasoningBuffer::default(),
678 tool_builders: Vec::new(),
679 usage: None,
680 finish_reason: crate::llm::provider::FinishReason::Stop,
681 sanitizer: TagStreamSanitizer::new(),
682 compaction: None,
683 }
684 }
685
686 pub fn handle_content(&mut self, delta: &str) -> Vec<LLMStreamEvent> {
688 self.content.push_str(delta);
689 self.sanitizer.process_chunk(delta)
690 }
691
692 pub fn handle_reasoning(&mut self, delta: &str) -> Option<String> {
694 let result = self.reasoning_buffer.push(delta);
695 if let Some(ref d) = result {
696 self.reasoning.push_str(d);
697 }
698 result
699 }
700
701 pub fn set_reasoning_details(&mut self, details: &[Value]) {
703 if details.is_empty() {
704 return;
705 }
706
707 self.reasoning_details = details
708 .iter()
709 .map(|detail| {
710 detail
711 .as_str()
712 .map(ToOwned::to_owned)
713 .unwrap_or_else(|| detail.to_string())
714 })
715 .collect();
716 }
717
718 pub fn handle_tool_calls(&mut self, deltas: &[Value]) {
720 update_tool_calls(&mut self.tool_builders, deltas);
721 }
722
723 pub fn set_usage(&mut self, usage: crate::llm::provider::Usage) {
725 self.usage = Some(usage);
726 }
727
728 pub fn set_finish_reason(&mut self, reason: crate::llm::provider::FinishReason) {
730 self.finish_reason = reason;
731 }
732
733 pub fn finalize(mut self) -> LLMResponse {
735 for event in self.sanitizer.finalize() {
737 match event {
738 LLMStreamEvent::Token { delta } => {
739 self.content.push_str(&delta);
740 }
741 LLMStreamEvent::Reasoning { delta } => {
742 self.reasoning.push_str(&delta);
743 }
744 _ => {}
745 }
746 }
747
748 let reasoning_details = if self.reasoning_details.is_empty() {
749 None
750 } else {
751 Some(self.reasoning_details)
752 };
753 let mut reasoning = if self.reasoning.is_empty() {
754 self.reasoning_buffer.finalize()
755 } else {
756 Some(self.reasoning)
757 };
758 if reasoning.is_none() {
759 reasoning = reasoning_details
760 .as_ref()
761 .and_then(|details| extract_reasoning_text_from_serialized_details(details));
762 }
763
764 LLMResponse {
765 content: if self.content.is_empty() {
766 None
767 } else {
768 Some(self.content)
769 },
770 tool_calls: finalize_tool_calls(self.tool_builders),
771 model: self.model,
772 usage: self.usage,
773 finish_reason: self.finish_reason,
774 reasoning,
775 reasoning_details,
776 tool_references: Vec::new(),
777 request_id: None,
778 organization_id: None,
779 compaction: self.compaction,
780 }
781 }
782}
783
784pub async fn process_openai_stream<S, E, F>(
789 mut byte_stream: S,
790 provider_name: &'static str,
791 model: String,
792 mut on_chunk: F,
793) -> Result<LLMResponse, LLMError>
794where
795 S: futures::Stream<Item = Result<bytes::Bytes, E>> + Unpin,
796 E: std::fmt::Display,
797 F: FnMut(Value) -> Result<(), LLMError>,
798{
799 use crate::llm::providers::error_handling::format_network_error;
800 use futures::StreamExt;
801
802 let mut buffer = String::new();
803 let mut last_response_value = None;
804
805 while let Some(chunk_result) = byte_stream.next().await {
806 let chunk_bytes =
807 chunk_result.map_err(|e| format_network_error(provider_name, &e.to_string()))?;
808 let chunk_str = String::from_utf8_lossy(&chunk_bytes);
809 buffer.push_str(&chunk_str);
810
811 while let Some((boundary_idx, boundary_len)) = find_sse_boundary(&buffer) {
812 let event = buffer[..boundary_idx].to_string();
813 buffer.drain(..boundary_idx + boundary_len);
814
815 if let Some(data) = extract_data_payload(&event) {
816 if data == "[DONE]" {
817 break;
818 }
819
820 for line in data.lines() {
821 let trimmed = line.trim();
822 if trimmed.is_empty() {
823 continue;
824 }
825
826 if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
827 on_chunk(value.clone())?;
828 last_response_value = Some(value);
829 }
830 }
831 }
832 }
833 }
834
835 let mut final_response = LLMResponse {
837 content: None,
838 tool_calls: None,
839 model,
840 usage: None,
841 finish_reason: crate::llm::provider::FinishReason::Stop,
842 reasoning: None,
843 reasoning_details: None,
844 tool_references: Vec::new(),
845 request_id: None,
846 organization_id: None,
847 compaction: None,
848 };
849
850 if let Some(value) = last_response_value
851 && value.get("usage").is_some()
852 {
853 final_response.usage =
854 crate::llm::providers::common::parse_usage_openai_format(&value, true);
855 }
856
857 Ok(final_response)
858}
859
860pub fn parse_openai_tool_calls(calls: &[Value]) -> Vec<ToolCall> {
861 calls
862 .iter()
863 .filter_map(|call| {
864 let id = call.get("id").and_then(|v| v.as_str())?;
865 let function = call.get("function")?;
866 let namespace = call
867 .get("namespace")
868 .and_then(|v| v.as_str())
869 .or_else(|| function.get("namespace").and_then(|v| v.as_str()))
870 .map(ToOwned::to_owned);
871 let name = function.get("name").and_then(|v| v.as_str())?;
872 let arguments = function.get("arguments");
873 let serialized = arguments.map_or_else(
874 || "{}".to_string(),
875 |value| {
876 if value.is_string() {
877 value.as_str().unwrap_or("").to_string()
878 } else {
879 value.to_string()
880 }
881 },
882 );
883 Some(ToolCall::function_with_namespace(
884 id.to_string(),
885 namespace,
886 name.to_string(),
887 serialized,
888 ))
889 })
890 .collect()
891}
892
/// Appends `tool_name` to `tool_references` unless an equal entry is already
/// present; existing order is preserved.
fn push_unique_tool_reference(tool_references: &mut Vec<String>, tool_name: &str) {
    let is_new = tool_references
        .iter()
        .all(|existing| existing != tool_name);
    if is_new {
        tool_references.push(tool_name.to_string());
    }
}
898
899pub(crate) fn collect_tool_references_from_tool_search_output(
900 value: &Value,
901 tool_references: &mut Vec<String>,
902) {
903 match value {
904 Value::Array(items) => {
905 for item in items {
906 collect_tool_references_from_tool_search_output(item, tool_references);
907 }
908 }
909 Value::Object(object) => {
910 if let Some(tools) = object.get("tools").and_then(Value::as_array) {
911 for tool in tools {
912 collect_tool_references_from_tool_search_output(tool, tool_references);
913 }
914 } else if let Some(tool_name) = object.get("tool_name").and_then(Value::as_str) {
915 push_unique_tool_reference(tool_references, tool_name);
916 } else if let Some(function) = object.get("function").and_then(Value::as_object)
917 && let Some(tool_name) = function.get("name").and_then(Value::as_str)
918 {
919 push_unique_tool_reference(tool_references, tool_name);
920 } else if let Some(tool_name) = object.get("name").and_then(Value::as_str) {
921 push_unique_tool_reference(tool_references, tool_name);
922 }
923
924 if let Some(tool_refs) = object.get("tool_references").and_then(Value::as_array) {
925 for tool_ref in tool_refs {
926 collect_tool_references_from_tool_search_output(tool_ref, tool_references);
927 }
928 }
929 }
930 _ => {}
931 }
932}
933
934pub fn append_text_with_reasoning(
935 text: &str,
936 aggregated_content: &mut String,
937 reasoning: &mut ReasoningBuffer,
938 deltas: &mut StreamDelta,
939 telemetry: &impl StreamTelemetry,
940) {
941 let (segments, cleaned) = split_reasoning_from_text(text);
942
943 if segments.is_empty() && cleaned.is_none() {
944 if !text.is_empty() {
945 aggregated_content.push_str(text);
946 deltas.push_content(text);
947 telemetry.on_content_delta(text);
948 }
949 return;
950 }
951
952 for segment in segments {
953 if let Some(stage) = &segment.stage {
954 telemetry.on_reasoning_stage(stage);
955 }
956 if let Some(delta) = reasoning.push(&segment.text) {
957 telemetry.on_reasoning_delta(&delta);
958 deltas.push_reasoning(&delta);
959 }
960 }
961
962 if let Some(cleaned_text) = cleaned
963 && !cleaned_text.is_empty()
964 {
965 aggregated_content.push_str(&cleaned_text);
966 telemetry.on_content_delta(&cleaned_text);
967 deltas.push_content(&cleaned_text);
968 }
969}
970
/// Extracts the payload of one server-sent event.
///
/// Blank lines and comment lines (starting with `:`) are skipped, as is any
/// line that does not start with `data:`. The remainder of each `data:` line,
/// with leading whitespace removed, is collected; a `\n` is inserted before a
/// value only when something has already been collected. Returns `None` when
/// the collected payload is empty.
#[inline]
pub fn extract_data_payload(event: &str) -> Option<String> {
    let payload = event
        .lines()
        .map(|raw_line| raw_line.trim_end_matches('\r'))
        .filter(|line| !line.is_empty() && !line.starts_with(':'))
        .filter_map(|line| line.strip_prefix("data:"))
        .map(str::trim_start)
        .fold(String::new(), |mut joined, value| {
            if !joined.is_empty() {
                joined.push('\n');
            }
            joined.push_str(value);
            joined
        });

    if payload.is_empty() { None } else { Some(payload) }
}
991
/// Locates the earliest event separator in `buffer`.
///
/// Recognizes `\n\n` and `\r\n\r\n`. Returns the byte offset at which the
/// separator starts together with the separator's length, or `None` when
/// neither is present.
#[inline]
pub fn find_sse_boundary(buffer: &str) -> Option<(usize, usize)> {
    const SEPARATORS: [&str; 2] = ["\n\n", "\r\n\r\n"];

    SEPARATORS
        .iter()
        .filter_map(|separator| {
            buffer
                .find(*separator)
                .map(|start| (start, separator.len()))
        })
        // `min_by_key` keeps the first candidate on equal offsets.
        .min_by_key(|&(start, _)| start)
}
1010
1011pub fn apply_tool_call_delta_from_content(
1012 builders: &mut Vec<ToolCallBuilder>,
1013 container: &Map<String, Value>,
1014 telemetry: &impl StreamTelemetry,
1015) {
1016 apply_tool_call_delta_with_index(builders, container, telemetry, None, None);
1017}
1018
1019fn apply_tool_call_delta_with_index(
1020 builders: &mut Vec<ToolCallBuilder>,
1021 container: &Map<String, Value>,
1022 telemetry: &impl StreamTelemetry,
1023 fallback_index: Option<usize>,
1024 fallback_id: Option<Value>,
1025) {
1026 fn extract_tool_call_id(container: &Map<String, Value>) -> Option<Value> {
1027 container.get("id").cloned().or_else(|| {
1028 container
1029 .get("tool_call")
1030 .and_then(|value| value.as_object())
1031 .and_then(|inner| inner.get("id"))
1032 .cloned()
1033 })
1034 }
1035
1036 let explicit_index = container
1037 .get("tool_call")
1038 .and_then(|value| value.as_object())
1039 .and_then(|tool_call| tool_call.get("index"))
1040 .and_then(|value| value.as_u64())
1041 .or_else(|| container.get("index").and_then(|value| value.as_u64()));
1042
1043 let index = explicit_index
1044 .map(|value| value as usize)
1045 .or(fallback_index)
1046 .unwrap_or(0);
1047
1048 let current_id = extract_tool_call_id(container).or_else(|| fallback_id.clone());
1049
1050 if let Some(nested) = container.get("delta").and_then(|value| value.as_object()) {
1051 apply_tool_call_delta_with_index(
1052 builders,
1053 nested,
1054 telemetry,
1055 Some(index),
1056 current_id.clone(),
1057 );
1058 }
1059
1060 let delta_source = container
1061 .get("tool_call")
1062 .and_then(|value| value.as_object())
1063 .unwrap_or(container);
1064
1065 let mut delta_map = Map::new();
1066
1067 if let Some(id_value) = extract_tool_call_id(delta_source).or_else(|| current_id.clone()) {
1068 delta_map.insert("id".to_string(), id_value);
1069 }
1070
1071 if let Some(function_value) = delta_source
1072 .get("function")
1073 .or_else(|| container.get("function"))
1074 {
1075 delta_map.insert("function".to_string(), function_value.clone());
1076 }
1077
1078 if delta_map.is_empty() {
1079 return;
1080 }
1081
1082 if builders.len() <= index {
1083 builders.resize_with(index + 1, ToolCallBuilder::default);
1084 }
1085
1086 let mut deltas = vec![Value::Null; index + 1];
1087 deltas[index] = Value::Object(delta_map);
1088 update_tool_calls(builders, &deltas);
1089 telemetry.on_tool_call_delta();
1090}
1091
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Serialized tool output holding one `input_text` part and one `input_image` part.
    /// Shared by the tests that parse multimodal tool output from plain text content.
    const MULTIMODAL_TOOL_OUTPUT: &str = r#"[{"type":"input_text","text":"note"},{"type":"input_image","image_url":"data:image/png;base64,abc"}]"#;

    /// A lone default builder must finalize to `None`.
    #[test]
    fn finalize_tool_calls_drops_empty_builders() {
        let finalized = finalize_tool_calls(vec![ToolCallBuilder::default()]);
        assert!(finalized.is_none());
    }

    /// Appending "Hello" must land in the aggregate and produce a single content fragment.
    #[test]
    fn append_text_with_reasoning_tracks_segments() {
        let sink = NoopStreamTelemetry;
        let mut aggregated = String::new();
        let mut reasoning = ReasoningBuffer::default();
        let mut delta = StreamDelta::default();

        append_text_with_reasoning("Hello", &mut aggregated, &mut reasoning, &mut delta, &sink);

        assert_eq!(aggregated, "Hello");
        let fragments = delta.into_fragments();
        assert_eq!(fragments, vec![StreamFragment::Content("Hello".into())]);
    }

    /// A flat container with `index` and `function` must yield a call carrying the function name.
    #[test]
    fn apply_tool_call_delta_updates_builder() {
        let sink = NoopStreamTelemetry;
        let mut builders = Vec::new();
        let payload = json!({
            "index": 0,
            "function": {"name": "foo", "arguments": "{}"}
        });
        let container = payload.as_object().cloned().unwrap();

        apply_tool_call_delta_from_content(&mut builders, &container, &sink);

        let calls = finalize_tool_calls(builders).expect("call expected");
        let first = &calls[0];
        let func = first
            .function
            .as_ref()
            .expect("function call should be present");
        assert_eq!(func.name, "foo");
    }

    /// The function payload is nested under `delta.tool_call` while `index` and `id` sit on the
    /// outer container; the finalized call must carry the outer id and the nested arguments.
    #[test]
    fn apply_tool_call_delta_uses_outer_index_for_nested_delta() {
        let expected_id = "call-1";
        let expected_arguments = "{\"value\":1}";

        let sink = NoopStreamTelemetry;
        let mut builders = Vec::new();
        let payload = json!({
            "delta": {
                "tool_call": {
                    "function": {
                        "name": "foo",
                        "arguments": expected_arguments
                    }
                }
            },
            "index": 1,
            "id": expected_id
        });
        let container = payload.as_object().cloned().unwrap();

        apply_tool_call_delta_from_content(&mut builders, &container, &sink);

        let calls = finalize_tool_calls(builders).expect("call expected");
        assert_eq!(calls.len(), 1);

        let first = &calls[0];
        assert_eq!(first.id, expected_id);
        let func = first
            .function
            .as_ref()
            .expect("function call should be present");
        assert_eq!(func.arguments, expected_arguments);
    }

    /// A single delta declaring `index: 2` must still finalize to exactly one complete call.
    #[test]
    fn update_tool_calls_respects_explicit_index() {
        let expected_id = "call_3";
        let expected_name = "get_weather";
        let expected_arguments = "{\"city\":\"Beijing\"}";

        let mut builders = Vec::new();
        let deltas = vec![json!({
            "index": 2,
            "id": expected_id,
            "function": {
                "name": expected_name,
                "arguments": expected_arguments
            }
        })];

        update_tool_calls(&mut builders, &deltas);

        let calls = finalize_tool_calls(builders).expect("call expected");
        assert_eq!(calls.len(), 1);

        let first = &calls[0];
        assert_eq!(first.id, expected_id);
        let function = first.function.as_ref().expect("function expected");
        assert_eq!(function.name, expected_name);
        assert_eq!(function.arguments, expected_arguments);
    }

    /// The leading `:` comment line is skipped and the two `data:` lines are joined with a newline.
    #[test]
    fn extract_data_payload_merges_lines() {
        let event = [
            ": keep-alive\n",
            "data: {\"a\":1}\n",
            "data: {\"b\":2}\n",
        ]
        .concat();

        let payload = extract_data_payload(&event);

        assert_eq!(payload.as_deref(), Some("{\"a\":1}\n{\"b\":2}"));
    }

    /// For a buffer with a blank-line separator after `data: foo`, the result must be `(9, 2)`.
    #[test]
    fn find_sse_boundary_prefers_newline() {
        let boundary = find_sse_boundary("data: foo\n\nrest");
        assert_eq!(boundary, Some((9, 2)));
    }

    /// An assistant `message` item keeps its role, phase, and text content.
    #[test]
    fn parse_compacted_output_messages_keeps_messages() {
        let output = vec![json!({
            "type": "message",
            "role": "assistant",
            "phase": "final_answer",
            "content": [
                { "type": "output_text", "text": "Compacted response" }
            ]
        })];

        let parsed = parse_compacted_output_messages(&output);
        assert_eq!(parsed.len(), 1);

        let message = &parsed[0];
        assert_eq!(message.role, MessageRole::Assistant);
        assert_eq!(message.phase, Some(AssistantPhase::FinalAnswer));
        assert_eq!(message.content.as_text(), "Compacted response");
    }

    /// A `function_call` followed by its `function_call_output` becomes an assistant message
    /// with tool calls plus a tool message referencing the same call id.
    #[test]
    fn parse_compacted_output_messages_keeps_tool_pairs() {
        let call_item = json!({
            "type": "function_call",
            "id": "call_1",
            "name": "shell",
            "arguments": "{\"command\":\"pwd\"}"
        });
        let result_item = json!({
            "type": "function_call_output",
            "call_id": "call_1",
            "output": "/tmp/work"
        });
        let output = vec![call_item, result_item];

        let parsed = parse_compacted_output_messages(&output);
        assert_eq!(parsed.len(), 2);

        let (call, result) = (&parsed[0], &parsed[1]);
        assert_eq!(call.role, MessageRole::Assistant);
        assert!(call.tool_calls.is_some());
        assert_eq!(result.role, MessageRole::Tool);
        assert_eq!(result.tool_call_id.as_deref(), Some("call_1"));
    }

    /// An array-valued `output` is kept in the tool message text, including the image part.
    #[test]
    fn parse_compacted_output_messages_serializes_multimodal_function_output() {
        let output = vec![json!({
            "type": "function_call_output",
            "call_id": "call_1",
            "output": [
                { "type": "input_text", "text": "inline image note" },
                { "type": "input_image", "image_url": "data:image/png;base64,abc" }
            ]
        })];

        let parsed = parse_compacted_output_messages(&output);
        assert_eq!(parsed.len(), 1);

        let message = &parsed[0];
        assert_eq!(message.role, MessageRole::Tool);
        assert_eq!(message.tool_call_id.as_deref(), Some("call_1"));

        let text = message.content.as_text();
        assert!(text.contains("\"input_image\""));
        assert!(text.contains("inline image note"));
    }

    /// Text content holding serialized multimodal parts is parsed into two parts; the text part
    /// is reported as `output_text` and the image part is kept as `input_image`.
    #[test]
    fn tool_result_content_parses_multimodal_tool_output_text() {
        let content = MessageContent::Text(MULTIMODAL_TOOL_OUTPUT.to_string());

        let parts = tool_result_content_from_message_content(&content);
        assert_eq!(parts.len(), 2);

        let (text_part, image_part) = (&parts[0], &parts[1]);
        assert_eq!(text_part["type"], "output_text");
        assert_eq!(text_part["text"], "note");
        assert_eq!(image_part["type"], "input_image");
        assert_eq!(image_part["image_url"], "data:image/png;base64,abc");
    }

    /// Text content holding serialized multimodal parts becomes an array value whose items keep
    /// their original `input_text` / `input_image` types.
    #[test]
    fn function_output_value_parses_multimodal_tool_output_text() {
        let content = MessageContent::Text(MULTIMODAL_TOOL_OUTPUT.to_string());

        let output = function_output_value_from_message_content(&content);
        let items = output.as_array().expect("expected array output");
        assert_eq!(items.len(), 2);

        let (text_item, image_item) = (&items[0], &items[1]);
        assert_eq!(text_item["type"], "input_text");
        assert_eq!(text_item["text"], "note");
        assert_eq!(image_item["type"], "input_image");
        assert_eq!(image_item["image_url"], "data:image/png;base64,abc");
    }

    /// A `compaction` item surfaces as an assistant message whose first reasoning detail still
    /// has `type == "compaction"`.
    #[test]
    fn parse_compacted_output_messages_preserves_compaction_items() {
        let output = vec![json!({
            "type": "compaction",
            "encrypted_content": "opaque_state"
        })];

        let parsed = parse_compacted_output_messages(&output);
        assert_eq!(parsed.len(), 1);

        let message = &parsed[0];
        assert_eq!(message.role, MessageRole::Assistant);

        let preserved = message
            .reasoning_details
            .as_ref()
            .and_then(|details| details.first())
            .and_then(|detail| detail.get("type").and_then(Value::as_str));
        assert_eq!(preserved, Some("compaction"));
    }

    /// A `tool`-role message wrapping a `tool_result` part yields a tool message with the
    /// part's call id and its inner text.
    #[test]
    fn parse_compacted_output_messages_parses_tool_result_messages() {
        let output = vec![json!({
            "type": "message",
            "role": "tool",
            "content": [
                {
                    "type": "tool_result",
                    "tool_call_id": "call_42",
                    "content": [
                        { "type": "output_text", "text": "done" }
                    ]
                }
            ]
        })];

        let parsed = parse_compacted_output_messages(&output);
        assert_eq!(parsed.len(), 1);

        let message = &parsed[0];
        assert_eq!(message.role, MessageRole::Tool);
        assert_eq!(message.tool_call_id.as_deref(), Some("call_42"));
        assert_eq!(message.content.as_text(), "done");
    }

    /// With only reasoning details set, the finalized response exposes the detail text as
    /// `reasoning` and keeps the details themselves.
    #[test]
    fn stream_aggregator_derives_reasoning_from_details_when_missing() {
        let mut aggregator = StreamAggregator::new("test-model".to_string());
        let details = [json!({
            "type": "reasoning.text",
            "text": "step one"
        })];
        aggregator.set_reasoning_details(&details);

        let response = aggregator.finalize();

        assert_eq!(response.reasoning.as_deref(), Some("step one"));
        assert!(response.reasoning_details.is_some());
    }
}