Skip to main content

vtcode_core/llm/providers/gemini/
llm_provider.rs

1use super::helpers::InteractionStreamState;
2use super::*;
3use crate::llm::providers::shared::{StreamAssemblyError, extract_data_payload, find_sse_boundary};
4
5#[async_trait]
6impl LLMProvider for GeminiProvider {
7    fn name(&self) -> &str {
8        "gemini"
9    }
10
11    fn supports_streaming(&self) -> bool {
12        true
13    }
14
15    fn supports_reasoning(&self, model: &str) -> bool {
16        // Codex-inspired robustness: Setting model_supports_reasoning to false
17        // does NOT disable it for known reasoning models.
18        models::google::REASONING_MODELS.contains(&model)
19            || self
20                .model_behavior
21                .as_ref()
22                .and_then(|b| b.model_supports_reasoning)
23                .unwrap_or(false)
24    }
25
26    fn supports_reasoning_effort(&self, model: &str) -> bool {
27        // Same robustness logic for reasoning effort
28        models::google::REASONING_MODELS.contains(&model)
29            || self
30                .model_behavior
31                .as_ref()
32                .and_then(|b| b.model_supports_reasoning_effort)
33                .unwrap_or(false)
34    }
35
36    fn supports_context_caching(&self, model: &str) -> bool {
37        models::google::CACHING_MODELS.contains(&model)
38    }
39
40    fn effective_context_size(&self, model: &str) -> usize {
41        if model.contains("gemini-3.1") {
42            1_048_576
43        } else if model.contains("3") || model.contains("1.5-pro") {
44            2_097_152
45        } else {
46            1_048_576
47        }
48    }
49
50    async fn generate(&self, request: LLMRequest) -> Result<LLMResponse, LLMError> {
51        let model = request.model.clone();
52        if self.should_use_interactions(&request) {
53            let interaction_request = self.convert_to_interaction_request(&request)?;
54            let url = format!("{}/interactions", self.base_url);
55            let response = self
56                .http_client
57                .post(&url)
58                .header("x-goog-api-key", self.api_key.as_ref())
59                .json(&interaction_request)
60                .send()
61                .await
62                .map_err(|e| format_network_error("Gemini", &e))?;
63
64            if !response.status().is_success() {
65                let status = response.status();
66                let error_text = response.text().await.unwrap_or_default();
67                return Err(Self::handle_http_error(status, &error_text));
68            }
69
70            let interaction_response: Interaction = response
71                .json()
72                .await
73                .map_err(|e| format_parse_error("Gemini", &e))?;
74
75            return Self::convert_from_interaction_response(interaction_response, model);
76        }
77
78        let gemini_request = self.convert_to_gemini_request(&request)?;
79
80        let url = format!("{}/models/{}:generateContent", self.base_url, request.model);
81
82        let response = self
83            .http_client
84            .post(&url)
85            .header("x-goog-api-key", self.api_key.as_ref())
86            .json(&gemini_request)
87            .send()
88            .await
89            .map_err(|e| format_network_error("Gemini", &e))?;
90
91        if !response.status().is_success() {
92            let status = response.status();
93            let error_text = response.text().await.unwrap_or_default();
94            return Err(Self::handle_http_error(status, &error_text));
95        }
96
97        let gemini_response: GenerateContentResponse = response
98            .json()
99            .await
100            .map_err(|e| format_parse_error("Gemini", &e))?;
101
102        Self::convert_from_gemini_response(gemini_response, model)
103    }
104
105    async fn stream(&self, request: LLMRequest) -> Result<LLMStream, LLMError> {
106        if self.should_use_interactions(&request) {
107            let model = request.model.clone();
108            let interaction_request = self.convert_to_interaction_request(&request)?;
109            let url = format!("{}/interactions?alt=sse", self.base_url);
110            let response = self
111                .http_client
112                .post(&url)
113                .header("x-goog-api-key", self.api_key.as_ref())
114                .json(&interaction_request)
115                .send()
116                .await
117                .map_err(|e| format_network_error("Gemini", &e))?;
118
119            if !response.status().is_success() {
120                let status = response.status();
121                let error_text = response.text().await.unwrap_or_default();
122                return Err(Self::handle_http_error(status, &error_text));
123            }
124
125            let stream = {
126                try_stream! {
127                    let mut body_stream = response.bytes_stream();
128                    let mut buffer = String::new();
129                    let mut state = InteractionStreamState::default();
130
131                    while let Some(chunk_result) = body_stream.next().await {
132                        let chunk = chunk_result.map_err(|err| {
133                            let formatted_error = error_display::format_llm_error(
134                                "Gemini",
135                                &format!("Streaming error: {}", err),
136                            );
137                            LLMError::Network {
138                                message: formatted_error,
139                                metadata: None,
140                            }
141                        })?;
142
143                        buffer.push_str(&String::from_utf8_lossy(&chunk));
144
145                        while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
146                            let event = buffer[..split_idx].to_string();
147                            buffer.drain(..split_idx + delimiter_len);
148
149                            let Some(data_payload) = extract_data_payload(&event) else {
150                                continue;
151                            };
152
153                            let trimmed_payload = data_payload.trim();
154                            if trimmed_payload.is_empty() || trimmed_payload == "[DONE]" {
155                                continue;
156                            }
157
158                            let payload: Value = serde_json::from_str(trimmed_payload)
159                                .map_err(|err| {
160                                    StreamAssemblyError::InvalidPayload(err.to_string())
161                                        .into_llm_error("Gemini")
162                                })?;
163
164                            for stream_event in Self::apply_interaction_stream_payload(&mut state, &payload)? {
165                                yield stream_event;
166                            }
167                        }
168                    }
169
170                    if !state.completed {
171                        let formatted_error = error_display::format_llm_error(
172                            "Gemini",
173                            "Interactions stream ended without an interaction.complete event",
174                        );
175                        Err(LLMError::Provider {
176                            message: formatted_error,
177                            metadata: None,
178                        })?;
179                    }
180
181                    let response =
182                        Self::finalize_interaction_stream_state(state, model)?;
183                    yield LLMStreamEvent::Completed { response: Box::new(response) };
184                }
185            };
186            return Ok(Box::pin(stream));
187        }
188
189        let model = request.model.clone();
190        let gemini_request = self.convert_to_gemini_request(&request)?;
191
192        let url = format!(
193            "{}/models/{}:streamGenerateContent",
194            self.base_url, request.model
195        );
196
197        let response = self
198            .http_client
199            .post(&url)
200            .header("x-goog-api-key", self.api_key.as_ref())
201            .json(&gemini_request)
202            .send()
203            .await
204            .map_err(|e| format_network_error("Gemini", &e))?;
205
206        if !response.status().is_success() {
207            let status = response.status();
208            let error_text = response.text().await.unwrap_or_default();
209            return Err(Self::handle_http_error(status, &error_text));
210        }
211
212        let (event_tx, event_rx) = mpsc::unbounded_channel::<Result<LLMStreamEvent, LLMError>>();
213        let completion_sender = event_tx.clone();
214
215        let streaming_timeout = self.timeouts.streaming_ceiling_seconds;
216
217        let model_clone = model.clone();
218        tokio::spawn(async move {
219            let config = StreamingConfig::with_total_timeout(streaming_timeout);
220            let mut processor = StreamingProcessor::with_config(config);
221            let event_sender = completion_sender.clone();
222            let mut aggregator =
223                crate::llm::providers::shared::StreamAggregator::new(model_clone.clone());
224
225            #[expect(clippy::collapsible_if)]
226            let mut on_chunk = |chunk: &str| -> Result<(), StreamingError> {
227                if chunk.is_empty() {
228                    return Ok(());
229                }
230
231                if let Some(delta) = Self::apply_stream_delta(&mut aggregator.content, chunk) {
232                    if delta.is_empty() {
233                        return Ok(());
234                    }
235
236                    for event in aggregator.sanitizer.process_chunk(&delta) {
237                        event_sender.send(Ok(event)).map_err(|_| {
238                            StreamingError::StreamingError {
239                                message: "Streaming consumer dropped".to_string(),
240                                partial_content: Some(chunk.to_string()),
241                            }
242                        })?;
243                    }
244                }
245                Ok(())
246            };
247
248            let result = processor.process_stream(response, &mut on_chunk).await;
249            match result {
250                Ok(mut streaming_response) => {
251                    if streaming_response.candidates.is_empty()
252                        && !aggregator.content.trim().is_empty()
253                    {
254                        streaming_response.candidates.push(StreamingCandidate {
255                            content: Content {
256                                role: "model".to_string(),
257                                parts: vec![Part::Text {
258                                    text: aggregator.content.clone(),
259                                    thought_signature: None,
260                                }],
261                            },
262                            finish_reason: None,
263                            index: Some(0),
264                        });
265                    }
266
267                    match Self::convert_from_streaming_response(streaming_response, model_clone) {
268                        Ok(mut final_response) => {
269                            let aggregator_response = aggregator.finalize();
270                            if final_response.reasoning.is_none() {
271                                final_response.reasoning = aggregator_response.reasoning;
272                            }
273                            if final_response.content.is_none() {
274                                final_response.content = aggregator_response.content;
275                            }
276
277                            let _ = completion_sender.send(Ok(LLMStreamEvent::Completed {
278                                response: Box::new(final_response),
279                            }));
280                        }
281                        Err(err) => {
282                            let _ = completion_sender.send(Err(err));
283                        }
284                    }
285                }
286                Err(error) => {
287                    let mapped = Self::map_streaming_error(error);
288                    let _ = completion_sender.send(Err(mapped));
289                }
290            }
291        });
292
293        drop(event_tx);
294
295        let stream = {
296            let mut receiver = event_rx;
297            try_stream! {
298                while let Some(event) = receiver.recv().await {
299                    yield event?;
300                }
301            }
302        };
303
304        Ok(Box::pin(stream))
305    }
306
307    fn supported_models(&self) -> Vec<String> {
308        models::google::SUPPORTED_MODELS
309            .iter()
310            .map(|s| s.to_string())
311            .collect()
312    }
313
314    fn validate_request(&self, request: &LLMRequest) -> Result<(), LLMError> {
315        if request.previous_response_id.is_some() && request.response_store == Some(false) {
316            let formatted_error = error_display::format_llm_error(
317                "Gemini",
318                "Interactions with previous_interaction_id cannot set store=false",
319            );
320            return Err(LLMError::InvalidRequest {
321                message: formatted_error,
322                metadata: None,
323            });
324        }
325
326        if !models::google::SUPPORTED_MODELS
327            .iter()
328            .any(|m| *m == request.model)
329        {
330            let formatted_error = error_display::format_llm_error(
331                "Gemini",
332                &format!("Unsupported model: {}", request.model),
333            );
334            return Err(LLMError::InvalidRequest {
335                message: formatted_error,
336                metadata: None,
337            });
338        }
339
340        if let Some(max_tokens) = request.max_tokens {
341            let model = request.model.as_str();
342            let max_output_tokens = if model.contains("3") { 65536 } else { 8192 };
343
344            if max_tokens > max_output_tokens {
345                let formatted_error = error_display::format_llm_error(
346                    "Gemini",
347                    &format!(
348                        "Requested max_tokens ({}) exceeds model limit ({}) for {}",
349                        max_tokens, max_output_tokens, model
350                    ),
351                );
352                return Err(LLMError::InvalidRequest {
353                    message: formatted_error,
354                    metadata: None,
355                });
356            }
357        }
358
359        Ok(())
360    }
361}