1use std::collections::BTreeMap;
2
3use crate::gemini::count_tokens::types::{GeminiContentRole, GeminiFunctionCall, GeminiPart};
4use crate::gemini::generate_content::response::ResponseBody as GeminiGenerateContentResponseBody;
5use crate::gemini::generate_content::types::{
6 GeminiBlockReason, GeminiCandidate, GeminiContent, GeminiFinishReason, GeminiPromptFeedback,
7 GeminiUsageMetadata,
8};
9use crate::openai::count_tokens::types::{
10 ResponseCustomToolCallOutputContent, ResponseFunctionCallOutputContent, ResponseInputContent,
11};
12use crate::openai::create_response::response::ResponseBody as OpenAiCreateResponseBody;
13use crate::openai::create_response::stream::{ResponseStreamContentPart, ResponseStreamEvent};
14use crate::openai::create_response::types::{ResponseIncompleteReason, ResponseOutputItem};
15use crate::transform::gemini::stream_generate_content::utils::parse_json_object_or_empty;
16
17#[derive(Debug, Clone, Default)]
18struct FunctionCallState {
19 name: String,
20 arguments: String,
21}
22
23#[derive(Debug, Clone, Default)]
24pub struct OpenAiResponseToGeminiStream {
25 response_id: Option<String>,
26 model_version: Option<String>,
27 usage_metadata: Option<GeminiUsageMetadata>,
28 function_calls: BTreeMap<String, FunctionCallState>,
29 finished: bool,
30}
31
32impl OpenAiResponseToGeminiStream {
33 pub fn is_finished(&self) -> bool {
34 self.finished
35 }
36
37 fn apply_response_state(&mut self, response: &OpenAiCreateResponseBody) {
38 self.response_id = Some(response.id.clone());
39 self.model_version = Some(response.model.clone());
40 self.usage_metadata = response.usage.as_ref().map(|usage| GeminiUsageMetadata {
41 prompt_token_count: Some(usage.input_tokens),
42 cached_content_token_count: Some(usage.input_tokens_details.cached_tokens),
43 candidates_token_count: Some(usage.output_tokens),
44 thoughts_token_count: Some(usage.output_tokens_details.reasoning_tokens),
45 total_token_count: Some(usage.total_tokens),
46 ..GeminiUsageMetadata::default()
47 });
48 }
49
50 fn finish_reason_from_incomplete_reason(
51 reason: Option<&ResponseIncompleteReason>,
52 ) -> GeminiFinishReason {
53 match reason {
54 Some(ResponseIncompleteReason::MaxOutputTokens) => GeminiFinishReason::MaxTokens,
55 Some(ResponseIncompleteReason::ContentFilter) => GeminiFinishReason::Safety,
56 None => GeminiFinishReason::Stop,
57 }
58 }
59
60 fn chunk_from_parts(
61 &self,
62 parts: Vec<GeminiPart>,
63 finish_reason: Option<GeminiFinishReason>,
64 prompt_feedback: Option<GeminiPromptFeedback>,
65 ) -> GeminiGenerateContentResponseBody {
66 GeminiGenerateContentResponseBody {
67 candidates: Some(vec![GeminiCandidate {
68 content: Some(GeminiContent {
69 parts,
70 role: Some(GeminiContentRole::Model),
71 }),
72 finish_reason,
73 index: Some(0),
74 ..GeminiCandidate::default()
75 }]),
76 prompt_feedback,
77 usage_metadata: self.usage_metadata.clone(),
78 model_version: self.model_version.clone(),
79 response_id: self.response_id.clone(),
80 model_status: None,
81 }
82 }
83
84 fn text_chunk(&self, text: String) -> Option<GeminiGenerateContentResponseBody> {
85 if text.is_empty() {
86 None
87 } else {
88 Some(self.chunk_from_parts(
89 vec![GeminiPart {
90 text: Some(text),
91 ..GeminiPart::default()
92 }],
93 None,
94 None,
95 ))
96 }
97 }
98
99 fn thinking_chunk(
100 &self,
101 signature: String,
102 thinking: String,
103 ) -> Option<GeminiGenerateContentResponseBody> {
104 if thinking.is_empty() {
105 None
106 } else {
107 Some(self.chunk_from_parts(
108 vec![GeminiPart {
109 thought: Some(true),
110 thought_signature: Some(signature),
111 text: Some(thinking),
112 ..GeminiPart::default()
113 }],
114 None,
115 None,
116 ))
117 }
118 }
119
120 fn function_call_chunk(
121 &self,
122 id: String,
123 name: String,
124 arguments: String,
125 ) -> GeminiGenerateContentResponseBody {
126 self.chunk_from_parts(
127 vec![GeminiPart {
128 function_call: Some(GeminiFunctionCall {
129 id: Some(id),
130 name,
131 args: Some(parse_json_object_or_empty(&arguments)),
132 }),
133 ..GeminiPart::default()
134 }],
135 None,
136 None,
137 )
138 }
139
140 fn input_content_to_text(items: Vec<ResponseInputContent>) -> String {
141 items
142 .into_iter()
143 .filter_map(|item| match item {
144 ResponseInputContent::Text(text) => Some(text.text),
145 ResponseInputContent::Image(image) => {
146 if let Some(url) = image.image_url {
147 Some(url)
148 } else {
149 image.file_id.map(|file_id| format!("file:{file_id}"))
150 }
151 }
152 ResponseInputContent::File(file) => {
153 if let Some(data) = file.file_data {
154 Some(data)
155 } else if let Some(url) = file.file_url {
156 Some(url)
157 } else if let Some(file_id) = file.file_id {
158 Some(format!("file:{file_id}"))
159 } else {
160 file.filename
161 }
162 }
163 })
164 .collect::<Vec<_>>()
165 .join("\n")
166 }
167
168 fn map_output_item(
169 &self,
170 item: ResponseOutputItem,
171 out: &mut Vec<GeminiGenerateContentResponseBody>,
172 ) {
173 match item {
174 ResponseOutputItem::Message(message) => {
175 for content in message.content {
176 match content {
177 crate::openai::count_tokens::types::ResponseOutputContent::Text(text) => {
178 if let Some(chunk) = self.text_chunk(text.text) {
179 out.push(chunk);
180 }
181 }
182 crate::openai::count_tokens::types::ResponseOutputContent::Refusal(
183 refusal,
184 ) => {
185 if let Some(chunk) = self.text_chunk(refusal.refusal) {
186 out.push(chunk);
187 }
188 }
189 }
190 }
191 }
192 ResponseOutputItem::FunctionToolCall(call) => {
193 out.push(self.function_call_chunk(
194 call.id.unwrap_or(call.call_id),
195 call.name,
196 call.arguments,
197 ));
198 }
199 ResponseOutputItem::CustomToolCall(call) => {
200 out.push(self.function_call_chunk(
201 call.id.unwrap_or(call.call_id),
202 call.name,
203 call.input,
204 ));
205 }
206 ResponseOutputItem::ReasoningItem(item) => {
207 if let Some(signature) = item.id.filter(|id| !id.is_empty()) {
208 for summary in item.summary {
209 if let Some(chunk) = self.thinking_chunk(signature.clone(), summary.text) {
210 out.push(chunk);
211 }
212 }
213 if let Some(content) = item.content {
214 for reasoning in content {
215 if let Some(chunk) =
216 self.thinking_chunk(signature.clone(), reasoning.text)
217 {
218 out.push(chunk);
219 }
220 }
221 }
222 if let Some(encrypted_content) = item.encrypted_content
223 && let Some(chunk) = self.thinking_chunk(signature, encrypted_content)
224 {
225 out.push(chunk);
226 }
227 }
228 }
229 ResponseOutputItem::FunctionCallOutput(call) => {
230 let text = match call.output {
231 ResponseFunctionCallOutputContent::Text(text) => text,
232 ResponseFunctionCallOutputContent::Content(items) => {
233 Self::input_content_to_text(items)
234 }
235 };
236 if let Some(chunk) = self.text_chunk(text) {
237 out.push(chunk);
238 }
239 }
240 ResponseOutputItem::CustomToolCallOutput(call) => {
241 let text = match call.output {
242 ResponseCustomToolCallOutputContent::Text(text) => text,
243 ResponseCustomToolCallOutputContent::Content(items) => {
244 Self::input_content_to_text(items)
245 }
246 };
247 if let Some(chunk) = self.text_chunk(text) {
248 out.push(chunk);
249 }
250 }
251 ResponseOutputItem::ShellCallOutput(call) => {
252 let text = call
253 .output
254 .into_iter()
255 .map(|entry| format!("stdout: {}\nstderr: {}", entry.stdout, entry.stderr))
256 .collect::<Vec<_>>()
257 .join("\n");
258 if let Some(chunk) = self.text_chunk(text) {
259 out.push(chunk);
260 }
261 }
262 ResponseOutputItem::LocalShellCallOutput(call) => {
263 if let Some(chunk) = self.text_chunk(call.output) {
264 out.push(chunk);
265 }
266 }
267 ResponseOutputItem::McpCall(call) => {
268 if let Some(output) = call.output
269 && let Some(chunk) = self.text_chunk(output)
270 {
271 out.push(chunk);
272 }
273 if let Some(error) = call.error
274 && let Some(chunk) = self.text_chunk(error)
275 {
276 out.push(chunk);
277 }
278 }
279 ResponseOutputItem::ImageGenerationCall(call) => {
280 if let Some(chunk) = call.result.and_then(|r| self.text_chunk(r)) {
281 out.push(chunk);
282 }
283 }
284 _ => {}
285 }
286 }
287
288 pub fn on_stream_event(
289 &mut self,
290 event: ResponseStreamEvent,
291 out: &mut Vec<GeminiGenerateContentResponseBody>,
292 ) {
293 if self.finished {
294 return;
295 }
296
297 match event {
298 ResponseStreamEvent::Created { response, .. }
299 | ResponseStreamEvent::Queued { response, .. }
300 | ResponseStreamEvent::InProgress { response, .. } => {
301 self.apply_response_state(&response);
302 }
303 ResponseStreamEvent::Completed { response, .. }
304 | ResponseStreamEvent::Incomplete { response, .. } => {
305 self.apply_response_state(&response);
306 let reason = Self::finish_reason_from_incomplete_reason(
307 response
308 .incomplete_details
309 .as_ref()
310 .and_then(|details| details.reason.as_ref()),
311 );
312 let prompt_feedback = if matches!(reason, GeminiFinishReason::Safety) {
313 Some(GeminiPromptFeedback {
314 block_reason: Some(GeminiBlockReason::Safety),
315 safety_ratings: None,
316 })
317 } else {
318 None
319 };
320 out.push(self.chunk_from_parts(Vec::new(), Some(reason), prompt_feedback));
321 }
322 ResponseStreamEvent::Failed { response, .. } => {
323 self.apply_response_state(&response);
324 if let Some(error) = response.error
325 && let Some(chunk) = self.text_chunk(error.message)
326 {
327 out.push(chunk);
328 }
329 out.push(self.chunk_from_parts(
330 Vec::new(),
331 Some(GeminiFinishReason::Safety),
332 Some(GeminiPromptFeedback {
333 block_reason: Some(GeminiBlockReason::Safety),
334 safety_ratings: None,
335 }),
336 ));
337 }
338 ResponseStreamEvent::OutputTextDelta { delta, .. }
339 | ResponseStreamEvent::OutputTextDone { text: delta, .. } => {
340 if let Some(chunk) = self.text_chunk(delta) {
341 out.push(chunk);
342 }
343 }
344 ResponseStreamEvent::RefusalDelta { delta, .. }
345 | ResponseStreamEvent::RefusalDone { refusal: delta, .. } => {
346 if let Some(chunk) = self.text_chunk(delta) {
347 out.push(chunk);
348 }
349 }
350 ResponseStreamEvent::ReasoningTextDelta { item_id, delta, .. }
351 | ResponseStreamEvent::ReasoningTextDone {
352 item_id,
353 text: delta,
354 ..
355 }
356 | ResponseStreamEvent::ReasoningSummaryTextDelta { item_id, delta, .. }
357 | ResponseStreamEvent::ReasoningSummaryTextDone {
358 item_id,
359 text: delta,
360 ..
361 } => {
362 if let Some(chunk) = self.thinking_chunk(item_id, delta) {
363 out.push(chunk);
364 }
365 }
366 ResponseStreamEvent::FunctionCallArgumentsDelta { item_id, delta, .. } => {
367 let snapshot = {
368 let entry = self
369 .function_calls
370 .entry(item_id.clone())
371 .or_insert_with(|| FunctionCallState {
372 name: "function".to_string(),
373 arguments: String::new(),
374 });
375 entry.arguments.push_str(&delta);
376 (entry.name.clone(), entry.arguments.clone())
377 };
378 out.push(self.function_call_chunk(item_id, snapshot.0, snapshot.1));
379 }
380 ResponseStreamEvent::FunctionCallArgumentsDone {
381 item_id,
382 name,
383 arguments,
384 ..
385 } => {
386 let snapshot = {
387 let entry = self.function_calls.entry(item_id.clone()).or_default();
388 if let Some(name) = name
389 && !name.is_empty()
390 {
391 entry.name = name;
392 }
393 entry.arguments = arguments;
394 (entry.name.clone(), entry.arguments.clone())
395 };
396 out.push(self.function_call_chunk(item_id, snapshot.0, snapshot.1));
397 }
398 ResponseStreamEvent::McpCallArgumentsDelta { item_id, delta, .. } => {
399 let snapshot = {
400 let entry = self
401 .function_calls
402 .entry(item_id.clone())
403 .or_insert_with(|| FunctionCallState {
404 name: "mcp_call".to_string(),
405 arguments: String::new(),
406 });
407 entry.arguments.push_str(&delta);
408 (entry.name.clone(), entry.arguments.clone())
409 };
410 out.push(self.function_call_chunk(item_id, snapshot.0, snapshot.1));
411 }
412 ResponseStreamEvent::McpCallArgumentsDone {
413 item_id, arguments, ..
414 } => {
415 let snapshot = {
416 let entry = self
417 .function_calls
418 .entry(item_id.clone())
419 .or_insert_with(|| FunctionCallState {
420 name: "mcp_call".to_string(),
421 arguments: String::new(),
422 });
423 entry.arguments = arguments;
424 (entry.name.clone(), entry.arguments.clone())
425 };
426 out.push(self.function_call_chunk(item_id, snapshot.0, snapshot.1));
427 }
428 ResponseStreamEvent::CustomToolCallInputDelta { item_id, delta, .. } => {
429 let snapshot = {
430 let entry = self
431 .function_calls
432 .entry(item_id.clone())
433 .or_insert_with(|| FunctionCallState {
434 name: "custom_tool".to_string(),
435 arguments: String::new(),
436 });
437 entry.arguments.push_str(&delta);
438 (entry.name.clone(), entry.arguments.clone())
439 };
440 out.push(self.function_call_chunk(item_id, snapshot.0, snapshot.1));
441 }
442 ResponseStreamEvent::CustomToolCallInputDone { item_id, input, .. } => {
443 let snapshot = {
444 let entry = self
445 .function_calls
446 .entry(item_id.clone())
447 .or_insert_with(|| FunctionCallState {
448 name: "custom_tool".to_string(),
449 arguments: String::new(),
450 });
451 entry.arguments = input;
452 (entry.name.clone(), entry.arguments.clone())
453 };
454 out.push(self.function_call_chunk(item_id, snapshot.0, snapshot.1));
455 }
456 ResponseStreamEvent::ContentPartAdded { item_id, part, .. }
457 | ResponseStreamEvent::ContentPartDone { item_id, part, .. } => match part {
458 ResponseStreamContentPart::OutputText(text) => {
459 if let Some(chunk) = self.text_chunk(text.text) {
460 out.push(chunk);
461 }
462 }
463 ResponseStreamContentPart::Refusal(refusal) => {
464 if let Some(chunk) = self.text_chunk(refusal.refusal) {
465 out.push(chunk);
466 }
467 }
468 ResponseStreamContentPart::ReasoningText(reasoning) => {
469 if let Some(chunk) = self.thinking_chunk(item_id, reasoning.text) {
470 out.push(chunk);
471 }
472 }
473 },
474 ResponseStreamEvent::OutputItemAdded { item, .. }
475 | ResponseStreamEvent::OutputItemDone { item, .. } => {
476 self.map_output_item(item, out);
477 }
478 ResponseStreamEvent::Error { error, .. } => {
479 if let Some(chunk) = self.text_chunk(error.message) {
480 out.push(chunk);
481 }
482 }
483 _ => {}
484 }
485 }
486
487 pub fn finish(&mut self, out: &mut Vec<GeminiGenerateContentResponseBody>) {
488 if !self.finished {
489 self.finished = true;
490 let _ = out;
491 }
492 }
493}