1use crate::attachments::validate_request_attachments;
12use crate::impls::anthropic::{
13 MODEL_FABLE_5, MODEL_OPUS_46, MODEL_OPUS_47, MODEL_OPUS_48, MODEL_SONNET_46,
14 data as anthropic_data,
15};
16use crate::impls::gemini::data::{
17 ApiContent, ApiFunctionCallingConfig, ApiGenerateContentRequest, ApiGenerateContentResponse,
18 ApiGenerationConfig, ApiPart, ApiUsageMetadata, build_api_contents, build_content_blocks,
19 convert_tools_to_config, gemini_response_schema, map_finish_reason, map_thinking_config,
20 stream_gemini_response,
21};
22use crate::provider::LlmProvider;
23use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
24use agent_sdk_foundation::llm::{
25 ChatOutcome, ChatRequest, ChatResponse, ThinkingConfig, ThinkingMode, Usage,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::StreamExt;
30use reqwest::StatusCode;
31
/// Gemini 3 Flash (preview) model identifier; used by [`VertexProvider::flash`].
pub const MODEL_GEMINI_3_FLASH: &str = "gemini-3-flash-preview";
/// Gemini 3.0 Pro model identifier. No constructor in this module selects it.
pub const MODEL_GEMINI_3_PRO: &str = "gemini-3.0-pro";
/// Gemini 3.1 Pro (preview) model identifier; used by [`VertexProvider::pro`].
pub const MODEL_GEMINI_31_PRO: &str = "gemini-3.1-pro-preview";

/// Value of the `anthropic_version` body field on Claude requests sent through Vertex AI.
const VERTEX_ANTHROPIC_VERSION: &str = "vertex-2023-10-16";
/// Upper clamp applied to the provider's default output-token budget.
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;
41
/// Cache-control marker attached to Vertex AI Claude requests.
///
/// Applied both to the last user message and to the system prompt. It is the
/// Anthropic `ephemeral` cache type (presumably to enable prompt caching on
/// those segments — confirm against the Anthropic data layer).
const fn vertex_cache_control() -> anthropic_data::ApiCacheControl {
    anthropic_data::ApiCacheControl::ephemeral()
}
45
46#[derive(Clone)]
55pub struct VertexProvider {
56 client: reqwest::Client,
57 access_token: String,
58 project_id: String,
59 region: String,
60 model: String,
61 thinking: Option<ThinkingConfig>,
62}
63
64impl VertexProvider {
65 #[must_use]
67 pub fn new(access_token: String, project_id: String, region: String, model: String) -> Self {
68 let client = reqwest::Client::builder()
69 .connect_timeout(std::time::Duration::from_secs(30))
70 .tcp_keepalive(std::time::Duration::from_secs(30))
71 .build()
72 .unwrap_or_default();
73
74 Self {
75 client,
76 access_token,
77 project_id,
78 region,
79 model,
80 thinking: None,
81 }
82 }
83
84 #[must_use]
86 pub fn flash(access_token: String, project_id: String, region: String) -> Self {
87 Self::new(
88 access_token,
89 project_id,
90 region,
91 MODEL_GEMINI_3_FLASH.to_owned(),
92 )
93 }
94
95 #[must_use]
97 pub fn pro(access_token: String, project_id: String, region: String) -> Self {
98 Self::new(
99 access_token,
100 project_id,
101 region,
102 MODEL_GEMINI_31_PRO.to_owned(),
103 )
104 }
105
106 fn is_claude_model(&self) -> bool {
108 self.model.starts_with("claude-")
109 }
110
111 fn base_url(&self, publisher: &str) -> String {
116 let domain = if self.region == "global" {
117 "aiplatform.googleapis.com".to_owned()
118 } else {
119 format!("{}-aiplatform.googleapis.com", self.region)
120 };
121 format!(
122 "https://{domain}/v1/projects/{project}/locations/{region}/publishers/{publisher}/models/{model}",
123 domain = domain,
124 region = self.region,
125 project = self.project_id,
126 publisher = publisher,
127 model = self.model,
128 )
129 }
130
131 #[must_use]
133 pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
134 self.thinking = Some(thinking);
135 self
136 }
137
138 fn requires_anthropic_adaptive_thinking(&self) -> bool {
139 matches!(
140 self.model.as_str(),
141 MODEL_SONNET_46 | MODEL_OPUS_46 | MODEL_OPUS_47 | MODEL_OPUS_48 | MODEL_FABLE_5
142 )
143 }
144
145 fn build_cached_vertex_claude_messages(
146 request: &ChatRequest,
147 ) -> Vec<anthropic_data::ApiMessage> {
148 let mut messages = anthropic_data::build_api_messages(request);
149 anthropic_data::apply_cache_control_to_last_user_message(
150 &mut messages,
151 vertex_cache_control(),
152 );
153 messages
154 }
155
156 fn build_vertex_claude_system_prompt(
157 system: &str,
158 ) -> Option<anthropic_data::ApiSystemPrompt<'_>> {
159 anthropic_data::build_api_system_prompt(system, Some(vertex_cache_control()))
160 }
161
162 fn map_claude_response(api_response: anthropic_data::ApiResponse) -> ChatResponse {
163 let content = anthropic_data::map_content_blocks(api_response.content);
164 let stop_reason = api_response
165 .stop_reason
166 .as_ref()
167 .map(anthropic_data::map_stop_reason);
168
169 ChatResponse {
170 id: api_response.id,
171 content,
172 model: api_response.model,
173 stop_reason,
174 usage: Usage {
175 input_tokens: api_response.usage.total_input_tokens(),
176 output_tokens: api_response.usage.output,
177 cached_input_tokens: api_response.usage.cached_input_tokens(),
178 cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
179 },
180 }
181 }
182}
183
184#[async_trait]
185impl LlmProvider for VertexProvider {
186 async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
187 if self.is_claude_model() {
188 return self.chat_claude(request).await;
189 }
190 self.chat_gemini(request).await
191 }
192
193 fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
194 if self.is_claude_model() {
195 return self.chat_stream_claude(request);
196 }
197 self.chat_stream_gemini(request)
198 }
199
200 fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
201 let Some(thinking) = thinking else {
202 return Ok(());
203 };
204
205 if self
206 .capabilities()
207 .is_some_and(|caps| !caps.supports_thinking)
208 {
209 return Err(anyhow::anyhow!(
210 "thinking is not supported for provider={} model={}",
211 self.provider(),
212 self.model()
213 ));
214 }
215
216 if matches!(thinking.mode, ThinkingMode::Adaptive)
217 && !self
218 .capabilities()
219 .is_some_and(|caps| caps.supports_adaptive_thinking)
220 {
221 return Err(anyhow::anyhow!(
222 "adaptive thinking is not supported for provider={} model={}",
223 self.provider(),
224 self.model()
225 ));
226 }
227
228 if self.is_claude_model()
229 && self.requires_anthropic_adaptive_thinking()
230 && matches!(thinking.mode, ThinkingMode::Enabled { .. })
231 {
232 return Err(anyhow::anyhow!(
233 "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
234 self.provider(),
235 self.model()
236 ));
237 }
238
239 Ok(())
240 }
241
242 fn model(&self) -> &str {
243 &self.model
244 }
245
246 fn provider(&self) -> &'static str {
247 "vertex"
248 }
249
250 fn configured_thinking(&self) -> Option<&ThinkingConfig> {
251 self.thinking.as_ref()
252 }
253
254 fn default_max_tokens(&self) -> u32 {
255 let provider = if self.is_claude_model() {
256 "anthropic"
257 } else {
258 "gemini"
259 };
260 let model_max = self
261 .capabilities()
262 .and_then(|caps| caps.max_output_tokens)
263 .or_else(|| {
264 crate::model_capabilities::default_max_output_tokens(provider, self.model())
265 })
266 .unwrap_or(4096);
267 model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
268 }
269}
270
271impl VertexProvider {
276 #[allow(clippy::too_many_lines)]
277 async fn chat_gemini(&self, request: ChatRequest) -> Result<ChatOutcome> {
278 let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
279 Ok(thinking) => thinking,
280 Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
281 };
282 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
283 return Ok(ChatOutcome::InvalidRequest(error.to_string()));
284 }
285 let contents = build_api_contents(&request.messages);
286 let tools = request.tools.map(convert_tools_to_config);
287 let tool_config = request
288 .tool_choice
289 .as_ref()
290 .map(ApiFunctionCallingConfig::from_tool_choice);
291 let system_instruction = if request.system.is_empty() {
292 None
293 } else {
294 Some(ApiContent {
295 role: None,
296 parts: vec![ApiPart::Text {
297 text: request.system.clone(),
298 thought_signature: None,
299 }],
300 })
301 };
302
303 let thinking_config = thinking.as_ref().map(map_thinking_config);
304 let (response_mime_type, response_schema) =
305 request.response_format.as_ref().map_or((None, None), |rf| {
306 (
307 Some("application/json"),
308 Some(gemini_response_schema(&rf.schema)),
309 )
310 });
311
312 let api_request = ApiGenerateContentRequest {
313 contents: &contents,
314 system_instruction: system_instruction.as_ref(),
315 tools: tools.as_ref().map(std::slice::from_ref),
316 tool_config,
317 generation_config: Some(ApiGenerationConfig {
318 max_output_tokens: Some(request.max_tokens),
319 thinking_config,
320 response_mime_type,
321 response_schema,
322 }),
323 cached_content: request.cached_content.as_deref(),
324 };
325
326 log::debug!(
327 "Vertex AI LLM request model={} max_tokens={}",
328 self.model,
329 request.max_tokens
330 );
331
332 let url = format!("{}:generateContent", self.base_url("google"));
333
334 let response = self
335 .client
336 .post(&url)
337 .header("Content-Type", "application/json")
338 .bearer_auth(&self.access_token)
339 .json(&api_request)
340 .send()
341 .await
342 .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
343
344 let status = response.status();
345 let bytes = response
346 .bytes()
347 .await
348 .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
349
350 log::debug!(
351 "Vertex AI LLM response status={} body_len={}",
352 status,
353 bytes.len()
354 );
355
356 if status == StatusCode::TOO_MANY_REQUESTS {
357 return Ok(ChatOutcome::RateLimited);
358 }
359
360 if status.is_server_error() {
361 let body = String::from_utf8_lossy(&bytes);
362 log::error!("Vertex AI server error status={status} body={body}");
363 return Ok(ChatOutcome::ServerError(body.into_owned()));
364 }
365
366 if status.is_client_error() {
367 let body = String::from_utf8_lossy(&bytes);
368 log::warn!("Vertex AI client error status={status} body={body}");
369 return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
370 }
371
372 let api_response: ApiGenerateContentResponse = serde_json::from_slice(&bytes)
373 .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
374
375 let candidate = api_response
376 .candidates
377 .into_iter()
378 .next()
379 .ok_or_else(|| anyhow::anyhow!("no candidates in response"))?;
380
381 let content = build_content_blocks(&candidate.content);
382
383 if content.is_empty() && !candidate.content.parts.is_empty() {
384 log::warn!(
385 "Vertex AI parts not converted to content blocks raw_parts={:?}",
386 candidate.content.parts
387 );
388 }
389
390 let has_tool_calls = content
391 .iter()
392 .any(|b| matches!(b, agent_sdk_foundation::llm::ContentBlock::ToolUse { .. }));
393
394 let stop_reason = candidate
395 .finish_reason
396 .as_ref()
397 .map(|r| map_finish_reason(r, has_tool_calls));
398
399 let usage = api_response
400 .usage_metadata
401 .unwrap_or(ApiUsageMetadata {
402 prompt: 0,
403 candidates: 0,
404 cached_content: 0,
405 })
406 .into_usage();
407
408 Ok(ChatOutcome::Success(ChatResponse {
409 id: String::new(),
410 content,
411 model: self.model.clone(),
412 stop_reason,
413 usage,
414 }))
415 }
416
417 fn chat_stream_gemini(&self, request: ChatRequest) -> StreamBox<'_> {
418 Box::pin(async_stream::stream! {
419 let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
420 Ok(thinking) => thinking,
421 Err(error) => {
422 yield Ok(StreamDelta::Error {
423 message: error.to_string(),
424 kind: StreamErrorKind::InvalidRequest,
425 });
426 return;
427 }
428 };
429 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
430 yield Ok(StreamDelta::Error {
431 message: error.to_string(),
432 kind: StreamErrorKind::InvalidRequest,
433 });
434 return;
435 }
436 let contents = build_api_contents(&request.messages);
437 let tools = request.tools.map(convert_tools_to_config);
438 let tool_config = request
439 .tool_choice
440 .as_ref()
441 .map(ApiFunctionCallingConfig::from_tool_choice);
442 let system_instruction = if request.system.is_empty() {
443 None
444 } else {
445 Some(ApiContent {
446 role: None,
447 parts: vec![ApiPart::Text {
448 text: request.system.clone(),
449 thought_signature: None,
450 }],
451 })
452 };
453
454 let thinking_config = thinking.as_ref().map(map_thinking_config);
455 let (response_mime_type, response_schema) = request
456 .response_format
457 .as_ref()
458 .map_or((None, None), |rf| {
459 (
460 Some("application/json"),
461 Some(gemini_response_schema(&rf.schema)),
462 )
463 });
464
465 let api_request = ApiGenerateContentRequest {
466 contents: &contents,
467 system_instruction: system_instruction.as_ref(),
468 tools: tools.as_ref().map(std::slice::from_ref),
469 tool_config,
470 generation_config: Some(ApiGenerationConfig {
471 max_output_tokens: Some(request.max_tokens),
472 thinking_config,
473 response_mime_type,
474 response_schema,
475 }),
476 cached_content: request.cached_content.as_deref(),
477 };
478
479 log::debug!(
480 "Vertex AI streaming LLM request model={} max_tokens={}",
481 self.model,
482 request.max_tokens
483 );
484
485 let url = format!("{}:streamGenerateContent?alt=sse", self.base_url("google"));
486
487 let Ok(response) = self
488 .client
489 .post(&url)
490 .header("Content-Type", "application/json")
491 .bearer_auth(&self.access_token)
492 .json(&api_request)
493 .send()
494 .await
495 else {
496 yield Err(anyhow::anyhow!("request failed"));
497 return;
498 };
499
500 let status = response.status();
501 if !status.is_success() {
502 let body = response.text().await.unwrap_or_default();
503 let kind = if status == StatusCode::TOO_MANY_REQUESTS {
504 StreamErrorKind::RateLimited
505 } else if status.is_server_error() {
506 StreamErrorKind::ServerError
507 } else {
508 StreamErrorKind::InvalidRequest
509 };
510 log::warn!("Vertex AI error status={status} body={body}");
511 yield Ok(StreamDelta::Error {
512 message: body,
513 kind,
514 });
515 return;
516 }
517
518 let mut inner = stream_gemini_response(response);
519 while let Some(item) = futures::StreamExt::next(&mut inner).await {
520 yield item;
521 }
522 })
523 }
524}
525
526impl VertexProvider {
531 async fn chat_claude(&self, request: ChatRequest) -> Result<ChatOutcome> {
532 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
533 Ok(thinking) => thinking,
534 Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
535 };
536 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
537 return Ok(ChatOutcome::InvalidRequest(error.to_string()));
538 }
539 let messages = Self::build_cached_vertex_claude_messages(&request);
540 let tools = anthropic_data::build_api_tools(&request);
541 let thinking = thinking_config
542 .as_ref()
543 .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
544 let output_config = thinking_config
545 .as_ref()
546 .and_then(|t| t.effort)
547 .map(|effort| anthropic_data::ApiOutputConfig { effort });
548 let system = Self::build_vertex_claude_system_prompt(&request.system);
549 let tool_choice = request
550 .tool_choice
551 .as_ref()
552 .map(anthropic_data::ApiToolChoice::from_tool_choice);
553
554 let api_request = anthropic_data::ApiMessagesRequest {
555 model: None, max_tokens: request.max_tokens,
557 system,
558 messages: &messages,
559 tools: tools.as_deref(),
560 tool_choice,
561 stream: false,
562 thinking,
563 output_config,
564 anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
565 };
566
567 log::debug!(
568 "Vertex AI (Claude) LLM request model={} max_tokens={}",
569 self.model,
570 request.max_tokens
571 );
572
573 if log::log_enabled!(log::Level::Debug) {
574 match serde_json::to_string_pretty(&api_request) {
575 Ok(json) => log::debug!("Vertex AI (Claude) request payload:\n{json}"),
576 Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
577 }
578 }
579
580 let url = format!("{}:rawPredict", self.base_url("anthropic"));
581
582 let response = self
583 .client
584 .post(&url)
585 .header("Content-Type", "application/json")
586 .bearer_auth(&self.access_token)
587 .json(&api_request)
588 .send()
589 .await
590 .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
591
592 let status = response.status();
593 let bytes = response
594 .bytes()
595 .await
596 .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
597
598 log::debug!(
599 "Vertex AI (Claude) response status={} body_len={}",
600 status,
601 bytes.len()
602 );
603
604 if status == StatusCode::TOO_MANY_REQUESTS {
605 return Ok(ChatOutcome::RateLimited);
606 }
607
608 if status.is_server_error() {
609 let body = String::from_utf8_lossy(&bytes);
610 log::error!("Vertex AI (Claude) server error status={status} body={body}");
611 return Ok(ChatOutcome::ServerError(body.into_owned()));
612 }
613
614 if status.is_client_error() {
615 let body = String::from_utf8_lossy(&bytes);
616 log::warn!("Vertex AI (Claude) client error status={status} body={body}");
617 return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
618 }
619
620 let api_response: anthropic_data::ApiResponse = serde_json::from_slice(&bytes)
621 .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
622
623 log::debug!(
624 "Vertex AI (Claude) response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
625 api_response.id,
626 api_response.model,
627 api_response.stop_reason,
628 api_response.usage.total_input_tokens(),
629 api_response.usage.output,
630 api_response.content.len()
631 );
632
633 Ok(ChatOutcome::Success(Self::map_claude_response(
634 api_response,
635 )))
636 }
637
638 #[allow(clippy::too_many_lines)]
639 fn chat_stream_claude(&self, request: ChatRequest) -> StreamBox<'_> {
640 Box::pin(async_stream::stream! {
641 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
642 Ok(thinking) => thinking,
643 Err(error) => {
644 yield Ok(StreamDelta::Error {
645 message: error.to_string(),
646 kind: StreamErrorKind::InvalidRequest,
647 });
648 return;
649 }
650 };
651 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
652 yield Ok(StreamDelta::Error {
653 message: error.to_string(),
654 kind: StreamErrorKind::InvalidRequest,
655 });
656 return;
657 }
658 let messages = Self::build_cached_vertex_claude_messages(&request);
659 let tools = anthropic_data::build_api_tools(&request);
660 let thinking = thinking_config
661 .as_ref()
662 .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
663 let output_config = thinking_config
664 .as_ref()
665 .and_then(|t| t.effort)
666 .map(|effort| anthropic_data::ApiOutputConfig { effort });
667 let system = Self::build_vertex_claude_system_prompt(&request.system);
668 let tool_choice = request
669 .tool_choice
670 .as_ref()
671 .map(anthropic_data::ApiToolChoice::from_tool_choice);
672
673 let api_request = anthropic_data::ApiMessagesRequest {
674 model: None, max_tokens: request.max_tokens,
676 system,
677 messages: &messages,
678 tools: tools.as_deref(),
679 tool_choice,
680 stream: true,
681 thinking,
682 output_config,
683 anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
684 };
685
686 log::debug!(
687 "Vertex AI (Claude) streaming request model={} max_tokens={}",
688 self.model,
689 request.max_tokens
690 );
691
692 if log::log_enabled!(log::Level::Debug) {
693 match serde_json::to_string_pretty(&api_request) {
694 Ok(json) => log::debug!("Vertex AI (Claude) streaming request payload:\n{json}"),
695 Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
696 }
697 }
698
699 let url = format!("{}:streamRawPredict", self.base_url("anthropic"));
700
701 let response = match self
702 .client
703 .post(&url)
704 .header("Content-Type", "application/json")
705 .bearer_auth(&self.access_token)
706 .json(&api_request)
707 .send()
708 .await
709 {
710 Ok(r) => r,
711 Err(e) => {
712 yield Err(anyhow::anyhow!("request failed: {e}"));
713 return;
714 }
715 };
716
717 let status = response.status();
718
719 if status == StatusCode::TOO_MANY_REQUESTS {
720 yield Ok(StreamDelta::Error {
721 message: "Rate limited".to_string(),
722 kind: StreamErrorKind::RateLimited,
723 });
724 return;
725 }
726
727 if status.is_server_error() {
728 let body = response.text().await.unwrap_or_default();
729 log::error!("Vertex AI (Claude) server error status={status} body={body}");
730 yield Ok(StreamDelta::Error {
731 message: body,
732 kind: StreamErrorKind::ServerError,
733 });
734 return;
735 }
736
737 if status.is_client_error() {
738 let body = response.text().await.unwrap_or_default();
739 log::warn!("Vertex AI (Claude) client error status={status} body={body}");
740 yield Ok(StreamDelta::Error {
741 message: body,
742 kind: StreamErrorKind::InvalidRequest,
743 });
744 return;
745 }
746
747 let mut stream = response.bytes_stream();
749 let mut buffer = String::new();
750 let mut input_tokens: u32 = 0;
751 let mut output_tokens: u32 = 0;
752 let mut cached_input_tokens: u32 = 0;
753 let mut cache_creation_input_tokens: u32 = 0;
754 let mut tool_ids: std::collections::HashMap<usize, String> =
755 std::collections::HashMap::new();
756 let mut received_message_stop = false;
757 let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
758
759 while let Some(chunk_result) = stream.next().await {
760 let Ok(chunk) = chunk_result else {
761 yield Err(anyhow::anyhow!("stream error"));
762 return;
763 };
764
765 buffer.push_str(&String::from_utf8_lossy(&chunk));
766
767 while let Some(event_block) = anthropic_data::take_next_sse_event(&mut buffer) {
769 if anthropic_data::is_message_stop_event(&event_block) {
770 received_message_stop = true;
771 }
772
773 if let Some(delta) = anthropic_data::parse_sse_event(
774 &event_block,
775 &mut input_tokens,
776 &mut output_tokens,
777 &mut cached_input_tokens,
778 &mut cache_creation_input_tokens,
779 &mut tool_ids,
780 &mut pending_stop_reason,
781 ) {
782 yield Ok(delta);
783 }
784 if anthropic_data::is_message_stop_event(&event_block) {
785 yield Ok(StreamDelta::Done {
786 stop_reason: pending_stop_reason.take(),
787 });
788 }
789 }
790 }
791
792 let remaining = buffer.trim();
794 if !remaining.is_empty() {
795 if anthropic_data::is_message_stop_event(remaining) {
796 received_message_stop = true;
797 }
798
799 if let Some(delta) = anthropic_data::parse_sse_event(
800 remaining,
801 &mut input_tokens,
802 &mut output_tokens,
803 &mut cached_input_tokens,
804 &mut cache_creation_input_tokens,
805 &mut tool_ids,
806 &mut pending_stop_reason,
807 ) {
808 yield Ok(delta);
809 }
810 if anthropic_data::is_message_stop_event(remaining) {
811 yield Ok(StreamDelta::Done {
812 stop_reason: pending_stop_reason.take(),
813 });
814 }
815 }
816
817 if !received_message_stop {
818 log::warn!(
819 "Vertex AI (Claude) SSE stream ended without message_stop"
820 );
821 yield Ok(StreamDelta::Error {
822 message: "Stream ended unexpectedly without completion".to_string(),
823 kind: StreamErrorKind::ServerError,
824 });
825 }
826 })
827 }
828}
829
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a provider with a dummy token; these tests never touch the network.
    fn make_provider(project: &str, region: &str, model: &str) -> VertexProvider {
        VertexProvider::new(
            "token".to_string(),
            project.to_string(),
            region.to_string(),
            model.to_string(),
        )
    }

    /// Asserts that budgeted thinking (`ThinkingConfig::new`) is rejected for
    /// `model`, with an error that points callers at `ThinkingConfig::adaptive()`.
    fn assert_rejects_budgeted_thinking(model: &str) {
        let error = make_provider("project", "global", model)
            .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
            .unwrap_err();
        assert!(
            error.to_string().contains("ThinkingConfig::adaptive()"),
            "unexpected error for model={model}: {error}"
        );
    }

    #[test]
    fn test_new_creates_provider() {
        let provider = make_provider("my-project", "us-central1", "custom-model");

        assert_eq!(provider.model(), "custom-model");
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_flash_factory() {
        let provider = VertexProvider::flash(
            "token".to_string(),
            "my-project".to_string(),
            "us-central1".to_string(),
        );

        assert_eq!(provider.model(), MODEL_GEMINI_3_FLASH);
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_pro_factory() {
        let provider = VertexProvider::pro(
            "token".to_string(),
            "my-project".to_string(),
            "us-central1".to_string(),
        );

        assert_eq!(provider.model(), MODEL_GEMINI_31_PRO);
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_provider_is_cloneable() {
        let provider = make_provider("my-project", "us-central1", "test-model");
        let cloned = provider.clone();

        assert_eq!(provider.model(), cloned.model());
        assert_eq!(provider.provider(), cloned.provider());
    }

    #[test]
    fn test_is_claude_model() {
        assert!(make_provider("project", "us-central1", "claude-sonnet-4-20250514").is_claude_model());
        assert!(!make_provider("project", "us-central1", "gemini-3-flash-preview").is_claude_model());
    }

    #[test]
    fn test_base_url_gemini() {
        let provider = make_provider("my-project", "us-central1", "gemini-3-flash-preview");

        assert_eq!(
            provider.base_url("google"),
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/google/models/gemini-3-flash-preview"
        );
    }

    #[test]
    fn test_base_url_claude() {
        let provider = make_provider("my-project", "us-central1", "claude-sonnet-4-20250514");

        assert_eq!(
            provider.base_url("anthropic"),
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/anthropic/models/claude-sonnet-4-20250514"
        );
    }

    #[test]
    fn test_base_url_with_different_region() {
        let provider = make_provider("other-project", "europe-west4", "gemini-3.1-pro-preview");

        let url = provider.base_url("google");
        assert!(url.starts_with("https://europe-west4-aiplatform.googleapis.com/"));
        assert!(url.contains("/projects/other-project/"));
        assert!(url.contains("/locations/europe-west4/"));
        assert!(url.ends_with("/models/gemini-3.1-pro-preview"));
    }

    #[test]
    fn test_base_url_global_region_has_no_prefix() {
        let provider = make_provider("my-project", "global", "gemini-3.1-pro-preview");

        assert_eq!(
            provider.base_url("google"),
            "https://aiplatform.googleapis.com/v1/projects/my-project/locations/global/publishers/google/models/gemini-3.1-pro-preview"
        );
    }

    #[test]
    fn test_vertex_claude_46_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_SONNET_46);
    }

    #[test]
    fn test_vertex_claude_opus_46_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_46);
    }

    #[test]
    fn test_vertex_claude_opus_47_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_47);
    }

    #[test]
    fn test_vertex_claude_opus_48_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_48);
    }

    #[test]
    fn test_vertex_claude_fable_5_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_FABLE_5);
    }

    #[test]
    fn test_default_max_tokens_stays_within_safe_bounds() {
        for model in [
            MODEL_GEMINI_3_FLASH,
            MODEL_GEMINI_31_PRO,
            MODEL_SONNET_46,
            "custom-model",
        ] {
            let max_tokens = make_provider("project", "us-central1", model).default_max_tokens();
            assert!(
                (4096..=DEFAULT_SAFE_MAX_OUTPUT_TOKENS).contains(&max_tokens),
                "model={model} max_tokens={max_tokens}"
            );
        }
    }

    #[test]
    fn test_model_constants() {
        assert_eq!(MODEL_GEMINI_3_FLASH, "gemini-3-flash-preview");
        assert_eq!(MODEL_GEMINI_31_PRO, "gemini-3.1-pro-preview");
        assert_eq!(MODEL_GEMINI_3_PRO, "gemini-3.0-pro");
    }
}