vtcode_core/llm/providers/gemini/
llm_provider.rs1use super::helpers::InteractionStreamState;
2use super::*;
3use crate::llm::providers::shared::{StreamAssemblyError, extract_data_payload, find_sse_boundary};
4
5#[async_trait]
6impl LLMProvider for GeminiProvider {
7 fn name(&self) -> &str {
8 "gemini"
9 }
10
11 fn supports_streaming(&self) -> bool {
12 true
13 }
14
15 fn supports_reasoning(&self, model: &str) -> bool {
16 models::google::REASONING_MODELS.contains(&model)
19 || self
20 .model_behavior
21 .as_ref()
22 .and_then(|b| b.model_supports_reasoning)
23 .unwrap_or(false)
24 }
25
26 fn supports_reasoning_effort(&self, model: &str) -> bool {
27 models::google::REASONING_MODELS.contains(&model)
29 || self
30 .model_behavior
31 .as_ref()
32 .and_then(|b| b.model_supports_reasoning_effort)
33 .unwrap_or(false)
34 }
35
36 fn supports_context_caching(&self, model: &str) -> bool {
37 models::google::CACHING_MODELS.contains(&model)
38 }
39
40 fn effective_context_size(&self, model: &str) -> usize {
41 if model.contains("gemini-3.1") {
42 1_048_576
43 } else if model.contains("3") || model.contains("1.5-pro") {
44 2_097_152
45 } else {
46 1_048_576
47 }
48 }
49
50 async fn generate(&self, request: LLMRequest) -> Result<LLMResponse, LLMError> {
51 let model = request.model.clone();
52 if self.should_use_interactions(&request) {
53 let interaction_request = self.convert_to_interaction_request(&request)?;
54 let url = format!("{}/interactions", self.base_url);
55 let response = self
56 .http_client
57 .post(&url)
58 .header("x-goog-api-key", self.api_key.as_ref())
59 .json(&interaction_request)
60 .send()
61 .await
62 .map_err(|e| format_network_error("Gemini", &e))?;
63
64 if !response.status().is_success() {
65 let status = response.status();
66 let error_text = response.text().await.unwrap_or_default();
67 return Err(Self::handle_http_error(status, &error_text));
68 }
69
70 let interaction_response: Interaction = response
71 .json()
72 .await
73 .map_err(|e| format_parse_error("Gemini", &e))?;
74
75 return Self::convert_from_interaction_response(interaction_response, model);
76 }
77
78 let gemini_request = self.convert_to_gemini_request(&request)?;
79
80 let url = format!("{}/models/{}:generateContent", self.base_url, request.model);
81
82 let response = self
83 .http_client
84 .post(&url)
85 .header("x-goog-api-key", self.api_key.as_ref())
86 .json(&gemini_request)
87 .send()
88 .await
89 .map_err(|e| format_network_error("Gemini", &e))?;
90
91 if !response.status().is_success() {
92 let status = response.status();
93 let error_text = response.text().await.unwrap_or_default();
94 return Err(Self::handle_http_error(status, &error_text));
95 }
96
97 let gemini_response: GenerateContentResponse = response
98 .json()
99 .await
100 .map_err(|e| format_parse_error("Gemini", &e))?;
101
102 Self::convert_from_gemini_response(gemini_response, model)
103 }
104
105 async fn stream(&self, request: LLMRequest) -> Result<LLMStream, LLMError> {
106 if self.should_use_interactions(&request) {
107 let model = request.model.clone();
108 let interaction_request = self.convert_to_interaction_request(&request)?;
109 let url = format!("{}/interactions?alt=sse", self.base_url);
110 let response = self
111 .http_client
112 .post(&url)
113 .header("x-goog-api-key", self.api_key.as_ref())
114 .json(&interaction_request)
115 .send()
116 .await
117 .map_err(|e| format_network_error("Gemini", &e))?;
118
119 if !response.status().is_success() {
120 let status = response.status();
121 let error_text = response.text().await.unwrap_or_default();
122 return Err(Self::handle_http_error(status, &error_text));
123 }
124
125 let stream = {
126 try_stream! {
127 let mut body_stream = response.bytes_stream();
128 let mut buffer = String::new();
129 let mut state = InteractionStreamState::default();
130
131 while let Some(chunk_result) = body_stream.next().await {
132 let chunk = chunk_result.map_err(|err| {
133 let formatted_error = error_display::format_llm_error(
134 "Gemini",
135 &format!("Streaming error: {}", err),
136 );
137 LLMError::Network {
138 message: formatted_error,
139 metadata: None,
140 }
141 })?;
142
143 buffer.push_str(&String::from_utf8_lossy(&chunk));
144
145 while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
146 let event = buffer[..split_idx].to_string();
147 buffer.drain(..split_idx + delimiter_len);
148
149 let Some(data_payload) = extract_data_payload(&event) else {
150 continue;
151 };
152
153 let trimmed_payload = data_payload.trim();
154 if trimmed_payload.is_empty() || trimmed_payload == "[DONE]" {
155 continue;
156 }
157
158 let payload: Value = serde_json::from_str(trimmed_payload)
159 .map_err(|err| {
160 StreamAssemblyError::InvalidPayload(err.to_string())
161 .into_llm_error("Gemini")
162 })?;
163
164 for stream_event in Self::apply_interaction_stream_payload(&mut state, &payload)? {
165 yield stream_event;
166 }
167 }
168 }
169
170 if !state.completed {
171 let formatted_error = error_display::format_llm_error(
172 "Gemini",
173 "Interactions stream ended without an interaction.complete event",
174 );
175 Err(LLMError::Provider {
176 message: formatted_error,
177 metadata: None,
178 })?;
179 }
180
181 let response =
182 Self::finalize_interaction_stream_state(state, model)?;
183 yield LLMStreamEvent::Completed { response: Box::new(response) };
184 }
185 };
186 return Ok(Box::pin(stream));
187 }
188
189 let model = request.model.clone();
190 let gemini_request = self.convert_to_gemini_request(&request)?;
191
192 let url = format!(
193 "{}/models/{}:streamGenerateContent",
194 self.base_url, request.model
195 );
196
197 let response = self
198 .http_client
199 .post(&url)
200 .header("x-goog-api-key", self.api_key.as_ref())
201 .json(&gemini_request)
202 .send()
203 .await
204 .map_err(|e| format_network_error("Gemini", &e))?;
205
206 if !response.status().is_success() {
207 let status = response.status();
208 let error_text = response.text().await.unwrap_or_default();
209 return Err(Self::handle_http_error(status, &error_text));
210 }
211
212 let (event_tx, event_rx) = mpsc::unbounded_channel::<Result<LLMStreamEvent, LLMError>>();
213 let completion_sender = event_tx.clone();
214
215 let streaming_timeout = self.timeouts.streaming_ceiling_seconds;
216
217 let model_clone = model.clone();
218 tokio::spawn(async move {
219 let config = StreamingConfig::with_total_timeout(streaming_timeout);
220 let mut processor = StreamingProcessor::with_config(config);
221 let event_sender = completion_sender.clone();
222 let mut aggregator =
223 crate::llm::providers::shared::StreamAggregator::new(model_clone.clone());
224
225 #[expect(clippy::collapsible_if)]
226 let mut on_chunk = |chunk: &str| -> Result<(), StreamingError> {
227 if chunk.is_empty() {
228 return Ok(());
229 }
230
231 if let Some(delta) = Self::apply_stream_delta(&mut aggregator.content, chunk) {
232 if delta.is_empty() {
233 return Ok(());
234 }
235
236 for event in aggregator.sanitizer.process_chunk(&delta) {
237 event_sender.send(Ok(event)).map_err(|_| {
238 StreamingError::StreamingError {
239 message: "Streaming consumer dropped".to_string(),
240 partial_content: Some(chunk.to_string()),
241 }
242 })?;
243 }
244 }
245 Ok(())
246 };
247
248 let result = processor.process_stream(response, &mut on_chunk).await;
249 match result {
250 Ok(mut streaming_response) => {
251 if streaming_response.candidates.is_empty()
252 && !aggregator.content.trim().is_empty()
253 {
254 streaming_response.candidates.push(StreamingCandidate {
255 content: Content {
256 role: "model".to_string(),
257 parts: vec![Part::Text {
258 text: aggregator.content.clone(),
259 thought_signature: None,
260 }],
261 },
262 finish_reason: None,
263 index: Some(0),
264 });
265 }
266
267 match Self::convert_from_streaming_response(streaming_response, model_clone) {
268 Ok(mut final_response) => {
269 let aggregator_response = aggregator.finalize();
270 if final_response.reasoning.is_none() {
271 final_response.reasoning = aggregator_response.reasoning;
272 }
273 if final_response.content.is_none() {
274 final_response.content = aggregator_response.content;
275 }
276
277 let _ = completion_sender.send(Ok(LLMStreamEvent::Completed {
278 response: Box::new(final_response),
279 }));
280 }
281 Err(err) => {
282 let _ = completion_sender.send(Err(err));
283 }
284 }
285 }
286 Err(error) => {
287 let mapped = Self::map_streaming_error(error);
288 let _ = completion_sender.send(Err(mapped));
289 }
290 }
291 });
292
293 drop(event_tx);
294
295 let stream = {
296 let mut receiver = event_rx;
297 try_stream! {
298 while let Some(event) = receiver.recv().await {
299 yield event?;
300 }
301 }
302 };
303
304 Ok(Box::pin(stream))
305 }
306
307 fn supported_models(&self) -> Vec<String> {
308 models::google::SUPPORTED_MODELS
309 .iter()
310 .map(|s| s.to_string())
311 .collect()
312 }
313
314 fn validate_request(&self, request: &LLMRequest) -> Result<(), LLMError> {
315 if request.previous_response_id.is_some() && request.response_store == Some(false) {
316 let formatted_error = error_display::format_llm_error(
317 "Gemini",
318 "Interactions with previous_interaction_id cannot set store=false",
319 );
320 return Err(LLMError::InvalidRequest {
321 message: formatted_error,
322 metadata: None,
323 });
324 }
325
326 if !models::google::SUPPORTED_MODELS
327 .iter()
328 .any(|m| *m == request.model)
329 {
330 let formatted_error = error_display::format_llm_error(
331 "Gemini",
332 &format!("Unsupported model: {}", request.model),
333 );
334 return Err(LLMError::InvalidRequest {
335 message: formatted_error,
336 metadata: None,
337 });
338 }
339
340 if let Some(max_tokens) = request.max_tokens {
341 let model = request.model.as_str();
342 let max_output_tokens = if model.contains("3") { 65536 } else { 8192 };
343
344 if max_tokens > max_output_tokens {
345 let formatted_error = error_display::format_llm_error(
346 "Gemini",
347 &format!(
348 "Requested max_tokens ({}) exceeds model limit ({}) for {}",
349 max_tokens, max_output_tokens, model
350 ),
351 );
352 return Err(LLMError::InvalidRequest {
353 message: formatted_error,
354 metadata: None,
355 });
356 }
357 }
358
359 Ok(())
360 }
361}